You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2014/02/14 02:35:05 UTC

svn commit: r1568185 - in /hbase/branches/hbase-10070: ./ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ hbase-server/src/test/java/org/apache/hadoop/hbase/client/ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/

Author: enis
Date: Fri Feb 14 01:35:04 2014
New Revision: 1568185

URL: http://svn.apache.org/r1568185
Log:
HBASE-10490 Simplify RpcClient code (Nicolas Liochon)

Modified:
    hbase/branches/hbase-10070/   (props changed)
    hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
    hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java
    hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
    hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java

Propchange: hbase/branches/hbase-10070/
------------------------------------------------------------------------------
  Merged /hbase/trunk:r1567919

Modified: hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java?rev=1568185&r1=1568184&r2=1568185&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java (original)
+++ hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java Fri Feb 14 01:35:04 2014
@@ -19,37 +19,14 @@
 
 package org.apache.hadoop.hbase.ipc;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
-import java.io.OutputStream;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.net.SocketFactory;
-import javax.security.sasl.SaslException;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import com.google.protobuf.TextFormat;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -94,14 +71,34 @@ import org.apache.hadoop.security.token.
 import org.cloudera.htrace.Span;
 import org.cloudera.htrace.Trace;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.Message.Builder;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import com.google.protobuf.TextFormat;
+import javax.net.SocketFactory;
+import javax.security.sasl.SaslException;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 
 /**
@@ -115,16 +112,15 @@ public class RpcClient {
   public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RpcClient");
   protected final PoolMap<ConnectionId, Connection> connections;
 
-  protected int counter;                            // counter for call ids
+  protected final AtomicInteger callIdCnt = new AtomicInteger();
   protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs
   final protected Configuration conf;
-  final protected int maxIdleTime; // connections will be culled if it was idle for
-                           // maxIdleTime microsecs
+  final protected int minIdleTimeBeforeClose; // if the connection is idle for more than this
+                                               // time (in ms), it will be closed at any moment.
   final protected int maxRetries; //the max. no. of retries for socket connections
   final protected long failureSleep; // Time to sleep before retry on failure.
   protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
   protected final boolean tcpKeepAlive; // if T then use keepalives
-  protected int pingInterval; // how often sends ping to the server in msecs
   protected FailedServers failedServers;
   private final Codec codec;
   private final CompressionCodec compressor;
@@ -137,11 +133,9 @@ public class RpcClient {
   private final boolean fallbackAllowed;
   private UserProvider userProvider;
 
-  final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
   final private static String SOCKET_TIMEOUT = "ipc.socket.timeout";
-  final static int DEFAULT_PING_INTERVAL = 60000;  // 1 min
   final static int DEFAULT_SOCKET_TIMEOUT = 20000; // 20 seconds
-  final static int PING_CALL_ID = -1;
+  final static int PING_CALL_ID = -1; // Used by the server, for compatibility with old clients.
 
   public final static String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry";
   public final static int FAILED_SERVER_EXPIRY_DEFAULT = 2000;
@@ -213,10 +207,16 @@ public class RpcClient {
     }
   }
 
+
+  /**
+   * Indicates that we're trying to connect to a server already known as dead. We will want to
+   *  retry: we're getting this because the region location was wrong, or because
+   *  the server just died, in which case the retry loop will help us to wait for the
+   *  regions to recover.
+   */
   @SuppressWarnings("serial")
   @InterfaceAudience.Public
   @InterfaceStability.Evolving
