You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2014/02/14 02:35:05 UTC
svn commit: r1568185 - in /hbase/branches/hbase-10070: ./
hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/
hbase-server/src/test/java/org/apache/hadoop/hbase/client/
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/
Author: enis
Date: Fri Feb 14 01:35:04 2014
New Revision: 1568185
URL: http://svn.apache.org/r1568185
Log:
HBASE-10490 Simplify RpcClient code (Nicolas Liochon)
Modified:
hbase/branches/hbase-10070/ (props changed)
hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java
hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
Propchange: hbase/branches/hbase-10070/
------------------------------------------------------------------------------
Merged /hbase/trunk:r1567919
Modified: hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java?rev=1568185&r1=1568184&r2=1568185&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java (original)
+++ hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java Fri Feb 14 01:35:04 2014
@@ -19,37 +19,14 @@
package org.apache.hadoop.hbase.ipc;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
-import java.io.OutputStream;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.net.SocketFactory;
-import javax.security.sasl.SaslException;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import com.google.protobuf.TextFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -94,14 +71,34 @@ import org.apache.hadoop.security.token.
import org.cloudera.htrace.Span;
import org.cloudera.htrace.Trace;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.Message.Builder;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import com.google.protobuf.TextFormat;
+import javax.net.SocketFactory;
+import javax.security.sasl.SaslException;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -115,16 +112,15 @@ public class RpcClient {
public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RpcClient");
protected final PoolMap<ConnectionId, Connection> connections;
- protected int counter; // counter for call ids
+ protected final AtomicInteger callIdCnt = new AtomicInteger();
protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs
final protected Configuration conf;
- final protected int maxIdleTime; // connections will be culled if it was idle for
- // maxIdleTime microsecs
+ final protected int minIdleTimeBeforeClose; // if the connection is idle for more than this
+ // time (in ms), it may be closed at any moment.
final protected int maxRetries; //the max. no. of retries for socket connections
final protected long failureSleep; // Time to sleep before retry on failure.
protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
protected final boolean tcpKeepAlive; // if T then use keepalives
- protected int pingInterval; // how often sends ping to the server in msecs
protected FailedServers failedServers;
private final Codec codec;
private final CompressionCodec compressor;
@@ -137,11 +133,9 @@ public class RpcClient {
private final boolean fallbackAllowed;
private UserProvider userProvider;
- final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
final private static String SOCKET_TIMEOUT = "ipc.socket.timeout";
- final static int DEFAULT_PING_INTERVAL = 60000; // 1 min
final static int DEFAULT_SOCKET_TIMEOUT = 20000; // 20 seconds
- final static int PING_CALL_ID = -1;
+ final static int PING_CALL_ID = -1; // Used by the server, for compatibility with old clients.
public final static String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry";
public final static int FAILED_SERVER_EXPIRY_DEFAULT = 2000;
@@ -213,10 +207,16 @@ public class RpcClient {
}
}
+
+ /**
+ * Indicates that we're trying to connect to a server already known to be dead. We will want to
+ * retry: we're getting this because the region location was wrong, or because
+ * the server just died, in which case the retry loop will help us to wait for the
+ * regions to recover.
+ */
@SuppressWarnings("serial")
@InterfaceAudience.Public
@InterfaceStability.Evolving
- // Shouldn't this be a DoNotRetryException? St.Ack 10/2/2013
public static class FailedServerException extends HBaseIOException {
public FailedServerException(String s) {
super(s);
@@ -224,29 +224,6 @@ public class RpcClient {
}
/**
- * set the ping interval value in configuration
- *
- * @param conf Configuration
- * @param pingInterval the ping interval
- */
- // Any reason we couldn't just do tcp keepalive instead of this pingery?
- // St.Ack 20130121
- public static void setPingInterval(Configuration conf, int pingInterval) {
- conf.setInt(PING_INTERVAL_NAME, pingInterval);
- }
-
- /**
- * Get the ping interval from configuration;
- * If not set in the configuration, return the default value.
- *
- * @param conf Configuration
- * @return the ping interval
- */
- static int getPingInterval(Configuration conf) {
- return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL);
- }
-
- /**
* Set the socket timeout
* @param conf Configuration
* @param socketTimeout the socket timeout
@@ -286,9 +263,7 @@ public class RpcClient {
this.cells = cells;
this.startTime = System.currentTimeMillis();
this.responseDefaultType = responseDefaultType;
- synchronized (RpcClient.this) {
- this.id = counter++;
- }
+ this.id = callIdCnt.getAndIncrement();
}
@Override
@@ -358,7 +333,7 @@ public class RpcClient {
protected ConnectionId remoteId;
protected Socket socket = null; // connected socket
protected DataInputStream in;
- protected DataOutputStream out;
+ protected DataOutputStream out; // Warning: can be locked inside a class level lock.
private InetSocketAddress server; // server ip:port
private String serverPrincipal; // server's krb5 principal name
private AuthMethod authMethod; // authentication method
@@ -372,11 +347,8 @@ public class RpcClient {
// currently active calls
protected final ConcurrentSkipListMap<Integer, Call> calls =
new ConcurrentSkipListMap<Integer, Call>();
- protected final AtomicLong lastActivity =
- new AtomicLong(); // last I/O activity time
- protected final AtomicBoolean shouldCloseConnection =
- new AtomicBoolean(); // indicate if the connection is closed
- protected IOException closeException; // close reason
+
+ protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean();
Connection(ConnectionId remoteId, final Codec codec, final CompressionCodec compressor)
throws IOException {
@@ -470,97 +442,6 @@ public class RpcClient {
return userInfoPB.build();
}
- /** Update lastActivity with the current time. */
- protected void touch() {
- lastActivity.set(System.currentTimeMillis());
- }
-
- /**
- * Add a call to this connection's call queue and notify
- * a listener; synchronized. If the connection is dead, the call is not added, and the
- * caller is notified.
- * This function can return a connection that is already marked as 'shouldCloseConnection'
- * It is up to the user code to check this status.
- * @param call to add
- */
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
- justification="Notify because new call available for processing")
- protected synchronized void addCall(Call call) {
- // If the connection is about to close, we manage this as if the call was already added
- // to the connection calls list. If not, the connection creations are serialized, as
- // mentioned in HBASE-6364
- if (this.shouldCloseConnection.get()) {
- if (this.closeException == null) {
- call.setException(new IOException(
- "Call " + call.id + " not added as the connection " + remoteId + " is closing"));
- } else {
- call.setException(this.closeException);
- }
- synchronized (call) {
- call.notifyAll();
- }
- } else {
- calls.put(call.id, call);
- synchronized (call) {
- notify();
- }
- }
- }
-
- /** This class sends a ping to the remote side when timeout on
- * reading. If no failure is detected, it retries until at least
- * a byte is read.
- */
- protected class PingInputStream extends FilterInputStream {
- /* constructor */
- protected PingInputStream(InputStream in) {
- super(in);
- }
-
- /* Process timeout exception
- * if the connection is not going to be closed, send a ping.
- * otherwise, throw the timeout exception.
- */
- private void handleTimeout(SocketTimeoutException e) throws IOException {
- if (shouldCloseConnection.get() || !running.get() || remoteId.rpcTimeout > 0) {
- throw e;
- }
- sendPing();
- }
-
- /** Read a byte from the stream.
- * Send a ping if timeout on read. Retries if no failure is detected
- * until a byte is read.
- * @throws IOException for any IO problem other than socket timeout
- */
- @Override
- public int read() throws IOException {
- do {
- try {
- return super.read();
- } catch (SocketTimeoutException e) {
- handleTimeout(e);
- }
- } while (true);
- }
-
- /** Read bytes into a buffer starting from offset <code>off</code>
- * Send a ping if timeout on read. Retries if no failure is detected
- * until a byte is read.
- *
- * @return the total number of bytes read; -1 if the connection is closed.
- */
- @Override
- public int read(byte[] buf, int off, int len) throws IOException {
- do {
- try {
- return super.read(buf, off, len);
- } catch (SocketTimeoutException e) {
- handleTimeout(e);
- }
- } while (true);
- }
- }
protected synchronized void setupConnection() throws IOException {
short ioFailures = 0;
@@ -576,10 +457,7 @@ public class RpcClient {
// connection time out is 20s
NetUtils.connect(this.socket, remoteId.getAddress(),
getSocketTimeout(conf));
- if (remoteId.rpcTimeout > 0) {
- pingInterval = remoteId.rpcTimeout; // overwrite pingInterval
- }
- this.socket.setSoTimeout(pingInterval);
+ this.socket.setSoTimeout(remoteId.rpcTimeout);
return;
} catch (SocketTimeoutException toe) {
/* The max number of retries is 45,
@@ -663,30 +541,32 @@ public class RpcClient {
" time(s).");
}
+ /**
+ * @throws IOException if the connection is not open.
+ */
+ private void checkIsOpen() throws IOException {
+ if (shouldCloseConnection.get()) {
+ throw new IOException(getName() + " is closing");
+ }
+ }
+
/* wait till someone signals us to start reading RPC response or
* it is idle too long, it is marked as to be closed,
* or the client is marked as not running.
*
* Return true if it is time to read a response; false otherwise.
*/
- protected synchronized boolean waitForWork() {
- if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
- long timeout = maxIdleTime - (System.currentTimeMillis()-lastActivity.get());
- if (timeout>0) {
- try {
- wait(timeout);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- }
+ protected synchronized boolean waitForWork() throws InterruptedException{
+ while (calls.isEmpty() && !shouldCloseConnection.get() && running.get() ) {
+ wait(minIdleTimeBeforeClose);
}
if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
return true;
} else if (shouldCloseConnection.get()) {
return false;
- } else if (calls.isEmpty()) { // idle connection closed or stopped
- markClosed(null);
+ } else if (calls.isEmpty()) {
+ markClosed(new IOException("idle connection closed or stopped"));
return false;
} else { // get stopped but there are still pending requests
markClosed((IOException)new IOException().initCause(
@@ -699,22 +579,6 @@ public class RpcClient {
return remoteId.getAddress();
}
- /* Send a ping to the server if the time elapsed
- * since last I/O activity is equal to or greater than the ping interval
- */
- protected synchronized void sendPing() throws IOException {
- // Can we do tcp keepalive instead of this pinging?
- long curTime = System.currentTimeMillis();
- if ( curTime - lastActivity.get() >= pingInterval) {
- lastActivity.set(curTime);
- //noinspection SynchronizeOnNonFinalField
- synchronized (this.out) {
- out.writeInt(PING_CALL_ID);
- out.flush();
- }
- }
- }
-
@Override
public void run() {
if (LOG.isDebugEnabled()) {
@@ -836,8 +700,7 @@ public class RpcClient {
});
}
- protected synchronized void setupIOstreams()
- throws IOException, InterruptedException {
+ protected synchronized void setupIOstreams() throws IOException {
if (socket != null || shouldCloseConnection.get()) {
return;
}
@@ -867,7 +730,7 @@ public class RpcClient {
// This creates a socket with a write timeout. This timeout cannot be changed,
// RpcClient allows to change the timeout dynamically, but we can only
// change the read timeout today.
- OutputStream outStream = NetUtils.getOutputStream(socket, pingInterval);
+ OutputStream outStream = NetUtils.getOutputStream(socket, remoteId.rpcTimeout);
// Write out the preamble -- MAGIC, version, and auth to use.
writeConnectionHeaderPreamble(outStream);
if (useSasl) {
@@ -879,7 +742,7 @@ public class RpcClient {
ticket = ticket.getRealUser();
}
}
- boolean continueSasl = false;
+ boolean continueSasl;
if (ticket == null) throw new FatalConnectionException("ticket/user is null");
try {
continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
@@ -905,28 +768,24 @@ public class RpcClient {
useSasl = false;
}
}
- this.in = new DataInputStream(new BufferedInputStream(new PingInputStream(inStream)));
+ this.in = new DataInputStream(new BufferedInputStream(inStream));
this.out = new DataOutputStream(new BufferedOutputStream(outStream));
// Now write out the connection header
writeConnectionHeader();
- // update last activity time
- touch();
-
// start the receiver thread after the socket connection has been set up
start();
return;
}
} catch (Throwable t) {
failedServers.addToFailedServers(remoteId.address);
- IOException e = null;
+ IOException e;
if (t instanceof IOException) {
e = (IOException)t;
- markClosed(e);
} else {
e = new IOException("Could not set up IO Streams", t);
- markClosed(e);
}
+ markClosed(e);
close();
throw e;
}
@@ -986,28 +845,15 @@ public class RpcClient {
this.in = null;
disposeSasl();
- // clean up all calls
- if (closeException == null) {
- if (!calls.isEmpty()) {
- LOG.warn(getName() + ": connection is closed for no cause and calls are not empty. " +
- "#Calls: " + calls.size());
-
- // clean up calls anyway
- closeException = new IOException("Unexpected closed connection");
- cleanupCalls();
- }
- } else {
- // log the info
- if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": closing ipc connection to " + server + ": " +
- closeException.getMessage(), closeException);
- }
-
- // cleanup calls
- cleanupCalls();
+ // log the info
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getName() + ": closing ipc connection to " + server);
}
+
+ cleanupCalls();
+
if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": closed");
+ LOG.debug(getName() + ": ipc connection closed");
}
/**
@@ -1018,36 +864,56 @@ public class RpcClient {
* @param priority
* @see #readResponse()
*/
- protected void writeRequest(Call call, final int priority) {
- if (shouldCloseConnection.get()) return;
- try {
- RequestHeader.Builder builder = RequestHeader.newBuilder();
- builder.setCallId(call.id);
- if (Trace.isTracing()) {
- Span s = Trace.currentSpan();
- builder.setTraceInfo(RPCTInfo.newBuilder().
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY",
+ justification = "on close the reader thread must stop")
+ protected void writeRequest(Call call, final int priority) throws IOException {
+ RequestHeader.Builder builder = RequestHeader.newBuilder();
+ builder.setCallId(call.id);
+ if (Trace.isTracing()) {
+ Span s = Trace.currentSpan();
+ builder.setTraceInfo(RPCTInfo.newBuilder().
setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
- }
- builder.setMethodName(call.md.getName());
- builder.setRequestParam(call.param != null);
- ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells);
- if (cellBlock != null) {
- CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
- cellBlockBuilder.setLength(cellBlock.limit());
- builder.setCellBlockMeta(cellBlockBuilder.build());
- }
- // Only pass priority if there one. Let zero be same as no priority.
- if (priority != 0) builder.setPriority(priority);
- //noinspection SynchronizeOnNonFinalField
- RequestHeader header = builder.build();
+ }
+ builder.setMethodName(call.md.getName());
+ builder.setRequestParam(call.param != null);
+ ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells);
+ if (cellBlock != null) {
+ CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
+ cellBlockBuilder.setLength(cellBlock.limit());
+ builder.setCellBlockMeta(cellBlockBuilder.build());
+ }
+ // Only pass priority if there one. Let zero be same as no priority.
+ if (priority != 0) builder.setPriority(priority);
+ RequestHeader header = builder.build();
+
+ // Now we're going to write the call. We take the lock, then check that the connection
+ // is still valid, and, if so we do the write to the socket. If the write fails, we don't
+ // know where we stand, we have to close the connection.
+ checkIsOpen();
+ calls.put(call.id, call); // On error, the call will be removed by the timeout.
+ try {
synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
- IPCUtil.write(this.out, header, call.param, cellBlock);
+ if (Thread.interrupted()) throw new InterruptedIOException();
+ checkIsOpen();
+
+ try {
+ IPCUtil.write(this.out, header, call.param, cellBlock);
+ } catch (IOException e) {
+ // We set the value inside the synchronized block, this way the next in line
+ // won't even try to write
+ shouldCloseConnection.set(true);
+ throw e;
+ }
}
- if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": wrote request header " + TextFormat.shortDebugString(header));
+ } finally {
+ synchronized (this) {
+ // We added a call, and may start the connection close. In both cases, we
+ // need to notify the reader.
+ notifyAll();
}
- } catch(IOException e) {
- markClosed(e);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getName() + ": wrote request header " + TextFormat.shortDebugString(header));
}
}
@@ -1056,7 +922,6 @@ public class RpcClient {
*/
protected void readResponse() {
if (shouldCloseConnection.get()) return;
- touch();
int totalSize = -1;
try {
// See HBaseServer.Call.setResponse for where we write out the response.
@@ -1070,7 +935,7 @@ public class RpcClient {
LOG.debug(getName() + ": got response header " +
TextFormat.shortDebugString(responseHeader) + ", totalSize: " + totalSize + " bytes");
}
- Call call = calls.get(id);
+ Call call = calls.remove(id);
if (call == null) {
// So we got a response for which we have no corresponding 'call' here on the client-side.
// We probably timed out waiting, cleaned up all references, and now the server decides
@@ -1110,13 +975,11 @@ public class RpcClient {
// timeout, so check if it still exists before setting the value.
if (call != null) call.setResponse(value, cellBlockScanner);
}
- if (call != null) calls.remove(id);
} catch (IOException e) {
if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {
// Clean up open calls but don't treat this as a fatal condition,
// since we expect certain responses to not make it by the specified
// {@link ConnectionId#rpcTimeout}.
- closeException = e;
} if (ExceptionUtil.isInterrupt(e)){
} else {
@@ -1154,10 +1017,18 @@ public class RpcClient {
e.getStackTrace(), doNotRetry);
}
- protected synchronized void markClosed(IOException e) {
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY",
+ justification = "on close the reader thread must stop")
+ protected void markClosed(IOException e) {
+ if (e == null) throw new NullPointerException();
+
if (shouldCloseConnection.compareAndSet(false, true)) {
- closeException = e;
- notifyAll();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getName() + ": marking at should closed, reason =" + e.getMessage());
+ }
+ synchronized (this) {
+ notifyAll();
+ }
}
}
@@ -1167,46 +1038,38 @@ public class RpcClient {
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
- justification="Notify because timedout")
+ justification="Notify because timeout")
protected void cleanupCalls(long rpcTimeout) {
Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
while (itor.hasNext()) {
Call c = itor.next().getValue();
long waitTime = System.currentTimeMillis() - c.getStartTime();
if (waitTime >= rpcTimeout) {
- if (this.closeException == null) {
- // There may be no exception in the case that there are many calls
- // being multiplexed over this connection and these are succeeding
- // fine while this Call object is taking a long time to finish
- // over on the server; e.g. I just asked the regionserver to bulk
- // open 3k regions or its a big fat multiput into a heavily-loaded
- // server (Perhaps this only happens at the extremes?)
- this.closeException = new CallTimeoutException("Call id=" + c.id +
- ", waitTime=" + waitTime + ", rpcTimetout=" + rpcTimeout);
- }
- c.setException(this.closeException);
- synchronized (c) {
- c.notifyAll();
- }
+ IOException ie = new CallTimeoutException("Call id=" + c.id +
+ ", waitTime=" + waitTime + ", rpcTimeout=" + rpcTimeout);
+ c.setException(ie);
itor.remove();
} else {
+ // This relies on the insertion order to be the call id order. This is not
+ // true under 'difficult' conditions (gc, ...).
break;
}
}
- try {
- if (!calls.isEmpty()) {
- Call firstCall = calls.get(calls.firstKey());
- long maxWaitTime = System.currentTimeMillis() - firstCall.getStartTime();
- if (maxWaitTime < rpcTimeout) {
- rpcTimeout -= maxWaitTime;
- }
+
+ if (!calls.isEmpty()) {
+ Call firstCall = calls.get(calls.firstKey());
+ long maxWaitTime = System.currentTimeMillis() - firstCall.getStartTime();
+ if (maxWaitTime < rpcTimeout) {
+ rpcTimeout -= maxWaitTime;
}
+ }
+
+ try {
if (!shouldCloseConnection.get()) {
- closeException = null;
setSocketTimeout(socket, (int) rpcTimeout);
}
} catch (SocketException e) {
- LOG.debug("Couldn't lower timeout, which may result in longer than expected calls");
+ LOG.warn("Couldn't lower timeout, which may result in longer than expected calls");
}
}
}
@@ -1249,13 +1112,13 @@ public class RpcClient {
* @param localAddr client socket bind address
*/
RpcClient(Configuration conf, String clusterId, SocketFactory factory, SocketAddress localAddr) {
- this.maxIdleTime = conf.getInt("hbase.ipc.client.connection.maxidletime", 10000); //10s
+ this.minIdleTimeBeforeClose =
+ conf.getInt("hbase.ipc.client.connection.minIdleTimeBeforeClose", 120000); // 2 minutes
this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
- this.pingInterval = getPingInterval(conf);
this.ipcUtil = new IPCUtil(conf);
this.conf = conf;
this.codec = getCodec();
@@ -1273,10 +1136,9 @@ public class RpcClient {
LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor +
", tcpKeepAlive=" + this.tcpKeepAlive +
", tcpNoDelay=" + this.tcpNoDelay +
- ", maxIdleTime=" + this.maxIdleTime +
+ ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose +
", maxRetries=" + this.maxRetries +
", fallbackAllowed=" + this.fallbackAllowed +
- ", ping interval=" + this.pingInterval + "ms" +
", bind address=" + (this.localAddr != null ? this.localAddr : "null"));
}
}
@@ -1395,7 +1257,11 @@ public class RpcClient {
while (!connections.isEmpty()) {
try {
Thread.sleep(100);
- } catch (InterruptedException ignored) {
+ } catch (InterruptedException e) {
+ LOG.info("Interrupted while stopping the client. We still have " + connections.size() +
+ " connections.");
+ Thread.currentThread().interrupt();
+ return;
}
}
}
@@ -1431,14 +1297,12 @@ public class RpcClient {
Call call = new Call(md, param, cells, returnType);
Connection connection =
getConnection(ticket, call, addr, rpcTimeout, this.codec, this.compressor);
- connection.writeRequest(call, priority); // send the parameter
+
+ connection.writeRequest(call, priority);
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (call) {
while (!call.done) {
- if (connection.shouldCloseConnection.get()) {
- throw new IOException("Unexpected closed connection");
- }
call.wait(1000); // wait for the result
}
@@ -1454,6 +1318,8 @@ public class RpcClient {
}
}
+
+
/**
* Take an IOException and the address we were trying to connect to
* and return an IOException with the input exception as the cause.
@@ -1486,7 +1352,7 @@ public class RpcClient {
* is known as actually dead. This will not prevent current operation to be retried, and,
* depending on their own behavior, they may retry on the same server. This can be a feature,
* for example at startup. In any case, they're likely to get connection refused (if the
- * process died) or no route to host: i.e. there next retries should be faster and with a
+ * process died) or no route to host: i.e. their next retries should be faster and with a
* safe exception.
*/
public void cancelConnections(String hostname, int port, IOException ioe) {
@@ -1505,11 +1371,13 @@ public class RpcClient {
}
}
- /* Get a connection from the pool, or create a new one and add it to the
- * pool. Connections to a given host/port are reused. */
+ /**
+ * Get a connection from the pool, or create a new one and add it to the
+ * pool. Connections to a given host/port are reused.
+ */
protected Connection getConnection(User ticket, Call call, InetSocketAddress addr,
int rpcTimeout, final Codec codec, final CompressionCodec compressor)
- throws IOException, InterruptedException {
+ throws IOException {
if (!running.get()) throw new StoppedRpcClientException();
Connection connection;
ConnectionId remoteId =
@@ -1521,7 +1389,6 @@ public class RpcClient {
connections.put(remoteId, connection);
}
}
- connection.addCall(call);
//we don't invoke the method below inside "synchronized (connections)"
//block above. The reason for that is if the server happens to be slow,
@@ -1531,6 +1398,7 @@ public class RpcClient {
// waiting here; as setupIOstreams is synchronized. If the connection fails with a
// timeout, they will all fail simultaneously. This is checked in setupIOstreams.
connection.setupIOstreams();
+
return connection;
}
@@ -1646,7 +1514,7 @@ public class RpcClient {
// Clear it here so we don't by mistake try and these cells processing results.
pcrc.setCellScanner(null);
}
- Pair<Message, CellScanner> val = null;
+ Pair<Message, CellScanner> val;
try {
val = call(md, param, cells, returnType, ticket, isa, rpcTimeout,
pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS);
Modified: hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java?rev=1568185&r1=1568184&r2=1568185&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java Fri Feb 14 01:35:04 2014
@@ -1552,7 +1552,6 @@ public class TestAdmin {
TEST_UTIL.getConfiguration().setInt(
"hbase.regionserver.logroll.errors.tolerated", 2);
- TEST_UTIL.getConfiguration().setInt("ipc.ping.interval", 10 * 1000);
TEST_UTIL.getConfiguration().setInt("ipc.socket.timeout", 10 * 1000);
TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);
Modified: hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java?rev=1568185&r1=1568184&r2=1568185&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java Fri Feb 14 01:35:04 2014
@@ -60,7 +60,6 @@ public class TestLogRollAbort {
// Tweak default timeout values down for faster recovery
TEST_UTIL.getConfiguration().setInt(
"hbase.regionserver.logroll.errors.tolerated", 2);
- TEST_UTIL.getConfiguration().setInt("ipc.ping.interval", 10 * 1000);
TEST_UTIL.getConfiguration().setInt("ipc.socket.timeout", 10 * 1000);
TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);
Modified: hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java?rev=1568185&r1=1568184&r2=1568185&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java Fri Feb 14 01:35:04 2014
@@ -124,7 +124,6 @@ public class TestLogRolling {
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.errors.tolerated", 2);
- TEST_UTIL.getConfiguration().setInt("ipc.ping.interval", 10 * 1000);
TEST_UTIL.getConfiguration().setInt("ipc.socket.timeout", 10 * 1000);
TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);