You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2011/06/04 21:46:00 UTC
svn commit: r1131480 - in /hbase/trunk: CHANGES.txt
src/main/java/org/apache/hadoop/hbase/io/WritableDelayed.java
src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
Author: stack
Date: Sat Jun 4 19:46:00 2011
New Revision: 1131480
URL: http://svn.apache.org/viewvc?rev=1131480&view=rev
Log:
HBASE-3899 enhance HBase RPC to support free-ing up server handler threads even if response is not ready -- BACKED IT OUT
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/WritableDelayed.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1131480&r1=1131479&r2=1131480&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Sat Jun 4 19:46:00 2011
@@ -276,8 +276,6 @@ Release 0.91.0 - Unreleased
RS web UIs
HBASE-3691 Add compressor support for 'snappy', google's compressor
(Nichole Treadway and Nicholas Telford)
- HBASE-3899 enhance HBase RPC to support free-ing up server handler threads
- even if response is not ready (Dhruba Borthakur)
Release 0.90.4 - Unreleased
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/WritableDelayed.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/WritableDelayed.java?rev=1131480&r1=1131479&r2=1131480&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/WritableDelayed.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/WritableDelayed.java Sat Jun 4 19:46:00 2011
@@ -1,33 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.io;
-
-/**
- * An optional interface to indicate the the values are not immediately
- * readable.
- */
-public interface WritableDelayed {
- /**
- * Provide a hint to the caller to indicate that
- * data is not ready yet.
- */
- public boolean isDelayed();
-}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1131480&r1=1131479&r2=1131480&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Sat Jun 4 19:46:00 2011
@@ -57,7 +57,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.WritableWithSize;
-import org.apache.hadoop.hbase.io.WritableDelayed;
import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
@@ -98,7 +97,7 @@ public abstract class HBaseServer implem
/** Default value for above param */
private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
- private static int warnResponseSize;
+ private final int warnResponseSize;
public static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer");
@@ -151,25 +150,6 @@ public abstract class HBaseServer implem
}
return null;
}
- /**
- * If invoked from inside an RPC, startDelay tells the current call to not
- * send out * responses back to the client until endDelay() is invoked.
- */
- public static void startDelay(Object o) {
- Call call = (Call) o;
- if (call != null) {
- call.startDelay();
- }
- }
- public static void endDelay(Object o) throws IOException {
- Call call = (Call) o;
- if (call != null) {
- call.endDelay();
- }
- }
- public static Object getCall() {
- return CurCall.get();
- }
/** Returns remote address as a string when invoked inside an RPC.
* Returns null in case of an error.
* @return String
@@ -261,19 +241,13 @@ public abstract class HBaseServer implem
protected long timestamp; // the time received when response is null
// the time served when response is not null
protected ByteBuffer response; // the response for this call
- private boolean delayResponse; // shall the RPC layer delay the response?
- private boolean doneResponse; // have we already responded to the client?
- private Responder responder; // the thread that sends out responses
- public Call(int id, Writable param, Connection connection, Responder responder) {
+ public Call(int id, Writable param, Connection connection) {
this.id = id;
this.param = param;
this.connection = connection;
this.timestamp = System.currentTimeMillis();
this.response = null;
- this.delayResponse = false;
- this.doneResponse = false;
- this.responder = responder;
}
@Override
@@ -281,40 +255,9 @@ public abstract class HBaseServer implem
return param.toString() + " from " + connection.toString();
}
- public synchronized void setResponse(ByteBuffer response) {
+ public void setResponse(ByteBuffer response) {
this.response = response;
}
-
- /**
- * Delay this call, do not send its response back to the client
- */
- public synchronized void startDelay() {
- this.delayResponse = true;
- }
-
- /**
- * It is ok to send responses back to the client now. This is
- * typically called by a non-server-handler thread.
- */
- public synchronized void endDelay() throws IOException {
- this.delayResponse = false;
- if (response != null && !doneResponse) {
- doneResponse = true;
- responder.doRespond(this);
- }
- }
-
- /*
- * If we have a reponse, and delay is not set, then respond
- * immediately. Otherwise, do not respond to client. This is
- * called the by the RPC code in the context of the Handler thread.
- */
- public synchronized void sendResponseIfReady() throws IOException {
- if (response != null && !doneResponse && !delayResponse) {
- doneResponse = true; // going to respond as soon as this call returns.
- responder.doRespond(this);
- }
- }
}
/** Listens on the socket. Creates jobs for the handler threads*/
@@ -857,40 +800,15 @@ public abstract class HBaseServer implem
}
//
- // Enqueue for background thread to send responses out later.
- //
- private void enqueueInSelector(Call call) throws IOException {
- incPending();
- try {
- // Wakeup the thread blocked on select, only then can the call
- // to channel.register() complete.
- SocketChannel channel = call.connection.channel;
- writeSelector.wakeup();
- channel.register(writeSelector, SelectionKey.OP_WRITE, call);
- } catch (ClosedChannelException e) {
- //Its ok. channel might be closed else where.
- } finally {
- decPending();
- }
- }
-
- //
// Enqueue a response from the application.
//
void doRespond(Call call) throws IOException {
- // set the serve time when the response has to be sent later
- call.timestamp = System.currentTimeMillis();
-
- boolean doRegister = false;
synchronized (call.connection.responseQueue) {
call.connection.responseQueue.addLast(call);
if (call.connection.responseQueue.size() == 1) {
- doRegister = !processResponse(call.connection.responseQueue, false);
+ processResponse(call.connection.responseQueue, true);
}
}
- if (doRegister) {
- enqueueInSelector(call); // tell background thread to send it out later
- }
}
private synchronized void incPending() { // call waiting to be enqueued.
@@ -1085,7 +1003,7 @@ public abstract class HBaseServer implem
Writable param = ReflectionUtils.newInstance(paramClass, conf); // read param
param.readFields(dis);
- Call call = new Call(id, param, this, responder);
+ Call call = new Call(id, param, this);
if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) {
priorityCallQueue.put(call);
@@ -1152,10 +1070,6 @@ public abstract class HBaseServer implem
}
CurCall.set(null);
- if (!(value instanceof WritableDelayed)) {
- processRpcResponse(call, value, error, errorClass);
- }
-
int size = BUFFER_INITIAL_SIZE;
if (value instanceof WritableWithSize) {
// get the size hint.
@@ -1192,7 +1106,7 @@ public abstract class HBaseServer implem
call.setResponse(buf.getByteBuffer());
- call.sendResponseIfReady(); // maybe delay sending out response to the client
+ responder.doRespond(call);
} catch (InterruptedException e) {
if (running) { // unexpected -- log it
LOG.info(getName() + " caught: " +
@@ -1218,51 +1132,6 @@ public abstract class HBaseServer implem
}
- // Packages the rpc invocation response back into the call.
- static public void processRpcResponse(Object callobj, Writable value,
- String error, String errorClass) throws IOException {
- Call call = (Call)callobj;
-
- int size = Handler.BUFFER_INITIAL_SIZE;
- if (value instanceof WritableWithSize) {
- // get the size hint.
- WritableWithSize ohint = (WritableWithSize)value;
- long hint = ohint.getWritableSize() + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT;
- if (hint > 0) {
- if ((hint) > Integer.MAX_VALUE) {
- // oops, new problem.
- IOException ioe =
- new IOException("Result buffer size too large: " + hint);
- errorClass = ioe.getClass().getName();
- error = StringUtils.stringifyException(ioe);
- } else {
- size = (int)hint;
- }
- }
- }
- ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
- DataOutputStream out = new DataOutputStream(buf);
- out.writeInt(call.id); // write call id
- out.writeBoolean(error != null); // write error flag
-
- if (error == null) {
- value.write(out);
- } else {
- WritableUtils.writeString(out, errorClass);
- WritableUtils.writeString(out, error);
- }
-
- if (buf.size() > warnResponseSize) {
- LOG.warn("responseTooLarge for: "+call+": Size: "
- + StringUtils.humanReadableInt(buf.size()));
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Served: " + ((Invocation)call.param).getMethodName());
- }
- call.setResponse(buf.getByteBuffer());
- call.sendResponseIfReady(); // maybe delay sending out response to the client
- }
-
/**
* Gets the QOS level for this call. If it is higher than the highPriorityLevel and there
* are priorityHandlers available it will be processed in it's own thread set.