-  // Shouldn't this be a DoNotRetryException? St.Ack 10/2/2013
   public static class FailedServerException extends HBaseIOException {
     public FailedServerException(String s) {
       super(s);
@@ -224,29 +224,6 @@ public class RpcClient {
   }
 
   /**
-   * set the ping interval value in configuration
-   *
-   * @param conf Configuration
-   * @param pingInterval the ping interval
-   */
-  // Any reason we couldn't just do tcp keepalive instead of this pingery?
-  // St.Ack 20130121
-  public static void setPingInterval(Configuration conf, int pingInterval) {
-    conf.setInt(PING_INTERVAL_NAME, pingInterval);
-  }
-
-  /**
-   * Get the ping interval from configuration;
-   * If not set in the configuration, return the default value.
-   *
-   * @param conf Configuration
-   * @return the ping interval
-   */
-  static int getPingInterval(Configuration conf) {
-    return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL);
-  }
-
-  /**
    * Set the socket timeout
    * @param conf Configuration
    * @param socketTimeout the socket timeout
@@ -286,9 +263,7 @@ public class RpcClient {
       this.cells = cells;
       this.startTime = System.currentTimeMillis();
       this.responseDefaultType = responseDefaultType;
-      synchronized (RpcClient.this) {
-        this.id = counter++;
-      }
+      this.id = callIdCnt.getAndIncrement();
     }
 
     @Override
@@ -358,7 +333,7 @@ public class RpcClient {
     protected ConnectionId remoteId;
     protected Socket socket = null;                 // connected socket
     protected DataInputStream in;
-    protected DataOutputStream out;
+    protected DataOutputStream out; // Warning: can be locked inside a class level lock.
     private InetSocketAddress server;             // server ip:port
     private String serverPrincipal;  // server's krb5 principal name
     private AuthMethod authMethod; // authentication method
@@ -372,11 +347,8 @@ public class RpcClient {
     // currently active calls
     protected final ConcurrentSkipListMap<Integer, Call> calls =
       new ConcurrentSkipListMap<Integer, Call>();
-    protected final AtomicLong lastActivity =
-      new AtomicLong(); // last I/O activity time
-    protected final AtomicBoolean shouldCloseConnection =
-      new AtomicBoolean();  // indicate if the connection is closed
-    protected IOException closeException; // close reason
+
+    protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean();
 
     Connection(ConnectionId remoteId, final Codec codec, final CompressionCodec compressor)
     throws IOException {
@@ -470,97 +442,6 @@ public class RpcClient {
       return userInfoPB.build();
     }
 
-    /** Update lastActivity with the current time. */
-    protected void touch() {
-      lastActivity.set(System.currentTimeMillis());
-    }
-
-    /**
-     * Add a call to this connection's call queue and notify
-     * a listener; synchronized. If the connection is dead, the call is not added, and the
-     * caller is notified.
-     * This function can return a connection that is already marked as 'shouldCloseConnection'
-     *  It is up to the user code to check this status.
-     * @param call to add
-     */
-    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
-      justification="Notify because new call available for processing")
-    protected synchronized void addCall(Call call) {
-      // If the connection is about to close, we manage this as if the call was already added
-      //  to the connection calls list. If not, the connection creations are serialized, as
-      //  mentioned in HBASE-6364
-      if (this.shouldCloseConnection.get()) {
-        if (this.closeException == null) {
-          call.setException(new IOException(
-              "Call " + call.id + " not added as the connection " + remoteId + " is closing"));
-        } else {
-          call.setException(this.closeException);
-        }
-        synchronized (call) {
-          call.notifyAll();
-        }
-      } else {
-        calls.put(call.id, call);
-        synchronized (call) {
-          notify();
-        }
-      }
-    }
-
-    /** This class sends a ping to the remote side when timeout on
-     * reading. If no failure is detected, it retries until at least
-     * a byte is read.
-     */
-    protected class PingInputStream extends FilterInputStream {
-      /* constructor */
-      protected PingInputStream(InputStream in) {
-        super(in);
-      }
-
-      /* Process timeout exception
-       * if the connection is not going to be closed, send a ping.
-       * otherwise, throw the timeout exception.
-       */
-      private void handleTimeout(SocketTimeoutException e) throws IOException {
-        if (shouldCloseConnection.get() || !running.get() || remoteId.rpcTimeout > 0) {
-          throw e;
-        }
-        sendPing();
-      }
-
-      /** Read a byte from the stream.
-       * Send a ping if timeout on read. Retries if no failure is detected
-       * until a byte is read.
-       * @throws IOException for any IO problem other than socket timeout
-       */
-      @Override
-      public int read() throws IOException {
-        do {
-          try {
-            return super.read();
-          } catch (SocketTimeoutException e) {
-            handleTimeout(e);
-          }
-        } while (true);
-      }
-
-      /** Read bytes into a buffer starting from offset <code>off</code>
-       * Send a ping if timeout on read. Retries if no failure is detected
-       * until a byte is read.
-       *
-       * @return the total number of bytes read; -1 if the connection is closed.
-       */
-      @Override
-      public int read(byte[] buf, int off, int len) throws IOException {
-        do {
-          try {
-            return super.read(buf, off, len);
-          } catch (SocketTimeoutException e) {
-            handleTimeout(e);
-          }
-        } while (true);
-      }
-    }
 
     protected synchronized void setupConnection() throws IOException {
       short ioFailures = 0;
@@ -576,10 +457,7 @@ public class RpcClient {
           // connection time out is 20s
           NetUtils.connect(this.socket, remoteId.getAddress(),
               getSocketTimeout(conf));
-          if (remoteId.rpcTimeout > 0) {
-            pingInterval = remoteId.rpcTimeout; // overwrite pingInterval
-          }
-          this.socket.setSoTimeout(pingInterval);
+          this.socket.setSoTimeout(remoteId.rpcTimeout);
           return;
         } catch (SocketTimeoutException toe) {
           /* The max number of retries is 45,
@@ -663,30 +541,32 @@ public class RpcClient {
         " time(s).");
     }
 
+    /**
+     * @throws IOException if the connection is not open.
+     */
+    private void checkIsOpen() throws IOException {
+      if (shouldCloseConnection.get()) {
+        throw new IOException(getName() + " is closing");
+      }
+    }
+
     /* wait till someone signals us to start reading RPC response or
      * it is idle too long, it is marked as to be closed,
      * or the client is marked as not running.
      *
      * Return true if it is time to read a response; false otherwise.
      */
-    protected synchronized boolean waitForWork() {
-      if (calls.isEmpty() && !shouldCloseConnection.get()  && running.get())  {
-        long timeout = maxIdleTime - (System.currentTimeMillis()-lastActivity.get());
-        if (timeout>0) {
-          try {
-            wait(timeout);
-          } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-          }
-        }
+    protected synchronized boolean waitForWork() throws InterruptedException{
+      while (calls.isEmpty() && !shouldCloseConnection.get() && running.get() ) {
+        wait(minIdleTimeBeforeClose);
       }
 
       if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
         return true;
       } else if (shouldCloseConnection.get()) {
         return false;
-      } else if (calls.isEmpty()) { // idle connection closed or stopped
-        markClosed(null);
+      } else if (calls.isEmpty()) {
+        markClosed(new IOException("idle connection closed or stopped"));
         return false;
       } else { // get stopped but there are still pending requests
         markClosed((IOException)new IOException().initCause(
@@ -699,22 +579,6 @@ public class RpcClient {
       return remoteId.getAddress();
     }
 
-    /* Send a ping to the server if the time elapsed
-     * since last I/O activity is equal to or greater than the ping interval
-     */
-    protected synchronized void sendPing() throws IOException {
-      // Can we do tcp keepalive instead of this pinging?
-      long curTime = System.currentTimeMillis();
-      if ( curTime - lastActivity.get() >= pingInterval) {
-        lastActivity.set(curTime);
-        //noinspection SynchronizeOnNonFinalField
-        synchronized (this.out) {
-          out.writeInt(PING_CALL_ID);
-          out.flush();
-        }
-      }
-    }
-
     @Override
     public void run() {
       if (LOG.isDebugEnabled()) {
@@ -836,8 +700,7 @@ public class RpcClient {
       });
     }
 
-    protected synchronized void setupIOstreams()
-    throws IOException, InterruptedException {
+    protected synchronized void setupIOstreams() throws IOException {
       if (socket != null || shouldCloseConnection.get()) {
         return;
       }
@@ -867,7 +730,7 @@ public class RpcClient {
           // This creates a socket with a write timeout. This timeout cannot be changed,
           //  RpcClient allows to change the timeout dynamically, but we can only
           //  change the read timeout today.
-          OutputStream outStream = NetUtils.getOutputStream(socket, pingInterval);
+          OutputStream outStream = NetUtils.getOutputStream(socket, remoteId.rpcTimeout);
           // Write out the preamble -- MAGIC, version, and auth to use.
           writeConnectionHeaderPreamble(outStream);
           if (useSasl) {
@@ -879,7 +742,7 @@ public class RpcClient {
                 ticket = ticket.getRealUser();
               }
             }
-            boolean continueSasl = false;
+            boolean continueSasl;
             if (ticket == null) throw new FatalConnectionException("ticket/user is null");
             try {
               continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
@@ -905,28 +768,24 @@ public class RpcClient {
               useSasl = false;
             }
           }
-          this.in = new DataInputStream(new BufferedInputStream(new PingInputStream(inStream)));
+          this.in = new DataInputStream(new BufferedInputStream(inStream));
           this.out = new DataOutputStream(new BufferedOutputStream(outStream));
           // Now write out the connection header
           writeConnectionHeader();
 
-          // update last activity time
-          touch();
-
           // start the receiver thread after the socket connection has been set up
           start();
           return;
         }
       } catch (Throwable t) {
         failedServers.addToFailedServers(remoteId.address);
-        IOException e = null;
+        IOException e;
         if (t instanceof IOException) {
           e = (IOException)t;
-          markClosed(e);
         } else {
           e = new IOException("Could not set up IO Streams", t);
-          markClosed(e);
         }
+        markClosed(e);
         close();
         throw e;
       }
@@ -986,28 +845,15 @@ public class RpcClient {
       this.in = null;
       disposeSasl();
 
-      // clean up all calls
-      if (closeException == null) {
-        if (!calls.isEmpty()) {
-          LOG.warn(getName() + ": connection is closed for no cause and calls are not empty. " +
-              "#Calls: " + calls.size());
-
-          // clean up calls anyway
-          closeException = new IOException("Unexpected closed connection");
-          cleanupCalls();
-        }
-      } else {
-        // log the info
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(getName() + ": closing ipc connection to " + server + ": " +
-              closeException.getMessage(), closeException);
-        }
-
-        // cleanup calls
-        cleanupCalls();
+      // log the info
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getName() + ": closing ipc connection to " + server);
       }
+
+      cleanupCalls();
+
       if (LOG.isDebugEnabled())
-        LOG.debug(getName() + ": closed");
+        LOG.debug(getName() + ": ipc connection closed");
     }
 
     /**
@@ -1018,36 +864,56 @@ public class RpcClient {
      * @param priority
      * @see #readResponse()
      */
-    protected void writeRequest(Call call, final int priority) {
-      if (shouldCloseConnection.get()) return;
-      try {
-        RequestHeader.Builder builder = RequestHeader.newBuilder();
-        builder.setCallId(call.id);
-        if (Trace.isTracing()) {
-          Span s = Trace.currentSpan();
-          builder.setTraceInfo(RPCTInfo.newBuilder().
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY",
+        justification = "on close the reader thread must stop")
+    protected void writeRequest(Call call, final int priority) throws IOException {
+      RequestHeader.Builder builder = RequestHeader.newBuilder();
+      builder.setCallId(call.id);
+      if (Trace.isTracing()) {
+        Span s = Trace.currentSpan();
+        builder.setTraceInfo(RPCTInfo.newBuilder().
             setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
-        }
-        builder.setMethodName(call.md.getName());
-        builder.setRequestParam(call.param != null);
-        ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells);
-        if (cellBlock != null) {
-          CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
-          cellBlockBuilder.setLength(cellBlock.limit());
-          builder.setCellBlockMeta(cellBlockBuilder.build());
-        }
-        // Only pass priority if there one.  Let zero be same as no priority.
-        if (priority != 0) builder.setPriority(priority);
-        //noinspection SynchronizeOnNonFinalField
-        RequestHeader header = builder.build();
+      }
+      builder.setMethodName(call.md.getName());
+      builder.setRequestParam(call.param != null);
+      ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells);
+      if (cellBlock != null) {
+        CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
+        cellBlockBuilder.setLength(cellBlock.limit());
+        builder.setCellBlockMeta(cellBlockBuilder.build());
+      }
+      // Only pass priority if there is one.  Let zero be same as no priority.
+      if (priority != 0) builder.setPriority(priority);
+      RequestHeader header = builder.build();
+
+      // Now we're going to write the call. We take the lock, then check that the connection
+      //  is still valid, and, if so we do the write to the socket. If the write fails, we don't
+      //  know where we stand, we have to close the connection.
+      checkIsOpen();
+      calls.put(call.id, call);  // On error, the call will be removed by the timeout.
+      try {
         synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
-          IPCUtil.write(this.out, header, call.param, cellBlock);
+          if (Thread.interrupted()) throw new InterruptedIOException();
+          checkIsOpen();
+
+          try {
+            IPCUtil.write(this.out, header, call.param, cellBlock);
+          } catch (IOException e) {
+            // We set the value inside the synchronized block, this way the next in line
+            //  won't even try to write
+            shouldCloseConnection.set(true);
+            throw e;
+          }
         }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(getName() + ": wrote request header " + TextFormat.shortDebugString(header));
+      } finally {
+        synchronized (this) {
+          // We added a call, and may start the connection close. In both cases, we
+          //  need to notify the reader.
+          notifyAll();
         }
-      } catch(IOException e) {
-        markClosed(e);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getName() + ": wrote request header " + TextFormat.shortDebugString(header));
       }
     }
 
@@ -1056,7 +922,6 @@ public class RpcClient {
      */
     protected void readResponse() {
       if (shouldCloseConnection.get()) return;
-      touch();
       int totalSize = -1;
       try {
         // See HBaseServer.Call.setResponse for where we write out the response.
@@ -1070,7 +935,7 @@ public class RpcClient {
           LOG.debug(getName() + ": got response header " +
             TextFormat.shortDebugString(responseHeader) + ", totalSize: " + totalSize + " bytes");
         }
-        Call call = calls.get(id);
+        Call call = calls.remove(id);
         if (call == null) {
           // So we got a response for which we have no corresponding 'call' here on the client-side.
           // We probably timed out waiting, cleaned up all references, and now the server decides
@@ -1110,13 +975,11 @@ public class RpcClient {
           // timeout, so check if it still exists before setting the value.
           if (call != null) call.setResponse(value, cellBlockScanner);
         }
-        if (call != null) calls.remove(id);
       } catch (IOException e) {
         if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {
           // Clean up open calls but don't treat this as a fatal condition,
           // since we expect certain responses to not make it by the specified
           // {@link ConnectionId#rpcTimeout}.
-          closeException = e;
         } if (ExceptionUtil.isInterrupt(e)){
 
         } else {
@@ -1154,10 +1017,18 @@ public class RpcClient {
           e.getStackTrace(), doNotRetry);
     }
 
-    protected synchronized void markClosed(IOException e) {
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY",
+        justification = "on close the reader thread must stop")
+    protected void markClosed(IOException e) {
+      if (e == null) throw new NullPointerException();
+
       if (shouldCloseConnection.compareAndSet(false, true)) {
-        closeException = e;
-        notifyAll();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(getName() + ": marking at should closed, reason =" + e.getMessage());
+        }
+        synchronized (this) {
+          notifyAll();
+        }
       }
     }
 
@@ -1167,46 +1038,38 @@ public class RpcClient {
     }
 
     @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
-      justification="Notify because timedout")
+      justification="Notify because timeout")
     protected void cleanupCalls(long rpcTimeout) {
       Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
       while (itor.hasNext()) {
         Call c = itor.next().getValue();
         long waitTime = System.currentTimeMillis() - c.getStartTime();
         if (waitTime >= rpcTimeout) {
-          if (this.closeException == null) {
-            // There may be no exception in the case that there are many calls
-            // being multiplexed over this connection and these are succeeding
-            // fine while this Call object is taking a long time to finish
-            // over on the server; e.g. I just asked the regionserver to bulk
-            // open 3k regions or its a big fat multiput into a heavily-loaded
-            // server (Perhaps this only happens at the extremes?)
-            this.closeException = new CallTimeoutException("Call id=" + c.id +
-              ", waitTime=" + waitTime + ", rpcTimetout=" + rpcTimeout);
-          }
-          c.setException(this.closeException);
-          synchronized (c) {
-            c.notifyAll();
-          }
+          IOException ie = new CallTimeoutException("Call id=" + c.id +
+              ", waitTime=" + waitTime + ", rpcTimeout=" + rpcTimeout);
+          c.setException(ie);
           itor.remove();
         } else {
+          // This relies on the insertion order to be the call id order. This is not
+          //  true under 'difficult' conditions (gc, ...).
           break;
         }
       }
-      try {
-        if (!calls.isEmpty()) {
-          Call firstCall = calls.get(calls.firstKey());
-          long maxWaitTime = System.currentTimeMillis() - firstCall.getStartTime();
-          if (maxWaitTime < rpcTimeout) {
-            rpcTimeout -= maxWaitTime;
-          }
+
+      if (!calls.isEmpty()) {
+        Call firstCall = calls.get(calls.firstKey());
+        long maxWaitTime = System.currentTimeMillis() - firstCall.getStartTime();
+        if (maxWaitTime < rpcTimeout) {
+          rpcTimeout -= maxWaitTime;
         }
+      }
+
+      try {
         if (!shouldCloseConnection.get()) {
-          closeException = null;
           setSocketTimeout(socket, (int) rpcTimeout);
         }
       } catch (SocketException e) {
-        LOG.debug("Couldn't lower timeout, which may result in longer than expected calls");
+        LOG.warn("Couldn't lower timeout, which may result in longer than expected calls");
       }
     }
   }
@@ -1249,13 +1112,13 @@ public class RpcClient {
    * @param localAddr client socket bind address
    */
   RpcClient(Configuration conf, String clusterId, SocketFactory factory, SocketAddress localAddr) {
-    this.maxIdleTime = conf.getInt("hbase.ipc.client.connection.maxidletime", 10000); //10s
+    this.minIdleTimeBeforeClose =
+        conf.getInt("hbase.ipc.client.connection.minIdleTimeBeforeClose", 120000); // 2 minutes
     this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
     this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
     this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
     this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
-    this.pingInterval = getPingInterval(conf);
     this.ipcUtil = new IPCUtil(conf);
     this.conf = conf;
     this.codec = getCodec();
@@ -1273,10 +1136,9 @@ public class RpcClient {
       LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor +
         ", tcpKeepAlive=" + this.tcpKeepAlive +
         ", tcpNoDelay=" + this.tcpNoDelay +
-        ", maxIdleTime=" + this.maxIdleTime +
+        ", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose +
         ", maxRetries=" + this.maxRetries +
         ", fallbackAllowed=" + this.fallbackAllowed +
-        ", ping interval=" + this.pingInterval + "ms" +
         ", bind address=" + (this.localAddr != null ? this.localAddr : "null"));
     }
   }
@@ -1395,7 +1257,11 @@ public class RpcClient {
     while (!connections.isEmpty()) {
       try {
         Thread.sleep(100);
-      } catch (InterruptedException ignored) {
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted while stopping the client. We still have " + connections.size() +
+            " connections.");
+        Thread.currentThread().interrupt();
+        return;
       }
     }
   }
@@ -1431,14 +1297,12 @@ public class RpcClient {
     Call call = new Call(md, param, cells, returnType);
     Connection connection =
       getConnection(ticket, call, addr, rpcTimeout, this.codec, this.compressor);
-    connection.writeRequest(call, priority);                 // send the parameter
+
+    connection.writeRequest(call, priority);
 
     //noinspection SynchronizationOnLocalVariableOrMethodParameter
     synchronized (call) {
       while (!call.done) {
-        if (connection.shouldCloseConnection.get()) {
-          throw new IOException("Unexpected closed connection");
-        }
         call.wait(1000);                       // wait for the result
       }
 
@@ -1454,6 +1318,8 @@ public class RpcClient {
     }
   }
 
+
+
   /**
    * Take an IOException and the address we were trying to connect to
    * and return an IOException with the input exception as the cause.
@@ -1486,7 +1352,7 @@ public class RpcClient {
    *  is known as actually dead. This will not prevent current operation to be retried, and,
    *  depending on their own behavior, they may retry on the same server. This can be a feature,
    *  for example at startup. In any case, they're likely to get connection refused (if the
-   *  process died) or no route to host: i.e. there next retries should be faster and with a
+   *  process died) or no route to host: i.e. their next retries should be faster and with a
    *  safe exception.
    */
   public void cancelConnections(String hostname, int port, IOException ioe) {
@@ -1505,11 +1371,13 @@ public class RpcClient {
     }
   }
 
-  /* Get a connection from the pool, or create a new one and add it to the
-   * pool.  Connections to a given host/port are reused. */
+  /**
+   *  Get a connection from the pool, or create a new one and add it to the
+   * pool.  Connections to a given host/port are reused.
+   */
   protected Connection getConnection(User ticket, Call call, InetSocketAddress addr,
       int rpcTimeout, final Codec codec, final CompressionCodec compressor)
-  throws IOException, InterruptedException {
+  throws IOException {
     if (!running.get()) throw new StoppedRpcClientException();
     Connection connection;
     ConnectionId remoteId =
@@ -1521,7 +1389,6 @@ public class RpcClient {
         connections.put(remoteId, connection);
       }
     }
-    connection.addCall(call);
 
     //we don't invoke the method below inside "synchronized (connections)"
     //block above. The reason for that is if the server happens to be slow,
@@ -1531,6 +1398,7 @@ public class RpcClient {
     // waiting here; as setupIOstreams is synchronized. If the connection fails with a
     // timeout, they will all fail simultaneously. This is checked in setupIOstreams.
     connection.setupIOstreams();
+
     return connection;
   }
 
@@ -1646,7 +1514,7 @@ public class RpcClient {
       // Clear it here so we don't by mistake try and these cells processing results.
       pcrc.setCellScanner(null);
     }
-    Pair<Message, CellScanner> val = null;
+    Pair<Message, CellScanner> val;
     try {
       val = call(md, param, cells, returnType, ticket, isa, rpcTimeout,
         pcrc != null? pcrc.getPriority(): HConstants.NORMAL_QOS);

Modified: hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java?rev=1568185&r1=1568184&r2=1568185&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java Fri Feb 14 01:35:04 2014
@@ -1552,7 +1552,6 @@ public class TestAdmin {
 
     TEST_UTIL.getConfiguration().setInt(
         "hbase.regionserver.logroll.errors.tolerated", 2);
-    TEST_UTIL.getConfiguration().setInt("ipc.ping.interval", 10 * 1000);
     TEST_UTIL.getConfiguration().setInt("ipc.socket.timeout", 10 * 1000);
     TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);
 

Modified: hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java?rev=1568185&r1=1568184&r2=1568185&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java Fri Feb 14 01:35:04 2014
@@ -60,7 +60,6 @@ public class TestLogRollAbort {
     // Tweak default timeout values down for faster recovery
     TEST_UTIL.getConfiguration().setInt(
         "hbase.regionserver.logroll.errors.tolerated", 2);
-    TEST_UTIL.getConfiguration().setInt("ipc.ping.interval", 10 * 1000);
     TEST_UTIL.getConfiguration().setInt("ipc.socket.timeout", 10 * 1000);
     TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);
 

Modified: hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java?rev=1568185&r1=1568184&r2=1568185&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java Fri Feb 14 01:35:04 2014
@@ -124,7 +124,6 @@ public class TestLogRolling  {
     TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);
 
     TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.errors.tolerated", 2);
-    TEST_UTIL.getConfiguration().setInt("ipc.ping.interval", 10 * 1000);
     TEST_UTIL.getConfiguration().setInt("ipc.socket.timeout", 10 * 1000);
     TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);