You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2014/03/05 07:35:22 UTC

svn commit: r1574385 - in /hbase/branches/hbase-10070: ./ dev-support/ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ hbase-serv...

Author: enis
Date: Wed Mar  5 06:35:21 2014
New Revision: 1574385

URL: http://svn.apache.org/r1574385
Log:
HBASE-10525 Allow the client to use a different thread for writing to ease interrupt (Nicolas Liochon)

Modified:
    hbase/branches/hbase-10070/   (props changed)
    hbase/branches/hbase-10070/dev-support/findbugs-exclude.xml
    hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
    hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
    hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
    hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java

Propchange: hbase/branches/hbase-10070/
------------------------------------------------------------------------------
  Merged /hbase/trunk:r1571210

Modified: hbase/branches/hbase-10070/dev-support/findbugs-exclude.xml
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/dev-support/findbugs-exclude.xml?rev=1574385&r1=1574384&r2=1574385&view=diff
==============================================================================
--- hbase/branches/hbase-10070/dev-support/findbugs-exclude.xml (original)
+++ hbase/branches/hbase-10070/dev-support/findbugs-exclude.xml Wed Mar  5 06:35:21 2014
@@ -115,6 +115,14 @@
   </Match>
 
   <Match>
+    <Class name="org.apache.hadoop.hbase.ipc.RpcClient$Connection"/>
+    <Or>
+      <Bug pattern="IS2_INCONSISTENT_SYNC"/>
+      <Bug pattern="NN_NAKED_NOTIFY"/>
+    </Or>
+  </Match>
+
+  <Match>
     <Class name="org.apache.hadoop.hbase.regionserver.HRegion"/>
     <Or>
       <Method name="startRegionOperation"/>

