You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2014/03/05 07:35:22 UTC
svn commit: r1574385 - in /hbase/branches/hbase-10070: ./ dev-support/
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/
hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ hbase-serv...
Author: enis
Date: Wed Mar 5 06:35:21 2014
New Revision: 1574385
URL: http://svn.apache.org/r1574385
Log:
HBASE-10525 Allow the client to use a different thread for writing to ease interrupt (Nicolas Liochon)
Modified:
hbase/branches/hbase-10070/ (props changed)
hbase/branches/hbase-10070/dev-support/findbugs-exclude.xml
hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
Propchange: hbase/branches/hbase-10070/
------------------------------------------------------------------------------
Merged /hbase/trunk:r1571210
Modified: hbase/branches/hbase-10070/dev-support/findbugs-exclude.xml
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/dev-support/findbugs-exclude.xml?rev=1574385&r1=1574384&r2=1574385&view=diff
==============================================================================
--- hbase/branches/hbase-10070/dev-support/findbugs-exclude.xml (original)
+++ hbase/branches/hbase-10070/dev-support/findbugs-exclude.xml Wed Mar 5 06:35:21 2014
@@ -115,6 +115,14 @@
</Match>
<Match>
+ <Class name="org.apache.hadoop.hbase.ipc.RpcClient$Connection"/>
+ <Or>
+ <Bug pattern="IS2_INCONSISTENT_SYNC"/>
+ <Bug pattern="NN_NAKED_NOTIFY"/>
+ </Or>
+ </Match>
+
+ <Match>
<Class name="org.apache.hadoop.hbase.regionserver.HRegion"/>
<Or>
<Method name="startRegionOperation"/>
Modified: hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java?rev=1574385&r1=1574384&r2=1574385&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java (original)
+++ hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java Wed Mar 5 06:35:21 2014
@@ -738,6 +738,7 @@ class ConnectionManager {
* @param rpcClient Client we should use instead.
* @return Previous rpcClient
*/
+ @VisibleForTesting
RpcClient setRpcClient(final RpcClient rpcClient) {
RpcClient oldRpcClient = this.rpcClient;
this.rpcClient = rpcClient;
@@ -745,6 +746,14 @@ class ConnectionManager {
}
/**
+ * For tests only.
+ */
+ @VisibleForTesting
+ RpcClient getRpcClient() {
+ return rpcClient;
+ }
+
+ /**
* An identifier that will remain the same for a given connection.
* @return
*/
Modified: hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java?rev=1574385&r1=1574384&r2=1574385&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java (original)
+++ hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java Wed Mar 5 06:35:21 2014
@@ -70,11 +70,13 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.security.token.TokenSelector;
import org.cloudera.htrace.Span;
import org.cloudera.htrace.Trace;
+import org.cloudera.htrace.TraceScope;
import javax.net.SocketFactory;
import javax.security.sasl.SaslException;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -96,6 +98,8 @@ import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -115,13 +119,13 @@ public class RpcClient {
protected final AtomicInteger callIdCnt = new AtomicInteger();
protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs
final protected Configuration conf;
- final protected int minIdleTimeBeforeClose; // if the connection is iddle for more than this
+ protected final int minIdleTimeBeforeClose; // if the connection is iddle for more than this
// time (in ms), it will be closed at any moment.
final protected int maxRetries; //the max. no. of retries for socket connections
final protected long failureSleep; // Time to sleep before retry on failure.
protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
protected final boolean tcpKeepAlive; // if T then use keepalives
- protected FailedServers failedServers;
+ protected final FailedServers failedServers;
private final Codec codec;
private final CompressionCodec compressor;
private final IPCUtil ipcUtil;
@@ -140,10 +144,14 @@ public class RpcClient {
public final static String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry";
public final static int FAILED_SERVER_EXPIRY_DEFAULT = 2000;
+ public final static String IDLE_TIME = "hbase.ipc.client.connection.minIdleTimeBeforeClose";
+
public static final String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY =
"hbase.ipc.client.fallback-to-simple-auth-allowed";
public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;
+ public static final String ALLOWS_INTERRUPTS = "hbase.ipc.client.allowsInterrupt";
+
// thread-specific RPC timeout, which may override that of what was passed in.
// This is used to change dynamically the timeout (for read only) when retrying: if
// the time allowed for the operation is less than the usual socket timeout, then
@@ -224,15 +232,6 @@ public class RpcClient {
}
/**
- * Set the socket timeout
- * @param conf Configuration
- * @param socketTimeout the socket timeout
- */
- public static void setSocketTimeout(Configuration conf, int socketTimeout) {
- conf.setInt(SOCKET_TIMEOUT, socketTimeout);
- }
-
- /**
* @return the socket timeout
*/
static int getSocketTimeout(Configuration conf) {
@@ -252,7 +251,7 @@ public class RpcClient {
// The return type. Used to create shell into which we deserialize the response if any.
Message responseDefaultType;
IOException error; // exception, null if value
- boolean done; // true when call is done
+ volatile boolean done; // true when call is done
long startTime;
final MethodDescriptor md;
@@ -261,7 +260,7 @@ public class RpcClient {
this.param = param;
this.md = md;
this.cells = cells;
- this.startTime = System.currentTimeMillis();
+ this.startTime = EnvironmentEdgeManager.currentTimeMillis();
this.responseDefaultType = responseDefaultType;
this.id = callIdCnt.getAndIncrement();
}
@@ -325,6 +324,24 @@ public class RpcClient {
return new Connection(remoteId, codec, compressor);
}
+ /**
+ * see {@link org.apache.hadoop.hbase.ipc.RpcClient.Connection.CallSender}
+ */
+ private static class CallFuture {
+ Call call;
+ int priority;
+ Span span;
+
+ // We will use this to stop the writer
+ final static CallFuture DEATH_PILL = new CallFuture(null, -1, null);
+
+ CallFuture(Call call, int priority, Span span) {
+ this.call = call;
+ this.priority = priority;
+ this.span = span;
+ }
+ }
+
/** Thread that reads responses and notifies callers. Each connection owns a
* socket connected to a remote address. Calls are multiplexed through this
* socket: responses may be delivered out of order. */
@@ -349,6 +366,123 @@ public class RpcClient {
new ConcurrentSkipListMap<Integer, Call>();
protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean();
+ protected final CallSender callSender;
+
+
+ /**
+ * If the client wants to interrupt its calls easily (i.e. call Thread#interrupt),
+ * it gets into a java issue: an interruption during a write closes the socket/channel.
+ * A way to avoid this is to use a different thread for writing. This way, on interruptions,
+ * we either cancel the writes or ignore the answer if the write is already done, but we
+ * don't stop the write in the middle.
+ * This adds a thread per region server in the client, so it's kept as an option.
+ * <p>
+ * The implementation is simple: the client threads add their calls to the queue, and then
+ * wait for an answer. The CallSender blocks on the queue, and writes the calls one
+ * after the other. On interruption, the client cancels its call. The CallSender checks that
+ * the call has not been canceled before writing it.
+ * </p>
+ * <p>When the connection closes, all the calls not yet sent are dismissed. The client thread
+ * is notified with an appropriate exception, as if the call was already sent but the answer
+ * not yet received.
+ * </p>
+ */
+ private class CallSender extends Thread implements Closeable {
+ protected final BlockingQueue<CallFuture> callsToWrite;
+
+
+ public CallFuture sendCall(Call call, int priority, Span span)
+ throws InterruptedException, IOException {
+ CallFuture cts = new CallFuture(call, priority, span);
+ callsToWrite.add(cts);
+ checkIsOpen(); // We check after the put, to be sure that the call we added won't stay
+ // in the list while the cleanup was already done.
+ return cts;
+ }
+
+ public void close(){
+ assert shouldCloseConnection.get();
+ callsToWrite.offer(CallFuture.DEATH_PILL);
+ // We don't care if we can't add the death pill to the queue: the writer
+ // won't be blocked in the 'take', as its queue is full.
+ }
+
+ CallSender(String name, Configuration conf) {
+ int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000);
+ callsToWrite = new ArrayBlockingQueue<CallFuture>(queueSize);
+ setDaemon(true);
+ setName(name + " - writer");
+ }
+
+ public void cancel(CallFuture cts){
+ cts.call.done = true;
+ callsToWrite.remove(cts);
+ calls.remove(cts.call.id);
+ }
+
+ /**
+ * Reads the calls from the queue and writes them to the socket.
+ */
+ @Override
+ public void run() {
+ while (!shouldCloseConnection.get()) {
+ CallFuture cts = null;
+ try {
+ cts = callsToWrite.take();
+ } catch (InterruptedException e) {
+ markClosed(new InterruptedIOException());
+ }
+
+ if (cts == null || cts == CallFuture.DEATH_PILL){
+ assert shouldCloseConnection.get();
+ break;
+ }
+
+ if (cts.call.done) {
+ continue;
+ }
+
+ if (remoteId.rpcTimeout > 0) {
+ long waitTime = EnvironmentEdgeManager.currentTimeMillis() - cts.call.getStartTime();
+ if (waitTime >= remoteId.rpcTimeout) {
+ IOException ie = new CallTimeoutException("Call id=" + cts.call.id +
+ ", waitTime=" + waitTime + ", rpcTimetout=" + remoteId.rpcTimeout +
+ ", expired before being sent to the server.");
+ cts.call.setException(ie); // includes a notify
+ continue;
+ }
+ }
+
+ try {
+ Connection.this.tracedWriteRequest(cts.call, cts.priority, cts.span);
+ } catch (IOException e) {
+ LOG.warn("call write error for call #" + cts.call.id + ", message =" + e.getMessage());
+ cts.call.setException(e);
+ markClosed(e);
+ }
+ }
+
+ cleanup();
+ }
+
+ /**
+ * Cleans up the calls not yet sent when we finish.
+ */
+ private void cleanup() {
+ assert shouldCloseConnection.get();
+
+ IOException ie = new IOException("Connection to " + server + " is closing.");
+ while (true) {
+ CallFuture cts = callsToWrite.poll();
+ if (cts == null) {
+ break;
+ }
+ if (cts.call != null && !cts.call.done) {
+ cts.call.setException(ie);
+ }
+ }
+ }
+ }
Connection(ConnectionId remoteId, final Codec codec, final CompressionCodec compressor)
throws IOException {
@@ -421,6 +555,13 @@ public class RpcClient {
((ticket==null)?" from an unknown user": (" from "
+ ticket.getUserName())));
this.setDaemon(true);
+
+ if (conf.getBoolean(ALLOWS_INTERRUPTS, false)) {
+ callSender = new CallSender(getName(), conf);
+ callSender.start();
+ } else {
+ callSender = null;
+ }
}
private UserInformation getUserInfo(UserGroupInformation ugi) {
@@ -470,7 +611,7 @@ public class RpcClient {
}
}
- protected void closeConnection() {
+ protected synchronized void closeConnection() {
if (socket == null) {
return;
}
@@ -556,23 +697,38 @@ public class RpcClient {
*
* Return true if it is time to read a response; false otherwise.
*/
- protected synchronized boolean waitForWork() throws InterruptedException{
- while (calls.isEmpty() && !shouldCloseConnection.get() && running.get() ) {
- wait(minIdleTimeBeforeClose);
+ protected synchronized boolean waitForWork() throws InterruptedException {
+ // beware of the concurrent access to the calls list: we can add calls, but as well
+ // remove them.
+ long waitUntil = EnvironmentEdgeManager.currentTimeMillis() + minIdleTimeBeforeClose;
+ while (!shouldCloseConnection.get() && running.get() &&
+ EnvironmentEdgeManager.currentTimeMillis() < waitUntil && calls.isEmpty()) {
+ wait(Math.min(minIdleTimeBeforeClose, 1000));
}
- if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
- return true;
- } else if (shouldCloseConnection.get()) {
- return false;
- } else if (calls.isEmpty()) {
- markClosed(new IOException("idle connection closed or stopped"));
+ if (shouldCloseConnection.get()) {
return false;
- } else { // get stopped but there are still pending requests
- markClosed((IOException)new IOException().initCause(
- new InterruptedException()));
+ }
+
+ if (!running.get()) {
+ markClosed(new IOException("stopped with " + calls.size() + " pending request(s)"));
return false;
}
+
+ if (!calls.isEmpty()) {
+ // shouldCloseConnection can be set to true by a parallel thread here. The caller
+ // will need to check anyway.
+ return true;
+ }
+
+ // Connection is idle.
+ // We expect the number of calls to be zero here, but actually someone can
+ // add a call at any moment, as there is no synchronization between this task
+ // and adding new calls. It's not a big issue, but such a call will get an exception.
+ markClosed(new IOException(
+ "idle connection closed with " + calls.size() + " pending request(s)"));
+
+ return false;
}
public InetSocketAddress getRemoteAddress() {
@@ -590,7 +746,7 @@ public class RpcClient {
readResponse();
}
} catch (Throwable t) {
- LOG.warn(getName() + ": unexpected exception receiving call responses", t);
+ LOG.debug(getName() + ": unexpected exception receiving call responses", t);
markClosed(new IOException("Unexpected exception receiving call responses", t));
}
@@ -811,9 +967,8 @@ public class RpcClient {
/**
* Write the connection header.
- * Out is not synchronized because only the first thread does this.
*/
- private void writeConnectionHeader() throws IOException {
+ private synchronized void writeConnectionHeader() throws IOException {
synchronized (this.out) {
this.out.writeInt(this.header.getSerializedSize());
this.header.writeTo(this.out);
@@ -852,8 +1007,18 @@ public class RpcClient {
cleanupCalls();
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": ipc connection closed");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getName() + ": ipc connection to " + server + " closed");
+ }
+ }
+
+ protected void tracedWriteRequest(Call call, int priority, Span span) throws IOException {
+ TraceScope ts = Trace.continueSpan(span);
+ try {
+ writeRequest(call, priority, span);
+ } finally {
+ ts.close();
+ }
}
/**
@@ -864,15 +1029,12 @@ public class RpcClient {
* @param priority
* @see #readResponse()
*/
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY",
- justification = "on close the reader thread must stop")
- protected void writeRequest(Call call, final int priority) throws IOException {
+ private void writeRequest(Call call, final int priority, Span span) throws IOException {
RequestHeader.Builder builder = RequestHeader.newBuilder();
builder.setCallId(call.id);
- if (Trace.isTracing()) {
- Span s = Trace.currentSpan();
- builder.setTraceInfo(RPCTInfo.newBuilder().
- setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
+ if (span != null) {
+ builder.setTraceInfo(
+ RPCTInfo.newBuilder().setParentId(span.getSpanId()).setTraceId(span.getTraceId()));
}
builder.setMethodName(call.md.getName());
builder.setRequestParam(call.param != null);
@@ -890,28 +1052,32 @@ public class RpcClient {
// is still valid, and, if so we do the write to the socket. If the write fails, we don't
// know where we stand, we have to close the connection.
checkIsOpen();
- calls.put(call.id, call); // On error, the call will be removed by the timeout.
- try {
- synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
- if (Thread.interrupted()) throw new InterruptedIOException();
- checkIsOpen();
+ IOException writeException = null;
+ synchronized (this.out) {
+ if (Thread.interrupted()) throw new InterruptedIOException();
- try {
- IPCUtil.write(this.out, header, call.param, cellBlock);
- } catch (IOException e) {
- // We set the value inside the synchronized block, this way the next in line
- // won't even try to write
- shouldCloseConnection.set(true);
- throw e;
- }
- }
- } finally {
- synchronized (this) {
- // We added a call, and may start the connection clode. In both cases, we
- // need to notify the reader.
- notifyAll();
+ calls.put(call.id, call); // We put first as we don't want the connection to become idle.
+ checkIsOpen(); // Now we're checking that it didn't become idle in between.
+
+ try {
+ IPCUtil.write(this.out, header, call.param, cellBlock);
+ } catch (IOException e) {
+ // We set the value inside the synchronized block, this way the next in line
+ // won't even try to write
+ shouldCloseConnection.set(true);
+ writeException = e;
}
}
+
+ // We added a call, and may have started the connection close. In both cases, we
+ // need to notify the reader.
+ synchronized (this) {
+ notifyAll();
+ }
+
+ // Now that we notified, we can rethrow the exception if any. Otherwise we're good.
+ if (writeException != null) throw writeException;
+
if (LOG.isDebugEnabled()) {
LOG.debug(getName() + ": wrote request header " + TextFormat.shortDebugString(header));
}
@@ -922,7 +1088,7 @@ public class RpcClient {
*/
protected void readResponse() {
if (shouldCloseConnection.get()) return;
- int totalSize = -1;
+ int totalSize;
try {
// See HBaseServer.Call.setResponse for where we write out the response.
// Total size of the response. Unused. But have to read it in anyways.
@@ -936,7 +1102,8 @@ public class RpcClient {
TextFormat.shortDebugString(responseHeader) + ", totalSize: " + totalSize + " bytes");
}
Call call = calls.remove(id);
- if (call == null) {
+ boolean expectedCall = (call != null && !call.done);
+ if (!expectedCall) {
// So we got a response for which we have no corresponding 'call' here on the client-side.
// We probably timed out waiting, cleaned up all references, and now the server decides
// to return a response. There is nothing we can do w/ the response at this stage. Clean
@@ -945,7 +1112,7 @@ public class RpcClient {
int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
int whatIsLeftToRead = totalSize - readSoFar;
LOG.debug("Unknown callId: " + id + ", skipping over this response of " +
- whatIsLeftToRead + " bytes");
+ whatIsLeftToRead + " bytes");
IOUtils.skipFully(in, whatIsLeftToRead);
}
if (responseHeader.hasException()) {
@@ -954,12 +1121,12 @@ public class RpcClient {
if (isFatalConnectionException(exceptionResponse)) {
markClosed(re);
} else {
- if (call != null) call.setException(re);
+ if (expectedCall) call.setException(re);
}
} else {
Message value = null;
// Call may be null because it may have timedout and been cleaned up on this side already
- if (call != null && call.responseDefaultType != null) {
+ if (expectedCall && call.responseDefaultType != null) {
Builder builder = call.responseDefaultType.newBuilderForType();
builder.mergeDelimitedFrom(in);
value = builder.build();
@@ -973,7 +1140,7 @@ public class RpcClient {
}
// it's possible that this call may have been cleaned up due to a RPC
// timeout, so check if it still exists before setting the value.
- if (call != null) call.setResponse(value, cellBlockScanner);
+ if (expectedCall) call.setResponse(value, cellBlockScanner);
}
} catch (IOException e) {
if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {
@@ -985,9 +1152,7 @@ public class RpcClient {
markClosed(e);
}
} finally {
- if (remoteId.rpcTimeout > 0) {
- cleanupCalls(remoteId.rpcTimeout);
- }
+ cleanupCalls(remoteId.rpcTimeout);
}
}
@@ -1015,34 +1180,42 @@ public class RpcClient {
e.getStackTrace(), doNotRetry);
}
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY",
- justification = "on close the reader thread must stop")
- protected void markClosed(IOException e) {
+ protected synchronized void markClosed(IOException e) {
if (e == null) throw new NullPointerException();
if (shouldCloseConnection.compareAndSet(false, true)) {
+ LOG.warn(getName() + ": marking at should close, reason =" + e.getMessage());
if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": marking at should closed, reason =" + e.getMessage());
+ LOG.debug(getName() + ": marking at should close, reason =" + e.getMessage());
}
- synchronized (this) {
- notifyAll();
+ if (callSender != null) {
+ callSender.close();
}
+ notifyAll();
}
}
/* Cleanup all calls and mark them as done */
protected void cleanupCalls() {
- cleanupCalls(0);
+ cleanupCalls(-1);
}
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
- justification="Notify because timeout")
- protected void cleanupCalls(long rpcTimeout) {
+ /**
+ * Cleanup the calls older than a given timeout, in milli seconds.
+ * @param rpcTimeout -1 for all calls, > 0 otherwise. 0 means no timeout and does nothing.
+ */
+ protected synchronized void cleanupCalls(long rpcTimeout) {
+ if (rpcTimeout == 0) return;
+
Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
while (itor.hasNext()) {
Call c = itor.next().getValue();
- long waitTime = System.currentTimeMillis() - c.getStartTime();
- if (waitTime >= rpcTimeout) {
+ long waitTime = EnvironmentEdgeManager.currentTimeMillis() - c.getStartTime();
+ if (rpcTimeout < 0) {
+ IOException ie = new IOException("Call id=" + c.id + ", waitTime=" + waitTime);
+ c.setException(ie);
+ itor.remove();
+ } else if (waitTime >= rpcTimeout) {
IOException ie = new CallTimeoutException("Call id=" + c.id +
", waitTime=" + waitTime + ", rpcTimeout=" + rpcTimeout);
c.setException(ie);
@@ -1050,36 +1223,21 @@ public class RpcClient {
} else {
// This relies on the insertion order to be the call id order. This is not
// true under 'difficult' conditions (gc, ...).
+ rpcTimeout -= waitTime;
break;
}
}
- if (!calls.isEmpty()) {
- Call firstCall = calls.get(calls.firstKey());
- long maxWaitTime = System.currentTimeMillis() - firstCall.getStartTime();
- if (maxWaitTime < rpcTimeout) {
- rpcTimeout -= maxWaitTime;
- }
- }
-
- try {
- if (!shouldCloseConnection.get()) {
- setSocketTimeout(socket, (int) rpcTimeout);
+ if (!shouldCloseConnection.get() && socket != null && rpcTimeout > 0) {
+ try {
+ socket.setSoTimeout((int)rpcTimeout);
+ } catch (SocketException e) {
+ LOG.warn("Couldn't change timeout, which may result in longer than expected calls");
}
- } catch (SocketException e) {
- LOG.warn("Couldn't lower timeout, which may result in longer than expected calls");
}
}
}
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
- justification="Presume sync not needed setting socket timeout")
- private static void setSocketTimeout(final Socket socket, final int rpcTimeout)
- throws java.net.SocketException {
- if (socket == null) return;
- socket.setSoTimeout(rpcTimeout);
- }
-
/**
* Client-side call timeout
*/
@@ -1110,8 +1268,7 @@ public class RpcClient {
* @param localAddr client socket bind address
*/
RpcClient(Configuration conf, String clusterId, SocketFactory factory, SocketAddress localAddr) {
- this.minIdleTimeBeforeClose =
- conf.getInt("hbase.ipc.client.connection.minIdleTimeBeforeClose", 120000); // 2 minutes
+ this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
@@ -1289,31 +1446,44 @@ public class RpcClient {
* @throws IOException
*/
Pair<Message, CellScanner> call(MethodDescriptor md, Message param, CellScanner cells,
- Message returnType, User ticket, InetSocketAddress addr,
- int rpcTimeout, int priority)
- throws InterruptedException, IOException {
+ Message returnType, User ticket, InetSocketAddress addr, int rpcTimeout, int priority)
+ throws IOException, InterruptedException {
Call call = new Call(md, param, cells, returnType);
Connection connection =
getConnection(ticket, call, addr, rpcTimeout, this.codec, this.compressor);
- connection.writeRequest(call, priority);
+ CallFuture cts = null;
+ if (connection.callSender != null){
+ cts = connection.callSender.sendCall(call, priority, Trace.currentSpan());
+ } else {
+ connection.tracedWriteRequest(call, priority, Trace.currentSpan());
+ }
- //noinspection SynchronizationOnLocalVariableOrMethodParameter
- synchronized (call) {
- while (!call.done) {
- call.wait(1000); // wait for the result
+ while (!call.done) {
+ try {
+ synchronized (call) {
+ call.wait(1000); // wait for the result. We will be notified by the reader.
+ }
+ } catch (InterruptedException e) {
+ if (cts != null) {
+ connection.callSender.cancel(cts);
+ } else {
+ call.done = true;
+ }
+ throw e;
}
+ }
- if (call.error != null) {
- if (call.error instanceof RemoteException) {
- call.error.fillInStackTrace();
- throw call.error;
- }
- // local exception
- throw wrapException(addr, call.error);
+ if (call.error != null) {
+ if (call.error instanceof RemoteException) {
+ call.error.fillInStackTrace();
+ throw call.error;
}
- return new Pair<Message, CellScanner>(call.response, call.cells);
+ // local exception
+ throw wrapException(addr, call.error);
}
+
+ return new Pair<Message, CellScanner>(call.response, call.cells);
}
@@ -1361,9 +1531,8 @@ public class RpcClient {
connection.getRemoteAddress().getHostName().equals(hostname)) {
LOG.info("The server on " + hostname + ":" + port +
" is dead - stopping the connection " + connection.remoteId);
- connection.closeConnection();
- // We could do a connection.interrupt(), but it's safer not to do it, as the
- // interrupted exception behavior is not defined nor enforced enough.
+ connection.interrupt(); // We're interrupting a Reader. It means we want it to finish.
+ // This will close the connection as well.
}
}
}
@@ -1465,10 +1634,6 @@ public class RpcClient {
rpcTimeout.set(t);
}
- public static int getRpcTimeout() {
- return rpcTimeout.get();
- }
-
/**
* Returns the lower of the thread-local RPC time from {@link #setRpcTimeout(int)} and the given
* default timeout.
@@ -1484,18 +1649,10 @@ public class RpcClient {
/**
* Make a blocking call. Throws exceptions if there are network problems or if the remote code
* threw an exception.
- * @param md
- * @param controller
- * @param param
- * @param returnType
- * @param isa
* @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
* {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
* new Connection each time.
- * @param rpcTimeout
* @return A pair with the Message response and the Cell data (if any).
- * @throws InterruptedException
- * @throws IOException
*/
Message callBlockingMethod(MethodDescriptor md, RpcController controller,
Message param, Message returnType, final User ticket, final InetSocketAddress isa,
@@ -1503,7 +1660,7 @@ public class RpcClient {
throws ServiceException {
long startTime = 0;
if (LOG.isTraceEnabled()) {
- startTime = System.currentTimeMillis();
+ startTime = EnvironmentEdgeManager.currentTimeMillis();
}
PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController)controller;
CellScanner cells = null;
@@ -1524,10 +1681,8 @@ public class RpcClient {
}
if (LOG.isTraceEnabled()) {
- long callTime = System.currentTimeMillis() - startTime;
- if (LOG.isTraceEnabled()) {
- LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms");
- }
+ long callTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
+ LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms");
}
return val.getFirst();
} catch (Throwable e) {
@@ -1538,9 +1693,6 @@ public class RpcClient {
/**
* Creates a "channel" that can be used by a blocking protobuf service. Useful setting up
* protobuf blocking stubs.
- * @param sn
- * @param ticket
- * @param rpcTimeout
* @return A blocking rpc channel that goes via this rpc client instance.
*/
public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn,
@@ -1551,10 +1703,10 @@ public class RpcClient {
/**
* Blocking rpc channel that goes via hbase rpc.
*/
- // Public so can be subclassed for tests.
+ @VisibleForTesting
public static class BlockingRpcChannelImplementation implements BlockingRpcChannel {
private final InetSocketAddress isa;
- private volatile RpcClient rpcClient;
+ private final RpcClient rpcClient;
private final int rpcTimeout;
private final User ticket;
Modified: hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java?rev=1574385&r1=1574384&r2=1574385&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java Wed Mar 5 06:35:21 2014
@@ -791,7 +791,7 @@ public class RpcServer implements RpcSer
} catch (InterruptedException ieo) {
throw ieo;
} catch (Exception e) {
- LOG.warn(getName() + ": count of bytes read: " + count, e);
+ LOG.info(getName() + ": count of bytes read: " + count, e);
count = -1; //so that the (count < 0) block is executed
}
if (count < 0) {
Modified: hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java?rev=1574385&r1=1574384&r2=1574385&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java Wed Mar 5 06:35:21 2014
@@ -39,6 +39,8 @@ import java.util.concurrent.SynchronousQ
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -57,6 +59,7 @@ import org.apache.hadoop.hbase.exception
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -229,11 +232,156 @@ public class TestHCM {
}
/**
- * Test that the connection to the dead server is cut immediately when we receive the
- * notification.
- * @throws Exception
+ * Test that we can handle connection close: it will trigger a retry, but the calls will
+ * finish.
+ */
+ @Test
+ public void testConnectionCloseAllowsInterrupt() throws Exception {
+ testConnectionClose(true);
+ }
+
+ @Test
+ public void testConnectionNotAllowsInterrupt() throws Exception {
+ testConnectionClose(false);
+ }
+
+ private void testConnectionClose(boolean allowsInterrupt) throws Exception {
+ String tableName = "HCM-testConnectionClose" + allowsInterrupt;
+ TEST_UTIL.createTable(tableName.getBytes(), FAM_NAM).close();
+
+ boolean previousBalance = TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, true);
+
+ Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
+ // We want to work on a separate connection.
+ c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
+ c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, Integer.MAX_VALUE - 1); // retry a lot
+ c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 0); // don't wait between retries.
+ c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // Server do not really expire
+ c2.setBoolean(RpcClient.ALLOWS_INTERRUPTS, allowsInterrupt);
+
+ final HTable table = new HTable(c2, tableName.getBytes());
+
+ Put put = new Put(ROW);
+ put.add(FAM_NAM, ROW, ROW);
+ table.put(put);
+
+ // 4 steps: ready=0; doGets=1; mustStop=2; stopped=3
+ final AtomicInteger step = new AtomicInteger(0);
+
+ final AtomicReference<Throwable> failed = new AtomicReference<Throwable>(null);
+ Thread t = new Thread("testConnectionCloseThread") {
+ public void run() {
+ int done = 0;
+ try {
+ step.set(1);
+ while (step.get() == 1) {
+ Get get = new Get(ROW);
+ table.get(get);
+ done++;
+ if (done % 100 == 0)
+ LOG.info("done=" + done);
+ }
+ } catch (Throwable t) {
+ failed.set(t);
+ LOG.error(t);
+ }
+ step.set(3);
+ }
+ };
+ t.start();
+ TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return step.get() == 1;
+ }
+ });
+
+ ServerName sn = table.getRegionLocation(ROW).getServerName();
+ ConnectionManager.HConnectionImplementation conn =
+ (ConnectionManager.HConnectionImplementation) table.getConnection();
+ RpcClient rpcClient = conn.getRpcClient();
+
+ LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn);
+ for (int i = 0; i < 5000; i++) {
+ rpcClient.cancelConnections(sn.getHostname(), sn.getPort(), null);
+ Thread.sleep(5);
+ }
+
+ step.compareAndSet(1, 2);
+ // The test may fail here if the thread doing the gets is stuck. The way to find
+ // out what's happening is to look for the thread named 'testConnectionCloseThread'
+ TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return step.get() == 3;
+ }
+ });
+
+ Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null);
+ TEST_UTIL.getHBaseAdmin().setBalancerRunning(previousBalance, true);
+ }
+
+ /**
+ * Test that a connection can become idle without breaking everything.
*/
@Test
+ public void testConnectionIdle() throws Exception {
+ String tableName = "HCM-testConnectionIdle";
+ TEST_UTIL.createTable(tableName.getBytes(), FAM_NAM).close();
+ int idleTime = 20000;
+ boolean previousBalance = TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, true);
+
+ Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
+ // We want to work on a separate connection.
+ c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
+ c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Don't retry: retry = test failed
+ c2.setInt(RpcClient.IDLE_TIME, idleTime);
+
+ final HTable table = new HTable(c2, tableName.getBytes());
+
+ Put put = new Put(ROW);
+ put.add(FAM_NAM, ROW, ROW);
+ table.put(put);
+
+ ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
+ mee.setValue(System.currentTimeMillis());
+ EnvironmentEdgeManager.injectEdge(mee);
+ LOG.info("first get");
+ table.get(new Get(ROW));
+
+ LOG.info("first get - changing the time & sleeping");
+ mee.incValue(idleTime + 1000);
+ Thread.sleep(1500); // we need to wait a little for the connection to be seen as idle.
+ // 1500 = sleep time in RpcClient#waitForWork + a margin
+
+ LOG.info("second get - connection has been marked idle in the middle");
+ // To check that the connection actually became idle would need to read some private
+ // fields of RpcClient.
+ table.get(new Get(ROW));
+ mee.incValue(idleTime + 1000);
+
+ LOG.info("third get - connection is idle, but the reader doesn't know yet");
+ // We're testing here a special case:
+ // time limit reached BUT connection not yet reclaimed AND a new call.
+ // in this situation, we don't close the connection, instead we use it immediately.
+ // If we're very unlucky we can have a race condition in the test: the connection is already
+ // being closed when we do the get, so we get an exception, and we don't retry as the
+ // retry number is 1. The probability is very very low, and seems acceptable for now. It's
+ // a test issue only.
+ table.get(new Get(ROW));
+
+ LOG.info("we're done - time will change back");
+
+ EnvironmentEdgeManager.reset();
+ TEST_UTIL.getHBaseAdmin().setBalancerRunning(previousBalance, true);
+ }
+
+ /**
+ * Test that the connection to the dead server is cut immediately when we receive the
+ * notification.
+ * @throws Exception
+ */
+ @Test
public void testConnectionCut() throws Exception {
String tableName = "HCM-testConnectionCut";