Modified: hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java?rev=1574385&r1=1574384&r2=1574385&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java (original)
+++ hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java Wed Mar  5 06:35:21 2014
@@ -738,6 +738,7 @@ class ConnectionManager {
      * @param rpcClient Client we should use instead.
      * @return Previous rpcClient
      */
+    @VisibleForTesting
     RpcClient setRpcClient(final RpcClient rpcClient) {
       RpcClient oldRpcClient = this.rpcClient;
       this.rpcClient = rpcClient;
@@ -745,6 +746,14 @@ class ConnectionManager {
     }
 
     /**
+     * For tests only.
+     */
+    @VisibleForTesting
+    RpcClient getRpcClient() {
+      return rpcClient;
+    }
+
+    /**
      * An identifier that will remain the same for a given connection.
      * @return
      */

Modified: hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java?rev=1574385&r1=1574384&r2=1574385&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java (original)
+++ hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java Wed Mar  5 06:35:21 2014
@@ -70,11 +70,13 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.TokenSelector;
 import org.cloudera.htrace.Span;
 import org.cloudera.htrace.Trace;
+import org.cloudera.htrace.TraceScope;
 
 import javax.net.SocketFactory;
 import javax.security.sasl.SaslException;
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -96,6 +98,8 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -115,13 +119,13 @@ public class RpcClient {
   protected final AtomicInteger callIdCnt = new AtomicInteger();
   protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs
   final protected Configuration conf;
-  final protected int minIdleTimeBeforeClose; // if the connection is iddle for more than this
+  protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this
                                                // time (in ms), it will be closed at any moment.
   final protected int maxRetries; //the max. no. of retries for socket connections
   final protected long failureSleep; // Time to sleep before retry on failure.
   protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
   protected final boolean tcpKeepAlive; // if T then use keepalives
-  protected FailedServers failedServers;
+  protected final FailedServers failedServers;
   private final Codec codec;
   private final CompressionCodec compressor;
   private final IPCUtil ipcUtil;
@@ -140,10 +144,14 @@ public class RpcClient {
   public final static String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry";
   public final static int FAILED_SERVER_EXPIRY_DEFAULT = 2000;
 
+  public final static String IDLE_TIME = "hbase.ipc.client.connection.minIdleTimeBeforeClose";
+
   public static final String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY =
       "hbase.ipc.client.fallback-to-simple-auth-allowed";
   public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;
 
+  public static final String ALLOWS_INTERRUPTS = "hbase.ipc.client.allowsInterrupt";
+
   // thread-specific RPC timeout, which may override that of what was passed in.
   // This is used to change dynamically the timeout (for read only) when retrying: if
   //  the time allowed for the operation is less than the usual socket timeout, then
@@ -224,15 +232,6 @@ public class RpcClient {
   }
 
   /**
-   * Set the socket timeout
-   * @param conf Configuration
-   * @param socketTimeout the socket timeout
-   */
-  public static void setSocketTimeout(Configuration conf, int socketTimeout) {
-    conf.setInt(SOCKET_TIMEOUT, socketTimeout);
-  }
-
-  /**
    * @return the socket timeout
    */
   static int getSocketTimeout(Configuration conf) {
@@ -252,7 +251,7 @@ public class RpcClient {
     // The return type.  Used to create shell into which we deserialize the response if any.
     Message responseDefaultType;
     IOException error;                            // exception, null if value
-    boolean done;                                 // true when call is done
+    volatile boolean done;                                 // true when call is done
     long startTime;
     final MethodDescriptor md;
 
@@ -261,7 +260,7 @@ public class RpcClient {
       this.param = param;
       this.md = md;
       this.cells = cells;
-      this.startTime = System.currentTimeMillis();
+      this.startTime = EnvironmentEdgeManager.currentTimeMillis();
       this.responseDefaultType = responseDefaultType;
       this.id = callIdCnt.getAndIncrement();
     }
@@ -325,6 +324,24 @@ public class RpcClient {
     return new Connection(remoteId, codec, compressor);
   }
 
+  /**
+   * see {@link org.apache.hadoop.hbase.ipc.RpcClient.Connection.CallSender}
+   */
+  private static class CallFuture {
+    Call call;
+    int priority;
+    Span span;
+
+    // We will use this to stop the writer
+    final static CallFuture DEATH_PILL = new CallFuture(null, -1, null);
+
+    CallFuture(Call call, int priority, Span span) {
+      this.call = call;
+      this.priority = priority;
+      this.span = span;
+    }
+  }
+
   /** Thread that reads responses and notifies callers.  Each connection owns a
    * socket connected to a remote address.  Calls are multiplexed through this
    * socket: responses may be delivered out of order. */
@@ -349,6 +366,123 @@ public class RpcClient {
       new ConcurrentSkipListMap<Integer, Call>();
 
     protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean();
+    protected final CallSender callSender;
+
+
+    /**
+     * If the client wants to interrupt its calls easily (i.e. call Thread#interrupt),
+     *  it gets into a java issue: an interruption during a write closes the socket/channel.
+     * A way to avoid this is to use a different thread for writing. This way, on interruptions,
+     *  we either cancel the writes or ignore the answer if the write is already done, but we
+     *  don't stop the write in the middle.
+     * This adds a thread per region server in the client, so it's kept as an option.
+     * <p>
+     * The implementation is simple: the client threads add their calls to the queue, and then
+     *  wait for an answer. The CallSender blocks on the queue, and writes the calls one
+     *  after the other. On interruption, the client cancels its call. The CallSender checks that
+     *  the call has not been canceled before writing it.
+     * </p>
+     * <p>When the connection closes, all the calls not yet sent are dismissed. The client thread
+     *  is notified with an appropriate exception, as if the call was already sent but the answer
+     *  not yet received.
+     * </p>
+     */
+    private class CallSender extends Thread implements Closeable {
+      protected final BlockingQueue<CallFuture> callsToWrite;
+
+
+      public CallFuture sendCall(Call call, int priority, Span span)
+          throws InterruptedException, IOException {
+        CallFuture cts = new CallFuture(call, priority, span);
+        callsToWrite.add(cts);
+        checkIsOpen(); // We check after the put, to be sure that the call we added won't stay
+                       //  in the list while the cleanup was already done.
+        return cts;
+      }
+
+      public void close(){
+        assert shouldCloseConnection.get();
+        callsToWrite.offer(CallFuture.DEATH_PILL);
+        // We don't care if we can't add the death pill to the queue: the writer
+        //  won't be blocked in the 'take', as its queue is full.
+      }
+
+      CallSender(String name, Configuration conf) {
+        int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000);
+        callsToWrite = new ArrayBlockingQueue<CallFuture>(queueSize);
+        setDaemon(true);
+        setName(name + " - writer");
+      }
+
+      public void cancel(CallFuture cts){
+        cts.call.done = true;
+        callsToWrite.remove(cts);
+        calls.remove(cts.call.id);
+      }
+
+      /**
+       * Reads the calls from the queue and writes them to the socket.
+       */
+      @Override
+      public void run() {
+        while (!shouldCloseConnection.get()) {
+          CallFuture cts = null;
+          try {
+            cts = callsToWrite.take();
+          } catch (InterruptedException e) {
+            markClosed(new InterruptedIOException());
+          }
+
+          if (cts == null || cts == CallFuture.DEATH_PILL){
+            assert shouldCloseConnection.get();
+            break;
+          }
+
+          if (cts.call.done) {
+            continue;
+          }
+
+          if (remoteId.rpcTimeout > 0) {
+            long waitTime = EnvironmentEdgeManager.currentTimeMillis() - cts.call.getStartTime();
+            if (waitTime >= remoteId.rpcTimeout) {
+              IOException ie = new CallTimeoutException("Call id=" + cts.call.id +
+                  ", waitTime=" + waitTime + ", rpcTimetout=" + remoteId.rpcTimeout +
+                  ", expired before being sent to the server.");
+              cts.call.setException(ie); // includes a notify
+              continue;
+            }
+          }
+
+          try {
+            Connection.this.tracedWriteRequest(cts.call, cts.priority, cts.span);
+          } catch (IOException e) {
+            LOG.warn("call write error for call #" + cts.call.id + ", message =" + e.getMessage());
+            cts.call.setException(e);
+            markClosed(e);
+          }
+        }
+
+        cleanup();
+      }
+
+      /**
+       * Cleans up the calls not yet sent when we finish.
+       */
+      private void cleanup() {
+        assert shouldCloseConnection.get();
+
+        IOException ie = new IOException("Connection to " + server + " is closing.");
+        while (true) {
+          CallFuture cts = callsToWrite.poll();
+          if (cts == null) {
+            break;
+          }
+          if (cts.call != null && !cts.call.done) {
+            cts.call.setException(ie);
+          }
+        }
+      }
+    }
 
     Connection(ConnectionId remoteId, final Codec codec, final CompressionCodec compressor)
     throws IOException {
@@ -421,6 +555,13 @@ public class RpcClient {
         ((ticket==null)?" from an unknown user": (" from "
         + ticket.getUserName())));
       this.setDaemon(true);
+
+      if (conf.getBoolean(ALLOWS_INTERRUPTS, false)) {
+        callSender = new CallSender(getName(), conf);
+        callSender.start();
+      } else {
+        callSender = null;
+      }
     }
 
     private UserInformation getUserInfo(UserGroupInformation ugi) {
@@ -470,7 +611,7 @@ public class RpcClient {
       }
     }
 
-    protected void closeConnection() {
+    protected synchronized void closeConnection() {
       if (socket == null) {
         return;
       }
@@ -556,23 +697,38 @@ public class RpcClient {
      *
      * Return true if it is time to read a response; false otherwise.
      */
-    protected synchronized boolean waitForWork() throws InterruptedException{
-      while (calls.isEmpty() && !shouldCloseConnection.get() && running.get() ) {
-        wait(minIdleTimeBeforeClose);
+    protected synchronized boolean waitForWork() throws InterruptedException {
+      // beware of the concurrent access to the calls list: we can add calls, but as well
+      //  remove them.
+      long waitUntil = EnvironmentEdgeManager.currentTimeMillis() + minIdleTimeBeforeClose;
+      while (!shouldCloseConnection.get() && running.get() &&
+          EnvironmentEdgeManager.currentTimeMillis() < waitUntil && calls.isEmpty()) {
+        wait(Math.min(minIdleTimeBeforeClose, 1000));
       }
 
-      if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
-        return true;
-      } else if (shouldCloseConnection.get()) {
-        return false;
-      } else if (calls.isEmpty()) {
-        markClosed(new IOException("idle connection closed or stopped"));
+      if (shouldCloseConnection.get()) {
         return false;
-      } else { // get stopped but there are still pending requests
-        markClosed((IOException)new IOException().initCause(
-            new InterruptedException()));
+      }
+
+      if (!running.get()) {
+        markClosed(new IOException("stopped with " + calls.size() + " pending request(s)"));
         return false;
       }
+
+      if (!calls.isEmpty()) {
+        // shouldCloseConnection can be set to true by a parallel thread here. The caller
+        //  will need to check anyway.
+        return true;
+      }
+
+      // Connection is idle.
+      // We expect the number of calls to be zero here, but actually someone can
+      //  add a call at any moment, as there is no synchronization between this task
+      //  and adding new calls. It's not a big issue, but it will get an exception.
+      markClosed(new IOException(
+          "idle connection closed with " + calls.size() + " pending request(s)"));
+
+      return false;
     }
 
     public InetSocketAddress getRemoteAddress() {
@@ -590,7 +746,7 @@ public class RpcClient {
           readResponse();
         }
       } catch (Throwable t) {
-        LOG.warn(getName() + ": unexpected exception receiving call responses", t);
+        LOG.debug(getName() + ": unexpected exception receiving call responses", t);
         markClosed(new IOException("Unexpected exception receiving call responses", t));
       }
 
@@ -811,9 +967,8 @@ public class RpcClient {
 
     /**
      * Write the connection header.
-     * Out is not synchronized because only the first thread does this.
      */
-    private void writeConnectionHeader() throws IOException {
+    private synchronized void writeConnectionHeader() throws IOException {
       synchronized (this.out) {
         this.out.writeInt(this.header.getSerializedSize());
         this.header.writeTo(this.out);
@@ -852,8 +1007,18 @@ public class RpcClient {
 
       cleanupCalls();
 
-      if (LOG.isDebugEnabled())
-        LOG.debug(getName() + ": ipc connection closed");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getName() + ": ipc connection to " + server + " closed");
+      }
+    }
+
+    protected void tracedWriteRequest(Call call, int priority, Span span) throws IOException {
+      TraceScope ts = Trace.continueSpan(span);
+      try {
+        writeRequest(call, priority, span);
+      } finally {
+        ts.close();
+      }
     }
 
     /**
@@ -864,15 +1029,12 @@ public class RpcClient {
      * @param priority
      * @see #readResponse()
      */
-    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY",
-        justification = "on close the reader thread must stop")
-    protected void writeRequest(Call call, final int priority) throws IOException {
+    private void writeRequest(Call call, final int priority, Span span) throws IOException {
       RequestHeader.Builder builder = RequestHeader.newBuilder();
       builder.setCallId(call.id);
-      if (Trace.isTracing()) {
-        Span s = Trace.currentSpan();
-        builder.setTraceInfo(RPCTInfo.newBuilder().
-            setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
+      if (span != null) {
+        builder.setTraceInfo(
+            RPCTInfo.newBuilder().setParentId(span.getSpanId()).setTraceId(span.getTraceId()));
       }
       builder.setMethodName(call.md.getName());
       builder.setRequestParam(call.param != null);
@@ -890,28 +1052,32 @@ public class RpcClient {
       //  is still valid, and, if so we do the write to the socket. If the write fails, we don't
       //  know where we stand, we have to close the connection.
       checkIsOpen();
-      calls.put(call.id, call);  // On error, the call will be removed by the timeout.
-      try {
-        synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
-          if (Thread.interrupted()) throw new InterruptedIOException();
-          checkIsOpen();
+      IOException writeException = null;
+      synchronized (this.out) {
+        if (Thread.interrupted()) throw new InterruptedIOException();
 
-          try {
-            IPCUtil.write(this.out, header, call.param, cellBlock);
-          } catch (IOException e) {
-            // We set the value inside the synchronized block, this way the next in line
-            //  won't even try to write
-            shouldCloseConnection.set(true);
-            throw e;
-          }
-        }
-      } finally {
-        synchronized (this) {
-          // We added a call, and may start the connection clode. In both cases, we
-          //  need to notify the reader.
-          notifyAll();
+        calls.put(call.id, call); // We put first as we don't want the connection to become idle.
+        checkIsOpen(); // Now we're checking that it didn't become idle in between.
+
+        try {
+          IPCUtil.write(this.out, header, call.param, cellBlock);
+        } catch (IOException e) {
+          // We set the value inside the synchronized block, this way the next in line
+          //  won't even try to write
+          shouldCloseConnection.set(true);
+          writeException = e;
         }
       }
+
+      // We added a call, and may have started the connection close. In both cases, we
+      //  need to notify the reader.
+      synchronized (this) {
+        notifyAll();
+      }
+
+      // Now that we notified, we can rethrow the exception if any. Otherwise we're good.
+      if (writeException != null) throw writeException;
+
       if (LOG.isDebugEnabled()) {
         LOG.debug(getName() + ": wrote request header " + TextFormat.shortDebugString(header));
       }
@@ -922,7 +1088,7 @@ public class RpcClient {
      */
     protected void readResponse() {
       if (shouldCloseConnection.get()) return;
-      int totalSize = -1;
+      int totalSize;
       try {
         // See HBaseServer.Call.setResponse for where we write out the response.
         // Total size of the response.  Unused.  But have to read it in anyways.
@@ -936,7 +1102,8 @@ public class RpcClient {
             TextFormat.shortDebugString(responseHeader) + ", totalSize: " + totalSize + " bytes");
         }
         Call call = calls.remove(id);
-        if (call == null) {
+        boolean expectedCall = (call != null && !call.done);
+        if (!expectedCall) {
           // So we got a response for which we have no corresponding 'call' here on the client-side.
           // We probably timed out waiting, cleaned up all references, and now the server decides
           // to return a response.  There is nothing we can do w/ the response at this stage. Clean
@@ -945,7 +1112,7 @@ public class RpcClient {
           int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
           int whatIsLeftToRead = totalSize - readSoFar;
           LOG.debug("Unknown callId: " + id + ", skipping over this response of " +
-            whatIsLeftToRead + " bytes");
+              whatIsLeftToRead + " bytes");
           IOUtils.skipFully(in, whatIsLeftToRead);
         }
         if (responseHeader.hasException()) {
@@ -954,12 +1121,12 @@ public class RpcClient {
           if (isFatalConnectionException(exceptionResponse)) {
             markClosed(re);
           } else {
-            if (call != null) call.setException(re);
+            if (expectedCall) call.setException(re);
           }
         } else {
           Message value = null;
           // Call may be null because it may have timedout and been cleaned up on this side already
-          if (call != null && call.responseDefaultType != null) {
+          if (expectedCall && call.responseDefaultType != null) {
             Builder builder = call.responseDefaultType.newBuilderForType();
             builder.mergeDelimitedFrom(in);
             value = builder.build();
@@ -973,7 +1140,7 @@ public class RpcClient {
           }
           // it's possible that this call may have been cleaned up due to a RPC
           // timeout, so check if it still exists before setting the value.
-          if (call != null) call.setResponse(value, cellBlockScanner);
+          if (expectedCall) call.setResponse(value, cellBlockScanner);
         }
       } catch (IOException e) {
         if (e instanceof SocketTimeoutException && remoteId.rpcTimeout > 0) {
@@ -985,9 +1152,7 @@ public class RpcClient {
           markClosed(e);
         }
       } finally {
-        if (remoteId.rpcTimeout > 0) {
-          cleanupCalls(remoteId.rpcTimeout);
-        }
+        cleanupCalls(remoteId.rpcTimeout);
       }
     }
 
@@ -1015,34 +1180,42 @@ public class RpcClient {
           e.getStackTrace(), doNotRetry);
     }
 
-    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY",
-        justification = "on close the reader thread must stop")
-    protected void markClosed(IOException e) {
+    protected synchronized void markClosed(IOException e) {
       if (e == null) throw new NullPointerException();
 
       if (shouldCloseConnection.compareAndSet(false, true)) {
+        LOG.warn(getName() + ": marking at should close, reason =" + e.getMessage());
         if (LOG.isDebugEnabled()) {
-          LOG.debug(getName() + ": marking at should closed, reason =" + e.getMessage());
+          LOG.debug(getName() + ": marking at should close, reason =" + e.getMessage());
         }
-        synchronized (this) {
-          notifyAll();
+        if (callSender != null) {
+          callSender.close();
         }
+        notifyAll();
       }
     }
 
     /* Cleanup all calls and mark them as done */
     protected void cleanupCalls() {
-      cleanupCalls(0);
+      cleanupCalls(-1);
     }
 
-    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
-      justification="Notify because timeout")
-    protected void cleanupCalls(long rpcTimeout) {
+    /**
+     * Cleans up the calls older than a given timeout, in milliseconds.
+     * @param rpcTimeout -1 for all calls, > 0 otherwise. 0 means no timeout and does nothing.
+     */
+    protected synchronized void cleanupCalls(long rpcTimeout) {
+      if (rpcTimeout == 0) return;
+
       Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator();
       while (itor.hasNext()) {
         Call c = itor.next().getValue();
-        long waitTime = System.currentTimeMillis() - c.getStartTime();
-        if (waitTime >= rpcTimeout) {
+        long waitTime = EnvironmentEdgeManager.currentTimeMillis() - c.getStartTime();
+        if (rpcTimeout < 0) {
+          IOException ie = new IOException("Call id=" + c.id + ", waitTime=" + waitTime);
+          c.setException(ie);
+          itor.remove();
+        } else if (waitTime >= rpcTimeout) {
           IOException ie = new CallTimeoutException("Call id=" + c.id +
               ", waitTime=" + waitTime + ", rpcTimeout=" + rpcTimeout);
           c.setException(ie);
@@ -1050,36 +1223,21 @@ public class RpcClient {
         } else {
           // This relies on the insertion order to be the call id order. This is not
           //  true under 'difficult' conditions (gc, ...).
+          rpcTimeout -= waitTime;
           break;
         }
       }
 
-      if (!calls.isEmpty()) {
-        Call firstCall = calls.get(calls.firstKey());
-        long maxWaitTime = System.currentTimeMillis() - firstCall.getStartTime();
-        if (maxWaitTime < rpcTimeout) {
-          rpcTimeout -= maxWaitTime;
-        }
-      }
-
-      try {
-        if (!shouldCloseConnection.get()) {
-          setSocketTimeout(socket, (int) rpcTimeout);
+      if (!shouldCloseConnection.get() && socket != null && rpcTimeout > 0) {
+        try {
+          socket.setSoTimeout((int)rpcTimeout);
+        } catch (SocketException e) {
+          LOG.warn("Couldn't change timeout, which may result in longer than expected calls");
         }
-      } catch (SocketException e) {
-        LOG.warn("Couldn't lower timeout, which may result in longer than expected calls");
       }
     }
   }
 
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
-    justification="Presume sync not needed setting socket timeout")
-  private static void setSocketTimeout(final Socket socket, final int rpcTimeout)
-  throws java.net.SocketException {
-    if (socket == null) return;
-    socket.setSoTimeout(rpcTimeout);
-  }
-
   /**
    * Client-side call timeout
    */
@@ -1110,8 +1268,7 @@ public class RpcClient {
    * @param localAddr client socket bind address
    */
   RpcClient(Configuration conf, String clusterId, SocketFactory factory, SocketAddress localAddr) {
-    this.minIdleTimeBeforeClose =
-        conf.getInt("hbase.ipc.client.connection.minIdleTimeBeforeClose", 120000); // 2 minutes
+    this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
     this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
     this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
@@ -1289,31 +1446,44 @@ public class RpcClient {
    * @throws IOException
    */
   Pair<Message, CellScanner> call(MethodDescriptor md, Message param, CellScanner cells,
-      Message returnType, User ticket, InetSocketAddress addr,
-      int rpcTimeout, int priority)
-  throws InterruptedException, IOException {
+      Message returnType, User ticket, InetSocketAddress addr, int rpcTimeout, int priority)
+      throws IOException, InterruptedException {
     Call call = new Call(md, param, cells, returnType);
     Connection connection =
       getConnection(ticket, call, addr, rpcTimeout, this.codec, this.compressor);
 
-    connection.writeRequest(call, priority);
+    CallFuture cts = null;
+    if (connection.callSender != null){
+      cts = connection.callSender.sendCall(call, priority, Trace.currentSpan());
+    } else {
+      connection.tracedWriteRequest(call, priority, Trace.currentSpan());
+    }
 
-    //noinspection SynchronizationOnLocalVariableOrMethodParameter
-    synchronized (call) {
-      while (!call.done) {
-        call.wait(1000);                       // wait for the result
+    while (!call.done) {
+      try {
+        synchronized (call) {
+          call.wait(1000);  // wait for the result. We will be notified by the reader.
+        }
+      } catch (InterruptedException e) {
+        if (cts != null) {
+          connection.callSender.cancel(cts);
+        } else {
+          call.done = true;
+        }
+        throw e;
       }
+    }
 
-      if (call.error != null) {
-        if (call.error instanceof RemoteException) {
-          call.error.fillInStackTrace();
-          throw call.error;
-        }
-        // local exception
-        throw wrapException(addr, call.error);
+    if (call.error != null) {
+      if (call.error instanceof RemoteException) {
+        call.error.fillInStackTrace();
+        throw call.error;
       }
-      return new Pair<Message, CellScanner>(call.response, call.cells);
+      // local exception
+      throw wrapException(addr, call.error);
     }
+
+    return new Pair<Message, CellScanner>(call.response, call.cells);
   }
 
 
@@ -1361,9 +1531,8 @@ public class RpcClient {
             connection.getRemoteAddress().getHostName().equals(hostname)) {
           LOG.info("The server on " + hostname + ":" + port +
               " is dead - stopping the connection " + connection.remoteId);
-          connection.closeConnection();
-          // We could do a connection.interrupt(), but it's safer not to do it, as the
-          //  interrupted exception behavior is not defined nor enforced enough.
+          connection.interrupt(); // We're interrupting a Reader. It means we want it to finish.
+                                  // This will close the connection as well.
         }
       }
     }
@@ -1465,10 +1634,6 @@ public class RpcClient {
     rpcTimeout.set(t);
   }
 
-  public static int getRpcTimeout() {
-    return rpcTimeout.get();
-  }
-
   /**
    * Returns the lower of the thread-local RPC time from {@link #setRpcTimeout(int)} and the given
    * default timeout.
@@ -1484,18 +1649,10 @@ public class RpcClient {
   /**
    * Make a blocking call. Throws exceptions if there are network problems or if the remote code
    * threw an exception.
-   * @param md
-   * @param controller
-   * @param param
-   * @param returnType
-   * @param isa
    * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.
    *          {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
    *          new Connection each time.
-   * @param rpcTimeout
    * @return A pair with the Message response and the Cell data (if any).
-   * @throws InterruptedException
-   * @throws IOException
    */
   Message callBlockingMethod(MethodDescriptor md, RpcController controller,
       Message param, Message returnType, final User ticket, final InetSocketAddress isa,
@@ -1503,7 +1660,7 @@ public class RpcClient {
   throws ServiceException {
     long startTime = 0;
     if (LOG.isTraceEnabled()) {
-      startTime = System.currentTimeMillis();
+      startTime = EnvironmentEdgeManager.currentTimeMillis();
     }
     PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController)controller;
     CellScanner cells = null;
@@ -1524,10 +1681,8 @@ public class RpcClient {
       }
 
       if (LOG.isTraceEnabled()) {
-        long callTime = System.currentTimeMillis() - startTime;
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms");
-        }
+        long callTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
+        LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms");
       }
       return val.getFirst();
     } catch (Throwable e) {
@@ -1538,9 +1693,6 @@ public class RpcClient {
   /**
    * Creates a "channel" that can be used by a blocking protobuf service.  Useful setting up
    * protobuf blocking stubs.
-   * @param sn
-   * @param ticket
-   * @param rpcTimeout
    * @return A blocking rpc channel that goes via this rpc client instance.
    */
   public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn,
@@ -1551,10 +1703,10 @@ public class RpcClient {
   /**
    * Blocking rpc channel that goes via hbase rpc.
    */
-  // Public so can be subclassed for tests.
+  @VisibleForTesting
   public static class BlockingRpcChannelImplementation implements BlockingRpcChannel {
     private final InetSocketAddress isa;
-    private volatile RpcClient rpcClient;
+    private final RpcClient rpcClient;
     private final int rpcTimeout;
     private final User ticket;
 

Modified: hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java?rev=1574385&r1=1574384&r2=1574385&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java Wed Mar  5 06:35:21 2014
@@ -791,7 +791,7 @@ public class RpcServer implements RpcSer
       } catch (InterruptedException ieo) {
         throw ieo;
       } catch (Exception e) {
-        LOG.warn(getName() + ": count of bytes read: " + count, e);
+        LOG.info(getName() + ": count of bytes read: " + count, e);
         count = -1; //so that the (count < 0) block is executed
       }
       if (count < 0) {

Modified: hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java?rev=1574385&r1=1574384&r2=1574385&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java Wed Mar  5 06:35:21 2014
@@ -39,6 +39,8 @@ import java.util.concurrent.SynchronousQ
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -57,6 +59,7 @@ import org.apache.hadoop.hbase.exception
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -229,11 +232,156 @@ public class TestHCM {
   }
 
   /**
-   * Test that the connection to the dead server is cut immediately when we receive the
-   *  notification.
-   * @throws Exception
+   * Test that we can handle connection close: it will trigger a retry, but the calls will
+   *  finish.
+   */
+  @Test
+  public void testConnectionCloseAllowsInterrupt() throws Exception {
+    testConnectionClose(true); // run the scenario with RpcClient.ALLOWS_INTERRUPTS set to true
+  }
+
+  @Test
+  public void testConnectionNotAllowsInterrupt() throws Exception {
+    testConnectionClose(false); // run the scenario with RpcClient.ALLOWS_INTERRUPTS set to false
+  }
+
+  private void testConnectionClose(boolean allowsInterrupt) throws Exception {
+    String tableName = "HCM-testConnectionClose" + allowsInterrupt;
+    TEST_UTIL.createTable(tableName.getBytes(), FAM_NAM).close();
+    // Scenario: a thread does gets in a loop while this thread keeps cancelling the connections.
+    boolean previousBalance = TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, true);
+
+    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
+    // We want to work on a separate connection.
+    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
+    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, Integer.MAX_VALUE - 1); // retry a lot
+    c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 0); // don't wait between retries.
+    c2.setInt(RpcClient.FAILED_SERVER_EXPIRY_KEY, 0); // expiry 0: presumably no failed-server list
+    c2.setBoolean(RpcClient.ALLOWS_INTERRUPTS, allowsInterrupt);
+
+    final HTable table = new HTable(c2, tableName.getBytes());
+
+    Put put = new Put(ROW);
+    put.add(FAM_NAM, ROW, ROW);
+    table.put(put);
+
+    // 4 steps: ready=0; doGets=1; mustStop=2; stopped=3
+    final AtomicInteger step = new AtomicInteger(0);
+    // Holds the Throwable caught by the getter thread, if any; checked at the end of the test.
+    final AtomicReference<Throwable> failed = new AtomicReference<Throwable>(null);
+    Thread t = new Thread("testConnectionCloseThread") {
+      public void run() {
+        int done = 0;
+        try {
+          step.set(1); // tell the main thread that the gets are starting
+          while (step.get() == 1) {
+            Get get = new Get(ROW);
+            table.get(get);
+            done++;
+            if (done % 100 == 0)
+              LOG.info("done=" + done);
+          }
+        } catch (Throwable t) {
+          failed.set(t);
+          LOG.error(t);
+        }
+        step.set(3); // stopped, whether the loop ended normally or on an exception
+      }
+    };
+    t.start(); // then wait until the thread reports step 1
+    TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return step.get() == 1;
+      }
+    });
+
+    ServerName sn = table.getRegionLocation(ROW).getServerName();
+    ConnectionManager.HConnectionImplementation conn =
+        (ConnectionManager.HConnectionImplementation) table.getConnection();
+    RpcClient rpcClient = conn.getRpcClient();
+    // Keep cancelling the connections to that server while the other thread is doing gets.
+    LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn);
+    for (int i = 0; i < 5000; i++) {
+      rpcClient.cancelConnections(sn.getHostname(), sn.getPort(), null);
+      Thread.sleep(5);
+    }
+
+    step.compareAndSet(1, 2); // ask the thread to stop, unless it already stopped (step 3)
+    // The test may fail here if the thread doing the gets is stuck. The way to find
+    //  out what's happening is to look for the thread named 'testConnectionCloseThread'.
+    TEST_UTIL.waitFor(20000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return step.get() == 3;
+      }
+    });
+    // NOTE(review): if this assertion fails, the balancer state below is not restored.
+    Assert.assertTrue("Unexpected exception is " + failed.get(), failed.get() == null);
+    TEST_UTIL.getHBaseAdmin().setBalancerRunning(previousBalance, true);
+  }
+
+  /**
+   * Test that connection can become idle without breaking everything.
    */
   @Test
+  public void testConnectionIdle() throws Exception {
+    String tableName = "HCM-testConnectionIdle";
+    TEST_UTIL.createTable(tableName.getBytes(), FAM_NAM).close();
+    int idleTime =  20000; // value set for RpcClient.IDLE_TIME below
+    boolean previousBalance = TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, true);
+
+    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration());
+    // We want to work on a separate connection.
+    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
+    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Don't retry: retry = test failed
+    c2.setInt(RpcClient.IDLE_TIME, idleTime);
+
+    final HTable table = new HTable(c2, tableName.getBytes());
+
+    Put put = new Put(ROW);
+    put.add(FAM_NAM, ROW, ROW);
+    table.put(put);
+    // Inject a manually driven clock so that the idle time can be simulated without waiting.
+    ManualEnvironmentEdge mee = new ManualEnvironmentEdge();
+    mee.setValue(System.currentTimeMillis());
+    EnvironmentEdgeManager.injectEdge(mee);
+    LOG.info("first get");
+    table.get(new Get(ROW));
+
+    LOG.info("first get - changing the time & sleeping");
+    mee.incValue(idleTime + 1000); // move the injected clock past the idle time
+    Thread.sleep(1500); // we need to wait a little for the connection to be seen as idle.
+                        // 1500 = sleep time in RpcClient#waitForWork + a margin
+
+    LOG.info("second get - connection has been marked idle in the middle");
+    // Checking that the connection actually became idle would require reading some private
+    //  fields of RpcClient.
+    table.get(new Get(ROW));
+    mee.incValue(idleTime + 1000); // move the injected clock past the idle time again
+
+    LOG.info("third get - connection is idle, but the reader doesn't know yet");
+    // We're testing here a special case:
+    //  time limit reached BUT connection not yet reclaimed AND a new call.
+    //  In this situation, we don't close the connection, instead we use it immediately.
+    // If we're very unlucky we can have a race condition in the test: the connection is already
+    //  being closed when we do the get, so we have an exception, and we don't retry as the
+    //  retry number is 1. The probability is very very low, and seems acceptable for now. It's
+    //  a test issue only.
+    table.get(new Get(ROW));
+
+    LOG.info("we're done - time will change back");
+    // NOTE(review): reset() is skipped if a get above throws, leaving the manual edge injected.
+    EnvironmentEdgeManager.reset();
+    TEST_UTIL.getHBaseAdmin().setBalancerRunning(previousBalance, true);
+  }
+
+    /**
+     * Test that the connection to the dead server is cut immediately when we receive the
+     *  notification.
+     * @throws Exception
+     */
+  @Test
   public void testConnectionCut() throws Exception {
     String tableName = "HCM-testConnectionCut";