You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mi...@apache.org on 2016/02/05 05:32:45 UTC

[35/51] [partial] hbase-site git commit: Published site at 18eff3c1c337003b2a419490e621f931d16936fb.

http://git-wip-us.apache.org/repos/asf/hbase-site/blob/a8725a46/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcClientImpl.Connection.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcClientImpl.Connection.html b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcClientImpl.Connection.html
index b59d17d..caf98ca 100644
--- a/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcClientImpl.Connection.html
+++ b/devapidocs/src-html/org/apache/hadoop/hbase/ipc/RpcClientImpl.Connection.html
@@ -907,428 +907,430 @@
 <span class="sourceLineNo">899</span>        cellBlockBuilder.setLength(cellBlock.limit());<a name="line.899"></a>
 <span class="sourceLineNo">900</span>        builder.setCellBlockMeta(cellBlockBuilder.build());<a name="line.900"></a>
 <span class="sourceLineNo">901</span>      }<a name="line.901"></a>
-<span class="sourceLineNo">902</span>      // Only pass priority if there one.  Let zero be same as no priority.<a name="line.902"></a>
-<span class="sourceLineNo">903</span>      if (priority != 0) builder.setPriority(priority);<a name="line.903"></a>
-<span class="sourceLineNo">904</span>      RequestHeader header = builder.build();<a name="line.904"></a>
-<span class="sourceLineNo">905</span><a name="line.905"></a>
-<span class="sourceLineNo">906</span>      setupIOstreams();<a name="line.906"></a>
+<span class="sourceLineNo">902</span>      // Only pass priority if there is one set.<a name="line.902"></a>
+<span class="sourceLineNo">903</span>      if (priority != PayloadCarryingRpcController.PRIORITY_UNSET) {<a name="line.903"></a>
+<span class="sourceLineNo">904</span>        builder.setPriority(priority);<a name="line.904"></a>
+<span class="sourceLineNo">905</span>      }<a name="line.905"></a>
+<span class="sourceLineNo">906</span>      RequestHeader header = builder.build();<a name="line.906"></a>
 <span class="sourceLineNo">907</span><a name="line.907"></a>
-<span class="sourceLineNo">908</span>      // Now we're going to write the call. We take the lock, then check that the connection<a name="line.908"></a>
-<span class="sourceLineNo">909</span>      //  is still valid, and, if so we do the write to the socket. If the write fails, we don't<a name="line.909"></a>
-<span class="sourceLineNo">910</span>      //  know where we stand, we have to close the connection.<a name="line.910"></a>
-<span class="sourceLineNo">911</span>      checkIsOpen();<a name="line.911"></a>
-<span class="sourceLineNo">912</span>      IOException writeException = null;<a name="line.912"></a>
-<span class="sourceLineNo">913</span>      synchronized (this.outLock) {<a name="line.913"></a>
-<span class="sourceLineNo">914</span>        if (Thread.interrupted()) throw new InterruptedIOException();<a name="line.914"></a>
-<span class="sourceLineNo">915</span><a name="line.915"></a>
-<span class="sourceLineNo">916</span>        calls.put(call.id, call); // We put first as we don't want the connection to become idle.<a name="line.916"></a>
-<span class="sourceLineNo">917</span>        checkIsOpen(); // Now we're checking that it didn't became idle in between.<a name="line.917"></a>
-<span class="sourceLineNo">918</span><a name="line.918"></a>
-<span class="sourceLineNo">919</span>        try {<a name="line.919"></a>
-<span class="sourceLineNo">920</span>          call.callStats.setRequestSizeBytes(IPCUtil.write(this.out, header, call.param,<a name="line.920"></a>
-<span class="sourceLineNo">921</span>              cellBlock));<a name="line.921"></a>
-<span class="sourceLineNo">922</span>        } catch (IOException e) {<a name="line.922"></a>
-<span class="sourceLineNo">923</span>          // We set the value inside the synchronized block, this way the next in line<a name="line.923"></a>
-<span class="sourceLineNo">924</span>          //  won't even try to write. Otherwise we might miss a call in the calls map?<a name="line.924"></a>
-<span class="sourceLineNo">925</span>          shouldCloseConnection.set(true);<a name="line.925"></a>
-<span class="sourceLineNo">926</span>          writeException = e;<a name="line.926"></a>
-<span class="sourceLineNo">927</span>          interrupt();<a name="line.927"></a>
-<span class="sourceLineNo">928</span>        }<a name="line.928"></a>
-<span class="sourceLineNo">929</span>      }<a name="line.929"></a>
-<span class="sourceLineNo">930</span><a name="line.930"></a>
-<span class="sourceLineNo">931</span>      // call close outside of the synchronized (outLock) to prevent deadlock - HBASE-14474<a name="line.931"></a>
-<span class="sourceLineNo">932</span>      if (writeException != null) {<a name="line.932"></a>
-<span class="sourceLineNo">933</span>        markClosed(writeException);<a name="line.933"></a>
-<span class="sourceLineNo">934</span>        close();<a name="line.934"></a>
-<span class="sourceLineNo">935</span>      }<a name="line.935"></a>
-<span class="sourceLineNo">936</span><a name="line.936"></a>
-<span class="sourceLineNo">937</span>      // We added a call, and may be started the connection close. In both cases, we<a name="line.937"></a>
-<span class="sourceLineNo">938</span>      //  need to notify the reader.<a name="line.938"></a>
-<span class="sourceLineNo">939</span>      doNotify();<a name="line.939"></a>
-<span class="sourceLineNo">940</span><a name="line.940"></a>
-<span class="sourceLineNo">941</span>      // Now that we notified, we can rethrow the exception if any. Otherwise we're good.<a name="line.941"></a>
-<span class="sourceLineNo">942</span>      if (writeException != null) throw writeException;<a name="line.942"></a>
-<span class="sourceLineNo">943</span>    }<a name="line.943"></a>
-<span class="sourceLineNo">944</span><a name="line.944"></a>
-<span class="sourceLineNo">945</span>    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",<a name="line.945"></a>
-<span class="sourceLineNo">946</span>        justification="Presume notifyAll is because we are closing/shutting down")<a name="line.946"></a>
-<span class="sourceLineNo">947</span>    private synchronized void doNotify() {<a name="line.947"></a>
-<span class="sourceLineNo">948</span>      // Make a separate method so can do synchronize and add findbugs annotation; only one<a name="line.948"></a>
-<span class="sourceLineNo">949</span>      // annotation at at time in source 1.7.<a name="line.949"></a>
-<span class="sourceLineNo">950</span>      notifyAll(); // Findbugs: NN_NAKED_NOTIFY<a name="line.950"></a>
-<span class="sourceLineNo">951</span>    }<a name="line.951"></a>
-<span class="sourceLineNo">952</span><a name="line.952"></a>
-<span class="sourceLineNo">953</span>    /* Receive a response.<a name="line.953"></a>
-<span class="sourceLineNo">954</span>     * Because only one receiver, so no synchronization on in.<a name="line.954"></a>
-<span class="sourceLineNo">955</span>     */<a name="line.955"></a>
-<span class="sourceLineNo">956</span>    protected void readResponse() {<a name="line.956"></a>
-<span class="sourceLineNo">957</span>      if (shouldCloseConnection.get()) return;<a name="line.957"></a>
-<span class="sourceLineNo">958</span>      Call call = null;<a name="line.958"></a>
-<span class="sourceLineNo">959</span>      boolean expectedCall = false;<a name="line.959"></a>
-<span class="sourceLineNo">960</span>      try {<a name="line.960"></a>
-<span class="sourceLineNo">961</span>        // See HBaseServer.Call.setResponse for where we write out the response.<a name="line.961"></a>
-<span class="sourceLineNo">962</span>        // Total size of the response.  Unused.  But have to read it in anyways.<a name="line.962"></a>
-<span class="sourceLineNo">963</span>        int totalSize = in.readInt();<a name="line.963"></a>
-<span class="sourceLineNo">964</span><a name="line.964"></a>
-<span class="sourceLineNo">965</span>        // Read the header<a name="line.965"></a>
-<span class="sourceLineNo">966</span>        ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);<a name="line.966"></a>
-<span class="sourceLineNo">967</span>        int id = responseHeader.getCallId();<a name="line.967"></a>
-<span class="sourceLineNo">968</span>        call = calls.remove(id); // call.done have to be set before leaving this method<a name="line.968"></a>
-<span class="sourceLineNo">969</span>        expectedCall = (call != null &amp;&amp; !call.done);<a name="line.969"></a>
-<span class="sourceLineNo">970</span>        if (!expectedCall) {<a name="line.970"></a>
-<span class="sourceLineNo">971</span>          // So we got a response for which we have no corresponding 'call' here on the client-side.<a name="line.971"></a>
-<span class="sourceLineNo">972</span>          // We probably timed out waiting, cleaned up all references, and now the server decides<a name="line.972"></a>
-<span class="sourceLineNo">973</span>          // to return a response.  There is nothing we can do w/ the response at this stage. Clean<a name="line.973"></a>
-<span class="sourceLineNo">974</span>          // out the wire of the response so its out of the way and we can get other responses on<a name="line.974"></a>
-<span class="sourceLineNo">975</span>          // this connection.<a name="line.975"></a>
-<span class="sourceLineNo">976</span>          int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);<a name="line.976"></a>
-<span class="sourceLineNo">977</span>          int whatIsLeftToRead = totalSize - readSoFar;<a name="line.977"></a>
-<span class="sourceLineNo">978</span>          IOUtils.skipFully(in, whatIsLeftToRead);<a name="line.978"></a>
-<span class="sourceLineNo">979</span>          if (call != null) {<a name="line.979"></a>
-<span class="sourceLineNo">980</span>            call.callStats.setResponseSizeBytes(totalSize);<a name="line.980"></a>
-<span class="sourceLineNo">981</span>            call.callStats.setCallTimeMs(<a name="line.981"></a>
-<span class="sourceLineNo">982</span>                EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());<a name="line.982"></a>
-<span class="sourceLineNo">983</span>          }<a name="line.983"></a>
-<span class="sourceLineNo">984</span>          return;<a name="line.984"></a>
-<span class="sourceLineNo">985</span>        }<a name="line.985"></a>
-<span class="sourceLineNo">986</span>        if (responseHeader.hasException()) {<a name="line.986"></a>
-<span class="sourceLineNo">987</span>          ExceptionResponse exceptionResponse = responseHeader.getException();<a name="line.987"></a>
-<span class="sourceLineNo">988</span>          RemoteException re = createRemoteException(exceptionResponse);<a name="line.988"></a>
-<span class="sourceLineNo">989</span>          call.setException(re);<a name="line.989"></a>
-<span class="sourceLineNo">990</span>          call.callStats.setResponseSizeBytes(totalSize);<a name="line.990"></a>
-<span class="sourceLineNo">991</span>          call.callStats.setCallTimeMs(<a name="line.991"></a>
-<span class="sourceLineNo">992</span>              EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());<a name="line.992"></a>
-<span class="sourceLineNo">993</span>          if (isFatalConnectionException(exceptionResponse)) {<a name="line.993"></a>
-<span class="sourceLineNo">994</span>            markClosed(re);<a name="line.994"></a>
-<span class="sourceLineNo">995</span>          }<a name="line.995"></a>
-<span class="sourceLineNo">996</span>        } else {<a name="line.996"></a>
-<span class="sourceLineNo">997</span>          Message value = null;<a name="line.997"></a>
-<span class="sourceLineNo">998</span>          if (call.responseDefaultType != null) {<a name="line.998"></a>
-<span class="sourceLineNo">999</span>            Builder builder = call.responseDefaultType.newBuilderForType();<a name="line.999"></a>
-<span class="sourceLineNo">1000</span>            ProtobufUtil.mergeDelimitedFrom(builder, in);<a name="line.1000"></a>
-<span class="sourceLineNo">1001</span>            value = builder.build();<a name="line.1001"></a>
-<span class="sourceLineNo">1002</span>          }<a name="line.1002"></a>
-<span class="sourceLineNo">1003</span>          CellScanner cellBlockScanner = null;<a name="line.1003"></a>
-<span class="sourceLineNo">1004</span>          if (responseHeader.hasCellBlockMeta()) {<a name="line.1004"></a>
-<span class="sourceLineNo">1005</span>            int size = responseHeader.getCellBlockMeta().getLength();<a name="line.1005"></a>
-<span class="sourceLineNo">1006</span>            byte [] cellBlock = new byte[size];<a name="line.1006"></a>
-<span class="sourceLineNo">1007</span>            IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);<a name="line.1007"></a>
-<span class="sourceLineNo">1008</span>            cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);<a name="line.1008"></a>
-<span class="sourceLineNo">1009</span>          }<a name="line.1009"></a>
-<span class="sourceLineNo">1010</span>          call.setResponse(value, cellBlockScanner);<a name="line.1010"></a>
-<span class="sourceLineNo">1011</span>          call.callStats.setResponseSizeBytes(totalSize);<a name="line.1011"></a>
-<span class="sourceLineNo">1012</span>          call.callStats.setCallTimeMs(<a name="line.1012"></a>
-<span class="sourceLineNo">1013</span>              EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());<a name="line.1013"></a>
-<span class="sourceLineNo">1014</span>        }<a name="line.1014"></a>
-<span class="sourceLineNo">1015</span>      } catch (IOException e) {<a name="line.1015"></a>
-<span class="sourceLineNo">1016</span>        if (expectedCall) call.setException(e);<a name="line.1016"></a>
-<span class="sourceLineNo">1017</span>        if (e instanceof SocketTimeoutException) {<a name="line.1017"></a>
-<span class="sourceLineNo">1018</span>          // Clean up open calls but don't treat this as a fatal condition,<a name="line.1018"></a>
-<span class="sourceLineNo">1019</span>          // since we expect certain responses to not make it by the specified<a name="line.1019"></a>
-<span class="sourceLineNo">1020</span>          // {@link ConnectionId#rpcTimeout}.<a name="line.1020"></a>
-<span class="sourceLineNo">1021</span>          if (LOG.isTraceEnabled()) LOG.trace("ignored", e);<a name="line.1021"></a>
-<span class="sourceLineNo">1022</span>        } else {<a name="line.1022"></a>
-<span class="sourceLineNo">1023</span>          // Treat this as a fatal condition and close this connection<a name="line.1023"></a>
-<span class="sourceLineNo">1024</span>          markClosed(e);<a name="line.1024"></a>
-<span class="sourceLineNo">1025</span>        }<a name="line.1025"></a>
-<span class="sourceLineNo">1026</span>      } finally {<a name="line.1026"></a>
-<span class="sourceLineNo">1027</span>        cleanupCalls(false);<a name="line.1027"></a>
-<span class="sourceLineNo">1028</span>      }<a name="line.1028"></a>
-<span class="sourceLineNo">1029</span>    }<a name="line.1029"></a>
-<span class="sourceLineNo">1030</span><a name="line.1030"></a>
-<span class="sourceLineNo">1031</span>    /**<a name="line.1031"></a>
-<span class="sourceLineNo">1032</span>     * @return True if the exception is a fatal connection exception.<a name="line.1032"></a>
-<span class="sourceLineNo">1033</span>     */<a name="line.1033"></a>
-<span class="sourceLineNo">1034</span>    private boolean isFatalConnectionException(final ExceptionResponse e) {<a name="line.1034"></a>
-<span class="sourceLineNo">1035</span>      return e.getExceptionClassName().<a name="line.1035"></a>
-<span class="sourceLineNo">1036</span>        equals(FatalConnectionException.class.getName());<a name="line.1036"></a>
-<span class="sourceLineNo">1037</span>    }<a name="line.1037"></a>
-<span class="sourceLineNo">1038</span><a name="line.1038"></a>
-<span class="sourceLineNo">1039</span>    /**<a name="line.1039"></a>
-<span class="sourceLineNo">1040</span>     * @param e exception to be wrapped<a name="line.1040"></a>
-<span class="sourceLineNo">1041</span>     * @return RemoteException made from passed &lt;code&gt;e&lt;/code&gt;<a name="line.1041"></a>
-<span class="sourceLineNo">1042</span>     */<a name="line.1042"></a>
-<span class="sourceLineNo">1043</span>    private RemoteException createRemoteException(final ExceptionResponse e) {<a name="line.1043"></a>
-<span class="sourceLineNo">1044</span>      String innerExceptionClassName = e.getExceptionClassName();<a name="line.1044"></a>
-<span class="sourceLineNo">1045</span>      boolean doNotRetry = e.getDoNotRetry();<a name="line.1045"></a>
-<span class="sourceLineNo">1046</span>      return e.hasHostname()?<a name="line.1046"></a>
-<span class="sourceLineNo">1047</span>        // If a hostname then add it to the RemoteWithExtrasException<a name="line.1047"></a>
-<span class="sourceLineNo">1048</span>        new RemoteWithExtrasException(innerExceptionClassName,<a name="line.1048"></a>
-<span class="sourceLineNo">1049</span>          e.getStackTrace(), e.getHostname(), e.getPort(), doNotRetry):<a name="line.1049"></a>
+<span class="sourceLineNo">908</span>      setupIOstreams();<a name="line.908"></a>
+<span class="sourceLineNo">909</span><a name="line.909"></a>
+<span class="sourceLineNo">910</span>      // Now we're going to write the call. We take the lock, then check that the connection<a name="line.910"></a>
+<span class="sourceLineNo">911</span>      //  is still valid, and, if so we do the write to the socket. If the write fails, we don't<a name="line.911"></a>
+<span class="sourceLineNo">912</span>      //  know where we stand, we have to close the connection.<a name="line.912"></a>
+<span class="sourceLineNo">913</span>      checkIsOpen();<a name="line.913"></a>
+<span class="sourceLineNo">914</span>      IOException writeException = null;<a name="line.914"></a>
+<span class="sourceLineNo">915</span>      synchronized (this.outLock) {<a name="line.915"></a>
+<span class="sourceLineNo">916</span>        if (Thread.interrupted()) throw new InterruptedIOException();<a name="line.916"></a>
+<span class="sourceLineNo">917</span><a name="line.917"></a>
+<span class="sourceLineNo">918</span>        calls.put(call.id, call); // We put first as we don't want the connection to become idle.<a name="line.918"></a>
+<span class="sourceLineNo">919</span>        checkIsOpen(); // Now we're checking that it didn't became idle in between.<a name="line.919"></a>
+<span class="sourceLineNo">920</span><a name="line.920"></a>
+<span class="sourceLineNo">921</span>        try {<a name="line.921"></a>
+<span class="sourceLineNo">922</span>          call.callStats.setRequestSizeBytes(IPCUtil.write(this.out, header, call.param,<a name="line.922"></a>
+<span class="sourceLineNo">923</span>              cellBlock));<a name="line.923"></a>
+<span class="sourceLineNo">924</span>        } catch (IOException e) {<a name="line.924"></a>
+<span class="sourceLineNo">925</span>          // We set the value inside the synchronized block, this way the next in line<a name="line.925"></a>
+<span class="sourceLineNo">926</span>          //  won't even try to write. Otherwise we might miss a call in the calls map?<a name="line.926"></a>
+<span class="sourceLineNo">927</span>          shouldCloseConnection.set(true);<a name="line.927"></a>
+<span class="sourceLineNo">928</span>          writeException = e;<a name="line.928"></a>
+<span class="sourceLineNo">929</span>          interrupt();<a name="line.929"></a>
+<span class="sourceLineNo">930</span>        }<a name="line.930"></a>
+<span class="sourceLineNo">931</span>      }<a name="line.931"></a>
+<span class="sourceLineNo">932</span><a name="line.932"></a>
+<span class="sourceLineNo">933</span>      // call close outside of the synchronized (outLock) to prevent deadlock - HBASE-14474<a name="line.933"></a>
+<span class="sourceLineNo">934</span>      if (writeException != null) {<a name="line.934"></a>
+<span class="sourceLineNo">935</span>        markClosed(writeException);<a name="line.935"></a>
+<span class="sourceLineNo">936</span>        close();<a name="line.936"></a>
+<span class="sourceLineNo">937</span>      }<a name="line.937"></a>
+<span class="sourceLineNo">938</span><a name="line.938"></a>
+<span class="sourceLineNo">939</span>      // We added a call, and may be started the connection close. In both cases, we<a name="line.939"></a>
+<span class="sourceLineNo">940</span>      //  need to notify the reader.<a name="line.940"></a>
+<span class="sourceLineNo">941</span>      doNotify();<a name="line.941"></a>
+<span class="sourceLineNo">942</span><a name="line.942"></a>
+<span class="sourceLineNo">943</span>      // Now that we notified, we can rethrow the exception if any. Otherwise we're good.<a name="line.943"></a>
+<span class="sourceLineNo">944</span>      if (writeException != null) throw writeException;<a name="line.944"></a>
+<span class="sourceLineNo">945</span>    }<a name="line.945"></a>
+<span class="sourceLineNo">946</span><a name="line.946"></a>
+<span class="sourceLineNo">947</span>    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",<a name="line.947"></a>
+<span class="sourceLineNo">948</span>        justification="Presume notifyAll is because we are closing/shutting down")<a name="line.948"></a>
+<span class="sourceLineNo">949</span>    private synchronized void doNotify() {<a name="line.949"></a>
+<span class="sourceLineNo">950</span>      // Make a separate method so can do synchronize and add findbugs annotation; only one<a name="line.950"></a>
+<span class="sourceLineNo">951</span>      // annotation at at time in source 1.7.<a name="line.951"></a>
+<span class="sourceLineNo">952</span>      notifyAll(); // Findbugs: NN_NAKED_NOTIFY<a name="line.952"></a>
+<span class="sourceLineNo">953</span>    }<a name="line.953"></a>
+<span class="sourceLineNo">954</span><a name="line.954"></a>
+<span class="sourceLineNo">955</span>    /* Receive a response.<a name="line.955"></a>
+<span class="sourceLineNo">956</span>     * Because only one receiver, so no synchronization on in.<a name="line.956"></a>
+<span class="sourceLineNo">957</span>     */<a name="line.957"></a>
+<span class="sourceLineNo">958</span>    protected void readResponse() {<a name="line.958"></a>
+<span class="sourceLineNo">959</span>      if (shouldCloseConnection.get()) return;<a name="line.959"></a>
+<span class="sourceLineNo">960</span>      Call call = null;<a name="line.960"></a>
+<span class="sourceLineNo">961</span>      boolean expectedCall = false;<a name="line.961"></a>
+<span class="sourceLineNo">962</span>      try {<a name="line.962"></a>
+<span class="sourceLineNo">963</span>        // See HBaseServer.Call.setResponse for where we write out the response.<a name="line.963"></a>
+<span class="sourceLineNo">964</span>        // Total size of the response.  Unused.  But have to read it in anyways.<a name="line.964"></a>
+<span class="sourceLineNo">965</span>        int totalSize = in.readInt();<a name="line.965"></a>
+<span class="sourceLineNo">966</span><a name="line.966"></a>
+<span class="sourceLineNo">967</span>        // Read the header<a name="line.967"></a>
+<span class="sourceLineNo">968</span>        ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in);<a name="line.968"></a>
+<span class="sourceLineNo">969</span>        int id = responseHeader.getCallId();<a name="line.969"></a>
+<span class="sourceLineNo">970</span>        call = calls.remove(id); // call.done have to be set before leaving this method<a name="line.970"></a>
+<span class="sourceLineNo">971</span>        expectedCall = (call != null &amp;&amp; !call.done);<a name="line.971"></a>
+<span class="sourceLineNo">972</span>        if (!expectedCall) {<a name="line.972"></a>
+<span class="sourceLineNo">973</span>          // So we got a response for which we have no corresponding 'call' here on the client-side.<a name="line.973"></a>
+<span class="sourceLineNo">974</span>          // We probably timed out waiting, cleaned up all references, and now the server decides<a name="line.974"></a>
+<span class="sourceLineNo">975</span>          // to return a response.  There is nothing we can do w/ the response at this stage. Clean<a name="line.975"></a>
+<span class="sourceLineNo">976</span>          // out the wire of the response so its out of the way and we can get other responses on<a name="line.976"></a>
+<span class="sourceLineNo">977</span>          // this connection.<a name="line.977"></a>
+<span class="sourceLineNo">978</span>          int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);<a name="line.978"></a>
+<span class="sourceLineNo">979</span>          int whatIsLeftToRead = totalSize - readSoFar;<a name="line.979"></a>
+<span class="sourceLineNo">980</span>          IOUtils.skipFully(in, whatIsLeftToRead);<a name="line.980"></a>
+<span class="sourceLineNo">981</span>          if (call != null) {<a name="line.981"></a>
+<span class="sourceLineNo">982</span>            call.callStats.setResponseSizeBytes(totalSize);<a name="line.982"></a>
+<span class="sourceLineNo">983</span>            call.callStats.setCallTimeMs(<a name="line.983"></a>
+<span class="sourceLineNo">984</span>                EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());<a name="line.984"></a>
+<span class="sourceLineNo">985</span>          }<a name="line.985"></a>
+<span class="sourceLineNo">986</span>          return;<a name="line.986"></a>
+<span class="sourceLineNo">987</span>        }<a name="line.987"></a>
+<span class="sourceLineNo">988</span>        if (responseHeader.hasException()) {<a name="line.988"></a>
+<span class="sourceLineNo">989</span>          ExceptionResponse exceptionResponse = responseHeader.getException();<a name="line.989"></a>
+<span class="sourceLineNo">990</span>          RemoteException re = createRemoteException(exceptionResponse);<a name="line.990"></a>
+<span class="sourceLineNo">991</span>          call.setException(re);<a name="line.991"></a>
+<span class="sourceLineNo">992</span>          call.callStats.setResponseSizeBytes(totalSize);<a name="line.992"></a>
+<span class="sourceLineNo">993</span>          call.callStats.setCallTimeMs(<a name="line.993"></a>
+<span class="sourceLineNo">994</span>              EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());<a name="line.994"></a>
+<span class="sourceLineNo">995</span>          if (isFatalConnectionException(exceptionResponse)) {<a name="line.995"></a>
+<span class="sourceLineNo">996</span>            markClosed(re);<a name="line.996"></a>
+<span class="sourceLineNo">997</span>          }<a name="line.997"></a>
+<span class="sourceLineNo">998</span>        } else {<a name="line.998"></a>
+<span class="sourceLineNo">999</span>          Message value = null;<a name="line.999"></a>
+<span class="sourceLineNo">1000</span>          if (call.responseDefaultType != null) {<a name="line.1000"></a>
+<span class="sourceLineNo">1001</span>            Builder builder = call.responseDefaultType.newBuilderForType();<a name="line.1001"></a>
+<span class="sourceLineNo">1002</span>            ProtobufUtil.mergeDelimitedFrom(builder, in);<a name="line.1002"></a>
+<span class="sourceLineNo">1003</span>            value = builder.build();<a name="line.1003"></a>
+<span class="sourceLineNo">1004</span>          }<a name="line.1004"></a>
+<span class="sourceLineNo">1005</span>          CellScanner cellBlockScanner = null;<a name="line.1005"></a>
+<span class="sourceLineNo">1006</span>          if (responseHeader.hasCellBlockMeta()) {<a name="line.1006"></a>
+<span class="sourceLineNo">1007</span>            int size = responseHeader.getCellBlockMeta().getLength();<a name="line.1007"></a>
+<span class="sourceLineNo">1008</span>            byte [] cellBlock = new byte[size];<a name="line.1008"></a>
+<span class="sourceLineNo">1009</span>            IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);<a name="line.1009"></a>
+<span class="sourceLineNo">1010</span>            cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);<a name="line.1010"></a>
+<span class="sourceLineNo">1011</span>          }<a name="line.1011"></a>
+<span class="sourceLineNo">1012</span>          call.setResponse(value, cellBlockScanner);<a name="line.1012"></a>
+<span class="sourceLineNo">1013</span>          call.callStats.setResponseSizeBytes(totalSize);<a name="line.1013"></a>
+<span class="sourceLineNo">1014</span>          call.callStats.setCallTimeMs(<a name="line.1014"></a>
+<span class="sourceLineNo">1015</span>              EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());<a name="line.1015"></a>
+<span class="sourceLineNo">1016</span>        }<a name="line.1016"></a>
+<span class="sourceLineNo">1017</span>      } catch (IOException e) {<a name="line.1017"></a>
+<span class="sourceLineNo">1018</span>        if (expectedCall) call.setException(e);<a name="line.1018"></a>
+<span class="sourceLineNo">1019</span>        if (e instanceof SocketTimeoutException) {<a name="line.1019"></a>
+<span class="sourceLineNo">1020</span>          // Clean up open calls but don't treat this as a fatal condition,<a name="line.1020"></a>
+<span class="sourceLineNo">1021</span>          // since we expect certain responses to not make it by the specified<a name="line.1021"></a>
+<span class="sourceLineNo">1022</span>          // {@link ConnectionId#rpcTimeout}.<a name="line.1022"></a>
+<span class="sourceLineNo">1023</span>          if (LOG.isTraceEnabled()) LOG.trace("ignored", e);<a name="line.1023"></a>
+<span class="sourceLineNo">1024</span>        } else {<a name="line.1024"></a>
+<span class="sourceLineNo">1025</span>          // Treat this as a fatal condition and close this connection<a name="line.1025"></a>
+<span class="sourceLineNo">1026</span>          markClosed(e);<a name="line.1026"></a>
+<span class="sourceLineNo">1027</span>        }<a name="line.1027"></a>
+<span class="sourceLineNo">1028</span>      } finally {<a name="line.1028"></a>
+<span class="sourceLineNo">1029</span>        cleanupCalls(false);<a name="line.1029"></a>
+<span class="sourceLineNo">1030</span>      }<a name="line.1030"></a>
+<span class="sourceLineNo">1031</span>    }<a name="line.1031"></a>
+<span class="sourceLineNo">1032</span><a name="line.1032"></a>
+<span class="sourceLineNo">1033</span>    /**<a name="line.1033"></a>
+<span class="sourceLineNo">1034</span>     * @return True if the exception is a fatal connection exception.<a name="line.1034"></a>
+<span class="sourceLineNo">1035</span>     */<a name="line.1035"></a>
+<span class="sourceLineNo">1036</span>    private boolean isFatalConnectionException(final ExceptionResponse e) {<a name="line.1036"></a>
+<span class="sourceLineNo">1037</span>      return e.getExceptionClassName().<a name="line.1037"></a>
+<span class="sourceLineNo">1038</span>        equals(FatalConnectionException.class.getName());<a name="line.1038"></a>
+<span class="sourceLineNo">1039</span>    }<a name="line.1039"></a>
+<span class="sourceLineNo">1040</span><a name="line.1040"></a>
+<span class="sourceLineNo">1041</span>    /**<a name="line.1041"></a>
+<span class="sourceLineNo">1042</span>     * @param e exception to be wrapped<a name="line.1042"></a>
+<span class="sourceLineNo">1043</span>     * @return RemoteException made from passed &lt;code&gt;e&lt;/code&gt;<a name="line.1043"></a>
+<span class="sourceLineNo">1044</span>     */<a name="line.1044"></a>
+<span class="sourceLineNo">1045</span>    private RemoteException createRemoteException(final ExceptionResponse e) {<a name="line.1045"></a>
+<span class="sourceLineNo">1046</span>      String innerExceptionClassName = e.getExceptionClassName();<a name="line.1046"></a>
+<span class="sourceLineNo">1047</span>      boolean doNotRetry = e.getDoNotRetry();<a name="line.1047"></a>
+<span class="sourceLineNo">1048</span>      return e.hasHostname()?<a name="line.1048"></a>
+<span class="sourceLineNo">1049</span>        // If a hostname then add it to the RemoteWithExtrasException<a name="line.1049"></a>
 <span class="sourceLineNo">1050</span>        new RemoteWithExtrasException(innerExceptionClassName,<a name="line.1050"></a>
-<span class="sourceLineNo">1051</span>          e.getStackTrace(), doNotRetry);<a name="line.1051"></a>
-<span class="sourceLineNo">1052</span>    }<a name="line.1052"></a>
-<span class="sourceLineNo">1053</span><a name="line.1053"></a>
-<span class="sourceLineNo">1054</span>    protected synchronized boolean markClosed(IOException e) {<a name="line.1054"></a>
-<span class="sourceLineNo">1055</span>      if (e == null) throw new NullPointerException();<a name="line.1055"></a>
-<span class="sourceLineNo">1056</span><a name="line.1056"></a>
-<span class="sourceLineNo">1057</span>      boolean ret = shouldCloseConnection.compareAndSet(false, true);<a name="line.1057"></a>
-<span class="sourceLineNo">1058</span>      if (ret) {<a name="line.1058"></a>
-<span class="sourceLineNo">1059</span>        if (LOG.isTraceEnabled()) {<a name="line.1059"></a>
-<span class="sourceLineNo">1060</span>          LOG.trace(getName() + ": marking at should close, reason: " + e.getMessage());<a name="line.1060"></a>
-<span class="sourceLineNo">1061</span>        }<a name="line.1061"></a>
-<span class="sourceLineNo">1062</span>        if (callSender != null) {<a name="line.1062"></a>
-<span class="sourceLineNo">1063</span>          callSender.close();<a name="line.1063"></a>
-<span class="sourceLineNo">1064</span>        }<a name="line.1064"></a>
-<span class="sourceLineNo">1065</span>        notifyAll();<a name="line.1065"></a>
-<span class="sourceLineNo">1066</span>      }<a name="line.1066"></a>
-<span class="sourceLineNo">1067</span>      return ret;<a name="line.1067"></a>
-<span class="sourceLineNo">1068</span>    }<a name="line.1068"></a>
-<span class="sourceLineNo">1069</span><a name="line.1069"></a>
-<span class="sourceLineNo">1070</span><a name="line.1070"></a>
-<span class="sourceLineNo">1071</span>    /**<a name="line.1071"></a>
-<span class="sourceLineNo">1072</span>     * Cleanup the calls older than a given timeout, in milli seconds.<a name="line.1072"></a>
-<span class="sourceLineNo">1073</span>     * @param allCalls true for all calls, false for only the calls in timeout<a name="line.1073"></a>
-<span class="sourceLineNo">1074</span>     */<a name="line.1074"></a>
-<span class="sourceLineNo">1075</span>    protected synchronized void cleanupCalls(boolean allCalls) {<a name="line.1075"></a>
-<span class="sourceLineNo">1076</span>      Iterator&lt;Entry&lt;Integer, Call&gt;&gt; itor = calls.entrySet().iterator();<a name="line.1076"></a>
-<span class="sourceLineNo">1077</span>      while (itor.hasNext()) {<a name="line.1077"></a>
-<span class="sourceLineNo">1078</span>        Call c = itor.next().getValue();<a name="line.1078"></a>
-<span class="sourceLineNo">1079</span>        if (c.done) {<a name="line.1079"></a>
-<span class="sourceLineNo">1080</span>          // To catch the calls without timeout that were cancelled.<a name="line.1080"></a>
-<span class="sourceLineNo">1081</span>          itor.remove();<a name="line.1081"></a>
-<span class="sourceLineNo">1082</span>        } else if (allCalls) {<a name="line.1082"></a>
-<span class="sourceLineNo">1083</span>          long waitTime = EnvironmentEdgeManager.currentTime() - c.getStartTime();<a name="line.1083"></a>
-<span class="sourceLineNo">1084</span>          IOException ie = new ConnectionClosingException("Connection to " + getRemoteAddress()<a name="line.1084"></a>
-<span class="sourceLineNo">1085</span>              + " is closing. Call id=" + c.id + ", waitTime=" + waitTime);<a name="line.1085"></a>
-<span class="sourceLineNo">1086</span>          c.setException(ie);<a name="line.1086"></a>
-<span class="sourceLineNo">1087</span>          itor.remove();<a name="line.1087"></a>
-<span class="sourceLineNo">1088</span>        } else if (c.checkAndSetTimeout()) {<a name="line.1088"></a>
+<span class="sourceLineNo">1051</span>          e.getStackTrace(), e.getHostname(), e.getPort(), doNotRetry):<a name="line.1051"></a>
+<span class="sourceLineNo">1052</span>        new RemoteWithExtrasException(innerExceptionClassName,<a name="line.1052"></a>
+<span class="sourceLineNo">1053</span>          e.getStackTrace(), doNotRetry);<a name="line.1053"></a>
+<span class="sourceLineNo">1054</span>    }<a name="line.1054"></a>
+<span class="sourceLineNo">1055</span><a name="line.1055"></a>
+<span class="sourceLineNo">1056</span>    protected synchronized boolean markClosed(IOException e) {<a name="line.1056"></a>
+<span class="sourceLineNo">1057</span>      if (e == null) throw new NullPointerException();<a name="line.1057"></a>
+<span class="sourceLineNo">1058</span><a name="line.1058"></a>
+<span class="sourceLineNo">1059</span>      boolean ret = shouldCloseConnection.compareAndSet(false, true);<a name="line.1059"></a>
+<span class="sourceLineNo">1060</span>      if (ret) {<a name="line.1060"></a>
+<span class="sourceLineNo">1061</span>        if (LOG.isTraceEnabled()) {<a name="line.1061"></a>
+<span class="sourceLineNo">1062</span>          LOG.trace(getName() + ": marking at should close, reason: " + e.getMessage());<a name="line.1062"></a>
+<span class="sourceLineNo">1063</span>        }<a name="line.1063"></a>
+<span class="sourceLineNo">1064</span>        if (callSender != null) {<a name="line.1064"></a>
+<span class="sourceLineNo">1065</span>          callSender.close();<a name="line.1065"></a>
+<span class="sourceLineNo">1066</span>        }<a name="line.1066"></a>
+<span class="sourceLineNo">1067</span>        notifyAll();<a name="line.1067"></a>
+<span class="sourceLineNo">1068</span>      }<a name="line.1068"></a>
+<span class="sourceLineNo">1069</span>      return ret;<a name="line.1069"></a>
+<span class="sourceLineNo">1070</span>    }<a name="line.1070"></a>
+<span class="sourceLineNo">1071</span><a name="line.1071"></a>
+<span class="sourceLineNo">1072</span><a name="line.1072"></a>
+<span class="sourceLineNo">1073</span>    /**<a name="line.1073"></a>
+<span class="sourceLineNo">1074</span>     * Cleanup the calls older than a given timeout, in milli seconds.<a name="line.1074"></a>
+<span class="sourceLineNo">1075</span>     * @param allCalls true for all calls, false for only the calls in timeout<a name="line.1075"></a>
+<span class="sourceLineNo">1076</span>     */<a name="line.1076"></a>
+<span class="sourceLineNo">1077</span>    protected synchronized void cleanupCalls(boolean allCalls) {<a name="line.1077"></a>
+<span class="sourceLineNo">1078</span>      Iterator&lt;Entry&lt;Integer, Call&gt;&gt; itor = calls.entrySet().iterator();<a name="line.1078"></a>
+<span class="sourceLineNo">1079</span>      while (itor.hasNext()) {<a name="line.1079"></a>
+<span class="sourceLineNo">1080</span>        Call c = itor.next().getValue();<a name="line.1080"></a>
+<span class="sourceLineNo">1081</span>        if (c.done) {<a name="line.1081"></a>
+<span class="sourceLineNo">1082</span>          // To catch the calls without timeout that were cancelled.<a name="line.1082"></a>
+<span class="sourceLineNo">1083</span>          itor.remove();<a name="line.1083"></a>
+<span class="sourceLineNo">1084</span>        } else if (allCalls) {<a name="line.1084"></a>
+<span class="sourceLineNo">1085</span>          long waitTime = EnvironmentEdgeManager.currentTime() - c.getStartTime();<a name="line.1085"></a>
+<span class="sourceLineNo">1086</span>          IOException ie = new ConnectionClosingException("Connection to " + getRemoteAddress()<a name="line.1086"></a>
+<span class="sourceLineNo">1087</span>              + " is closing. Call id=" + c.id + ", waitTime=" + waitTime);<a name="line.1087"></a>
+<span class="sourceLineNo">1088</span>          c.setException(ie);<a name="line.1088"></a>
 <span class="sourceLineNo">1089</span>          itor.remove();<a name="line.1089"></a>
-<span class="sourceLineNo">1090</span>        } else {<a name="line.1090"></a>
-<span class="sourceLineNo">1091</span>          // We expect the call to be ordered by timeout. It may not be the case, but stopping<a name="line.1091"></a>
-<span class="sourceLineNo">1092</span>          //  at the first valid call allows to be sure that we still have something to do without<a name="line.1092"></a>
-<span class="sourceLineNo">1093</span>          //  spending too much time by reading the full list.<a name="line.1093"></a>
-<span class="sourceLineNo">1094</span>          break;<a name="line.1094"></a>
-<span class="sourceLineNo">1095</span>        }<a name="line.1095"></a>
-<span class="sourceLineNo">1096</span>      }<a name="line.1096"></a>
-<span class="sourceLineNo">1097</span>    }<a name="line.1097"></a>
-<span class="sourceLineNo">1098</span>  }<a name="line.1098"></a>
-<span class="sourceLineNo">1099</span><a name="line.1099"></a>
-<span class="sourceLineNo">1100</span>  /**<a name="line.1100"></a>
-<span class="sourceLineNo">1101</span>   * Used in test only. Construct an IPC cluster client whose values are of the<a name="line.1101"></a>
-<span class="sourceLineNo">1102</span>   * {@link Message} class.<a name="line.1102"></a>
-<span class="sourceLineNo">1103</span>   * @param conf configuration<a name="line.1103"></a>
-<span class="sourceLineNo">1104</span>   * @param clusterId the cluster id<a name="line.1104"></a>
-<span class="sourceLineNo">1105</span>   * @param factory socket factory<a name="line.1105"></a>
-<span class="sourceLineNo">1106</span>   */<a name="line.1106"></a>
-<span class="sourceLineNo">1107</span>  @VisibleForTesting<a name="line.1107"></a>
-<span class="sourceLineNo">1108</span>  RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory) {<a name="line.1108"></a>
-<span class="sourceLineNo">1109</span>    this(conf, clusterId, factory, null, null);<a name="line.1109"></a>
-<span class="sourceLineNo">1110</span>  }<a name="line.1110"></a>
-<span class="sourceLineNo">1111</span><a name="line.1111"></a>
-<span class="sourceLineNo">1112</span>  /**<a name="line.1112"></a>
-<span class="sourceLineNo">1113</span>   * Construct an IPC cluster client whose values are of the {@link Message} class.<a name="line.1113"></a>
-<span class="sourceLineNo">1114</span>   * @param conf configuration<a name="line.1114"></a>
-<span class="sourceLineNo">1115</span>   * @param clusterId the cluster id<a name="line.1115"></a>
-<span class="sourceLineNo">1116</span>   * @param factory socket factory<a name="line.1116"></a>
-<span class="sourceLineNo">1117</span>   * @param localAddr client socket bind address<a name="line.1117"></a>
-<span class="sourceLineNo">1118</span>   * @param metrics the connection metrics<a name="line.1118"></a>
-<span class="sourceLineNo">1119</span>   */<a name="line.1119"></a>
-<span class="sourceLineNo">1120</span>  RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory,<a name="line.1120"></a>
-<span class="sourceLineNo">1121</span>      SocketAddress localAddr, MetricsConnection metrics) {<a name="line.1121"></a>
-<span class="sourceLineNo">1122</span>    super(conf, clusterId, localAddr, metrics);<a name="line.1122"></a>
-<span class="sourceLineNo">1123</span><a name="line.1123"></a>
-<span class="sourceLineNo">1124</span>    this.socketFactory = factory;<a name="line.1124"></a>
-<span class="sourceLineNo">1125</span>    this.connections = new PoolMap&lt;ConnectionId, Connection&gt;(getPoolType(conf), getPoolSize(conf));<a name="line.1125"></a>
-<span class="sourceLineNo">1126</span>    this.failedServers = new FailedServers(conf);<a name="line.1126"></a>
-<span class="sourceLineNo">1127</span>  }<a name="line.1127"></a>
-<span class="sourceLineNo">1128</span><a name="line.1128"></a>
-<span class="sourceLineNo">1129</span>  /**<a name="line.1129"></a>
-<span class="sourceLineNo">1130</span>   * Used in test only. Construct an IPC client for the cluster {@code clusterId} with<a name="line.1130"></a>
-<span class="sourceLineNo">1131</span>   * the default SocketFactory<a name="line.1131"></a>
-<span class="sourceLineNo">1132</span>   */<a name="line.1132"></a>
-<span class="sourceLineNo">1133</span>  @VisibleForTesting<a name="line.1133"></a>
-<span class="sourceLineNo">1134</span>  RpcClientImpl(Configuration conf, String clusterId) {<a name="line.1134"></a>
-<span class="sourceLineNo">1135</span>    this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null, null);<a name="line.1135"></a>
-<span class="sourceLineNo">1136</span>  }<a name="line.1136"></a>
-<span class="sourceLineNo">1137</span><a name="line.1137"></a>
-<span class="sourceLineNo">1138</span>  /**<a name="line.1138"></a>
-<span class="sourceLineNo">1139</span>   * Construct an IPC client for the cluster {@code clusterId} with the default SocketFactory<a name="line.1139"></a>
-<span class="sourceLineNo">1140</span>   *<a name="line.1140"></a>
-<span class="sourceLineNo">1141</span>   * This method is called with reflection by the RpcClientFactory to create an instance<a name="line.1141"></a>
+<span class="sourceLineNo">1090</span>        } else if (c.checkAndSetTimeout()) {<a name="line.1090"></a>
+<span class="sourceLineNo">1091</span>          itor.remove();<a name="line.1091"></a>
+<span class="sourceLineNo">1092</span>        } else {<a name="line.1092"></a>
+<span class="sourceLineNo">1093</span>          // We expect the call to be ordered by timeout. It may not be the case, but stopping<a name="line.1093"></a>
+<span class="sourceLineNo">1094</span>          //  at the first valid call allows to be sure that we still have something to do without<a name="line.1094"></a>
+<span class="sourceLineNo">1095</span>          //  spending too much time by reading the full list.<a name="line.1095"></a>
+<span class="sourceLineNo">1096</span>          break;<a name="line.1096"></a>
+<span class="sourceLineNo">1097</span>        }<a name="line.1097"></a>
+<span class="sourceLineNo">1098</span>      }<a name="line.1098"></a>
+<span class="sourceLineNo">1099</span>    }<a name="line.1099"></a>
+<span class="sourceLineNo">1100</span>  }<a name="line.1100"></a>
+<span class="sourceLineNo">1101</span><a name="line.1101"></a>
+<span class="sourceLineNo">1102</span>  /**<a name="line.1102"></a>
+<span class="sourceLineNo">1103</span>   * Used in test only. Construct an IPC cluster client whose values are of the<a name="line.1103"></a>
+<span class="sourceLineNo">1104</span>   * {@link Message} class.<a name="line.1104"></a>
+<span class="sourceLineNo">1105</span>   * @param conf configuration<a name="line.1105"></a>
+<span class="sourceLineNo">1106</span>   * @param clusterId the cluster id<a name="line.1106"></a>
+<span class="sourceLineNo">1107</span>   * @param factory socket factory<a name="line.1107"></a>
+<span class="sourceLineNo">1108</span>   */<a name="line.1108"></a>
+<span class="sourceLineNo">1109</span>  @VisibleForTesting<a name="line.1109"></a>
+<span class="sourceLineNo">1110</span>  RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory) {<a name="line.1110"></a>
+<span class="sourceLineNo">1111</span>    this(conf, clusterId, factory, null, null);<a name="line.1111"></a>
+<span class="sourceLineNo">1112</span>  }<a name="line.1112"></a>
+<span class="sourceLineNo">1113</span><a name="line.1113"></a>
+<span class="sourceLineNo">1114</span>  /**<a name="line.1114"></a>
+<span class="sourceLineNo">1115</span>   * Construct an IPC cluster client whose values are of the {@link Message} class.<a name="line.1115"></a>
+<span class="sourceLineNo">1116</span>   * @param conf configuration<a name="line.1116"></a>
+<span class="sourceLineNo">1117</span>   * @param clusterId the cluster id<a name="line.1117"></a>
+<span class="sourceLineNo">1118</span>   * @param factory socket factory<a name="line.1118"></a>
+<span class="sourceLineNo">1119</span>   * @param localAddr client socket bind address<a name="line.1119"></a>
+<span class="sourceLineNo">1120</span>   * @param metrics the connection metrics<a name="line.1120"></a>
+<span class="sourceLineNo">1121</span>   */<a name="line.1121"></a>
+<span class="sourceLineNo">1122</span>  RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory,<a name="line.1122"></a>
+<span class="sourceLineNo">1123</span>      SocketAddress localAddr, MetricsConnection metrics) {<a name="line.1123"></a>
+<span class="sourceLineNo">1124</span>    super(conf, clusterId, localAddr, metrics);<a name="line.1124"></a>
+<span class="sourceLineNo">1125</span><a name="line.1125"></a>
+<span class="sourceLineNo">1126</span>    this.socketFactory = factory;<a name="line.1126"></a>
+<span class="sourceLineNo">1127</span>    this.connections = new PoolMap&lt;ConnectionId, Connection&gt;(getPoolType(conf), getPoolSize(conf));<a name="line.1127"></a>
+<span class="sourceLineNo">1128</span>    this.failedServers = new FailedServers(conf);<a name="line.1128"></a>
+<span class="sourceLineNo">1129</span>  }<a name="line.1129"></a>
+<span class="sourceLineNo">1130</span><a name="line.1130"></a>
+<span class="sourceLineNo">1131</span>  /**<a name="line.1131"></a>
+<span class="sourceLineNo">1132</span>   * Used in test only. Construct an IPC client for the cluster {@code clusterId} with<a name="line.1132"></a>
+<span class="sourceLineNo">1133</span>   * the default SocketFactory<a name="line.1133"></a>
+<span class="sourceLineNo">1134</span>   */<a name="line.1134"></a>
+<span class="sourceLineNo">1135</span>  @VisibleForTesting<a name="line.1135"></a>
+<span class="sourceLineNo">1136</span>  RpcClientImpl(Configuration conf, String clusterId) {<a name="line.1136"></a>
+<span class="sourceLineNo">1137</span>    this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null, null);<a name="line.1137"></a>
+<span class="sourceLineNo">1138</span>  }<a name="line.1138"></a>
+<span class="sourceLineNo">1139</span><a name="line.1139"></a>
+<span class="sourceLineNo">1140</span>  /**<a name="line.1140"></a>
+<span class="sourceLineNo">1141</span>   * Construct an IPC client for the cluster {@code clusterId} with the default SocketFactory<a name="line.1141"></a>
 <span class="sourceLineNo">1142</span>   *<a name="line.1142"></a>
-<span class="sourceLineNo">1143</span>   * @param conf configuration<a name="line.1143"></a>
-<span class="sourceLineNo">1144</span>   * @param clusterId the cluster id<a name="line.1144"></a>
-<span class="sourceLineNo">1145</span>   * @param localAddr client socket bind address.<a name="line.1145"></a>
-<span class="sourceLineNo">1146</span>   * @param metrics the connection metrics<a name="line.1146"></a>
-<span class="sourceLineNo">1147</span>   */<a name="line.1147"></a>
-<span class="sourceLineNo">1148</span>  public RpcClientImpl(Configuration conf, String clusterId, SocketAddress localAddr,<a name="line.1148"></a>
-<span class="sourceLineNo">1149</span>      MetricsConnection metrics) {<a name="line.1149"></a>
-<span class="sourceLineNo">1150</span>    this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), localAddr, metrics);<a name="line.1150"></a>
-<span class="sourceLineNo">1151</span>  }<a name="line.1151"></a>
-<span class="sourceLineNo">1152</span><a name="line.1152"></a>
-<span class="sourceLineNo">1153</span>  /** Stop all threads related to this client.  No further calls may be made<a name="line.1153"></a>
-<span class="sourceLineNo">1154</span>   * using this client. */<a name="line.1154"></a>
-<span class="sourceLineNo">1155</span>  @Override<a name="line.1155"></a>
-<span class="sourceLineNo">1156</span>  public void close() {<a name="line.1156"></a>
-<span class="sourceLineNo">1157</span>    if (LOG.isDebugEnabled()) LOG.debug("Stopping rpc client");<a name="line.1157"></a>
-<span class="sourceLineNo">1158</span>    if (!running.compareAndSet(true, false)) return;<a name="line.1158"></a>
-<span class="sourceLineNo">1159</span><a name="line.1159"></a>
-<span class="sourceLineNo">1160</span>    Set&lt;Connection&gt; connsToClose = null;<a name="line.1160"></a>
-<span class="sourceLineNo">1161</span>    // wake up all connections<a name="line.1161"></a>
-<span class="sourceLineNo">1162</span>    synchronized (connections) {<a name="line.1162"></a>
-<span class="sourceLineNo">1163</span>      for (Connection conn : connections.values()) {<a name="line.1163"></a>
-<span class="sourceLineNo">1164</span>        conn.interrupt();<a name="line.1164"></a>
-<span class="sourceLineNo">1165</span>        if (conn.callSender != null) {<a name="line.1165"></a>
-<span class="sourceLineNo">1166</span>          conn.callSender.interrupt();<a name="line.1166"></a>
-<span class="sourceLineNo">1167</span>        }<a name="line.1167"></a>
-<span class="sourceLineNo">1168</span><a name="line.1168"></a>
-<span class="sourceLineNo">1169</span>        // In case the CallSender did not setupIOStreams() yet, the Connection may not be started<a name="line.1169"></a>
-<span class="sourceLineNo">1170</span>        // at all (if CallSender has a cancelled Call it can happen). See HBASE-13851<a name="line.1170"></a>
-<span class="sourceLineNo">1171</span>        if (!conn.isAlive()) {<a name="line.1171"></a>
-<span class="sourceLineNo">1172</span>          if (connsToClose == null) {<a name="line.1172"></a>
-<span class="sourceLineNo">1173</span>            connsToClose = new HashSet&lt;Connection&gt;();<a name="line.1173"></a>
-<span class="sourceLineNo">1174</span>          }<a name="line.1174"></a>
-<span class="sourceLineNo">1175</span>          connsToClose.add(conn);<a name="line.1175"></a>
-<span class="sourceLineNo">1176</span>        }<a name="line.1176"></a>
-<span class="sourceLineNo">1177</span>      }<a name="line.1177"></a>
-<span class="sourceLineNo">1178</span>    }<a name="line.1178"></a>
-<span class="sourceLineNo">1179</span>    if (connsToClose != null) {<a name="line.1179"></a>
-<span class="sourceLineNo">1180</span>      for (Connection conn : connsToClose) {<a name="line.1180"></a>
-<span class="sourceLineNo">1181</span>        if (conn.markClosed(new InterruptedIOException("RpcClient is closing"))) {<a name="line.1181"></a>
-<span class="sourceLineNo">1182</span>          conn.close();<a name="line.1182"></a>
-<span class="sourceLineNo">1183</span>        }<a name="line.1183"></a>
-<span class="sourceLineNo">1184</span>      }<a name="line.1184"></a>
-<span class="sourceLineNo">1185</span>    }<a name="line.1185"></a>
-<span class="sourceLineNo">1186</span>    // wait until all connections are closed<a name="line.1186"></a>
-<span class="sourceLineNo">1187</span>    while (!connections.isEmpty()) {<a name="line.1187"></a>
-<span class="sourceLineNo">1188</span>      try {<a name="line.1188"></a>
-<span class="sourceLineNo">1189</span>        Thread.sleep(10);<a name="line.1189"></a>
-<span class="sourceLineNo">1190</span>      } catch (InterruptedException e) {<a name="line.1190"></a>
-<span class="sourceLineNo">1191</span>        LOG.info("Interrupted while stopping the client. We still have " + connections.size() +<a name="line.1191"></a>
-<span class="sourceLineNo">1192</span>            " connections.");<a name="line.1192"></a>
-<span class="sourceLineNo">1193</span>        Thread.currentThread().interrupt();<a name="line.1193"></a>
-<span class="sourceLineNo">1194</span>        return;<a name="line.1194"></a>
-<span class="sourceLineNo">1195</span>      }<a name="line.1195"></a>
-<span class="sourceLineNo">1196</span>    }<a name="line.1196"></a>
-<span class="sourceLineNo">1197</span>  }<a name="line.1197"></a>
-<span class="sourceLineNo">1198</span><a name="line.1198"></a>
-<span class="sourceLineNo">1199</span>  /** Make a call, passing &lt;code&gt;param&lt;/code&gt;, to the IPC server running at<a name="line.1199"></a>
-<span class="sourceLineNo">1200</span>   * &lt;code&gt;address&lt;/code&gt; which is servicing the &lt;code&gt;protocol&lt;/code&gt; protocol,<a name="line.1200"></a>
-<span class="sourceLineNo">1201</span>   * with the &lt;code&gt;ticket&lt;/code&gt; credentials, returning the value.<a name="line.1201"></a>
-<span class="sourceLineNo">1202</span>   * Throws exceptions if there are network problems or if the remote code<a name="line.1202"></a>
-<span class="sourceLineNo">1203</span>   * threw an exception.<a name="line.1203"></a>
-<span class="sourceLineNo">1204</span>   * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.<a name="line.1204"></a>
-<span class="sourceLineNo">1205</span>   *          {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a<a name="line.1205"></a>
-<span class="sourceLineNo">1206</span>   *          new Connection each time.<a name="line.1206"></a>
-<span class="sourceLineNo">1207</span>   * @return A pair with the Message response and the Cell data (if any).<a name="line.1207"></a>
-<span class="sourceLineNo">1208</span>   * @throws InterruptedException<a name="line.1208"></a>
-<span class="sourceLineNo">1209</span>   * @throws IOException<a name="line.1209"></a>
-<span class="sourceLineNo">1210</span>   */<a name="line.1210"></a>
-<span class="sourceLineNo">1211</span>  @Override<a name="line.1211"></a>
-<span class="sourceLineNo">1212</span>  protected Pair&lt;Message, CellScanner&gt; call(PayloadCarryingRpcController pcrc, MethodDescriptor md,<a name="line.1212"></a>
-<span class="sourceLineNo">1213</span>      Message param, Message returnType, User ticket, InetSocketAddress addr,<a name="line.1213"></a>
-<span class="sourceLineNo">1214</span>      MetricsConnection.CallStats callStats)<a name="line.1214"></a>
-<span class="sourceLineNo">1215</span>      throws IOException, InterruptedException {<a name="line.1215"></a>
-<span class="sourceLineNo">1216</span>    if (pcrc == null) {<a name="line.1216"></a>
-<span class="sourceLineNo">1217</span>      pcrc = new PayloadCarryingRpcController();<a name="line.1217"></a>
-<span class="sourceLineNo">1218</span>    }<a name="line.1218"></a>
-<span class="sourceLineNo">1219</span>    CellScanner cells = pcrc.cellScanner();<a name="line.1219"></a>
-<span class="sourceLineNo">1220</span><a name="line.1220"></a>
-<span class="sourceLineNo">1221</span>    final Call call = new Call(this.callIdCnt.getAndIncrement(), md, param, cells, returnType,<a name="line.1221"></a>
-<span class="sourceLineNo">1222</span>        pcrc.getCallTimeout(), MetricsConnection.newCallStats());<a name="line.1222"></a>
-<span class="sourceLineNo">1223</span><a name="line.1223"></a>
-<span class="sourceLineNo">1224</span>    final Connection connection = getConnection(ticket, call, addr);<a name="line.1224"></a>
+<span class="sourceLineNo">1143</span>   * This method is called with reflection by the RpcClientFactory to create an instance<a name="line.1143"></a>
+<span class="sourceLineNo">1144</span>   *<a name="line.1144"></a>
+<span class="sourceLineNo">1145</span>   * @param conf configuration<a name="line.1145"></a>
+<span class="sourceLineNo">1146</span>   * @param clusterId the cluster id<a name="line.1146"></a>
+<span class="sourceLineNo">1147</span>   * @param localAddr client socket bind address.<a name="line.1147"></a>
+<span class="sourceLineNo">1148</span>   * @param metrics the connection metrics<a name="line.1148"></a>
+<span class="sourceLineNo">1149</span>   */<a name="line.1149"></a>
+<span class="sourceLineNo">1150</span>  public RpcClientImpl(Configuration conf, String clusterId, SocketAddress localAddr,<a name="line.1150"></a>
+<span class="sourceLineNo">1151</span>      MetricsConnection metrics) {<a name="line.1151"></a>
+<span class="sourceLineNo">1152</span>    this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), localAddr, metrics);<a name="line.1152"></a>
+<span class="sourceLineNo">1153</span>  }<a name="line.1153"></a>
+<span class="sourceLineNo">1154</span><a name="line.1154"></a>
+<span class="sourceLineNo">1155</span>  /** Stop all threads related to this client.  No further calls may be made<a name="line.1155"></a>
+<span class="sourceLineNo">1156</span>   * using this client. */<a name="line.1156"></a>
+<span class="sourceLineNo">1157</span>  @Override<a name="line.1157"></a>
+<span class="sourceLineNo">1158</span>  public void close() {<a name="line.1158"></a>
+<span class="sourceLineNo">1159</span>    if (LOG.isDebugEnabled()) LOG.debug("Stopping rpc client");<a name="line.1159"></a>
+<span class="sourceLineNo">1160</span>    if (!running.compareAndSet(true, false)) return;<a name="line.1160"></a>
+<span class="sourceLineNo">1161</span><a name="line.1161"></a>
+<span class="sourceLineNo">1162</span>    Set&lt;Connection&gt; connsToClose = null;<a name="line.1162"></a>
+<span class="sourceLineNo">1163</span>    // wake up all connections<a name="line.1163"></a>
+<span class="sourceLineNo">1164</span>    synchronized (connections) {<a name="line.1164"></a>
+<span class="sourceLineNo">1165</span>      for (Connection conn : connections.values()) {<a name="line.1165"></a>
+<span class="sourceLineNo">1166</span>        conn.interrupt();<a name="line.1166"></a>
+<span class="sourceLineNo">1167</span>        if (conn.callSender != null) {<a name="line.1167"></a>
+<span class="sourceLineNo">1168</span>          conn.callSender.interrupt();<a name="line.1168"></a>
+<span class="sourceLineNo">1169</span>        }<a name="line.1169"></a>
+<span class="sourceLineNo">1170</span><a name="line.1170"></a>
+<span class="sourceLineNo">1171</span>        // In case the CallSender did not setupIOStreams() yet, the Connection may not be started<a name="line.1171"></a>
+<span class="sourceLineNo">1172</span>        // at all (if CallSender has a cancelled Call it can happen). See HBASE-13851<a name="line.1172"></a>
+<span class="sourceLineNo">1173</span>        if (!conn.isAlive()) {<a name="line.1173"></a>
+<span class="sourceLineNo">1174</span>          if (connsToClose == null) {<a name="line.1174"></a>
+<span class="sourceLineNo">1175</span>            connsToClose = new HashSet&lt;Connection&gt;();<a name="line.1175"></a>
+<span class="sourceLineNo">1176</span>          }<a name="line.1176"></a>
+<span class="sourceLineNo">1177</span>          connsToClose.add(conn);<a name="line.1177"></a>
+<span class="sourceLineNo">1178</span>        }<a name="line.1178"></a>
+<span class="sourceLineNo">1179</span>      }<a name="line.1179"></a>
+<span class="sourceLineNo">1180</span>    }<a name="line.1180"></a>
+<span class="sourceLineNo">1181</span>    if (connsToClose != null) {<a name="line.1181"></a>
+<span class="sourceLineNo">1182</span>      for (Connection conn : connsToClose) {<a name="line.1182"></a>
+<span class="sourceLineNo">1183</span>        if (conn.markClosed(new InterruptedIOException("RpcClient is closing"))) {<a name="line.1183"></a>
+<span class="sourceLineNo">1184</span>          conn.close();<a name="line.1184"></a>
+<span class="sourceLineNo">1185</span>        }<a name="line.1185"></a>
+<span class="sourceLineNo">1186</span>      }<a name="line.1186"></a>
+<span class="sourceLineNo">1187</span>    }<a name="line.1187"></a>
+<span class="sourceLineNo">1188</span>    // wait until all connections are closed<a name="line.1188"></a>
+<span class="sourceLineNo">1189</span>    while (!connections.isEmpty()) {<a name="line.1189"></a>
+<span class="sourceLineNo">1190</span>      try {<a name="line.1190"></a>
+<span class="sourceLineNo">1191</span>        Thread.sleep(10);<a name="line.1191"></a>
+<span class="sourceLineNo">1192</span>      } catch (InterruptedException e) {<a name="line.1192"></a>
+<span class="sourceLineNo">1193</span>        LOG.info("Interrupted while stopping the client. We still have " + connections.size() +<a name="line.1193"></a>
+<span class="sourceLineNo">1194</span>            " connections.");<a name="line.1194"></a>
+<span class="sourceLineNo">1195</span>        Thread.currentThread().interrupt();<a name="line.1195"></a>
+<span class="sourceLineNo">1196</span>        return;<a name="line.1196"></a>
+<span class="sourceLineNo">1197</span>      }<a name="line.1197"></a>
+<span class="sourceLineNo">1198</span>    }<a name="line.1198"></a>
+<span class="sourceLineNo">1199</span>  }<a name="line.1199"></a>
+<span class="sourceLineNo">1200</span><a name="line.1200"></a>
+<span class="sourceLineNo">1201</span>  /** Make a call, passing &lt;code&gt;param&lt;/code&gt;, to the IPC server running at<a name="line.1201"></a>
+<span class="sourceLineNo">1202</span>   * &lt;code&gt;address&lt;/code&gt; which is servicing the &lt;code&gt;protocol&lt;/code&gt; protocol,<a name="line.1202"></a>
+<span class="sourceLineNo">1203</span>   * with the &lt;code&gt;ticket&lt;/code&gt; credentials, returning the value.<a name="line.1203"></a>
+<span class="sourceLineNo">1204</span>   * Throws exceptions if there are network problems or if the remote code<a name="line.1204"></a>
+<span class="sourceLineNo">1205</span>   * threw an exception.<a name="line.1205"></a>
+<span class="sourceLineNo">1206</span>   * @param ticket Be careful which ticket you pass. A new user will mean a new Connection.<a name="line.1206"></a>
+<span class="sourceLineNo">1207</span>   *          {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a<a name="line.1207"></a>
+<span class="sourceLineNo">1208</span>   *          new Connection each time.<a name="line.1208"></a>
+<span class="sourceLineNo">1209</span>   * @return A pair with the Message response and the Cell data (if any).<a name="line.1209"></a>
+<span class="sourceLineNo">1210</span>   * @throws InterruptedException<a name="line.1210"></a>
+<span class="sourceLineNo">1211</span>   * @throws IOException<a name="line.1211"></a>
+<span class="sourceLineNo">1212</span>   */<a name="line.1212"></a>
+<span class="sourceLineNo">1213</span>  @Override<a name="line.1213"></a>
+<span class="sourceLineNo">1214</span>  protected Pair&lt;Message, CellScanner&gt; call(PayloadCarryingRpcController pcrc, MethodDescriptor md,<a name="line.1214"></a>
+<span class="sourceLineNo">1215</span>      Message param, Message returnType, User ticket, InetSocketAddress addr,<a name="line.1215"></a>
+<span class="sourceLineNo">1216</span>      MetricsConnection.CallStats callStats)<a name="line.1216"></a>
+<span class="sourceLineNo">1217</span>      throws IOException, InterruptedException {<a name="line.1217"></a>
+<span class="sourceLineNo">1218</span>    if (pcrc == null) {<a name="line.1218"></a>
+<span class="sourceLineNo">1219</span>      pcrc = new PayloadCarryingRpcController();<a name="line.1219"></a>
+<span class="sourceLineNo">1220</span>    }<a name="line.1220"></a>
+<span class="sourceLineNo">1221</span>    CellScanner cells = pcrc.cellScanner();<a name="line.1221"></a>
+<span class="sourceLineNo">1222</span><a name="line.1222"></a>
+<span class="sourceLineNo">1223</span>    final Call call = new Call(this.callIdCnt.getAndIncrement(), md, param, cells, returnType,<a name="line.1223"></a>
+<span class="sourceLineNo">1224</span>        pcrc.getCallTimeout(), MetricsConnection.newCallStats());<a name="line.1224"></a>
 <span class="sourceLineNo">1225</span><a name="line.1225"></a>
-<span class="sourceLineNo">1226</span>    final CallFuture cts;<a name="line.1226"></a>
-<span class="sourceLineNo">1227</span>    if (connection.callSender != null) {<a name="line.1227"></a>
-<span class="sourceLineNo">1228</span>      cts = connection.callSender.sendCall(call, pcrc.getPriority(), Trace.currentSpan());<a name="line.1228"></a>
-<span class="sourceLineNo">1229</span>        pcrc.notifyOnCancel(new RpcCallback&lt;Object&gt;() {<a name="line.1229"></a>
-<span class="sourceLineNo">1230</span>          @Override<a name="line.1230"></a>
-<span class="sourceLineNo">1231</span>          public void run(Object parameter) {<a name="line.1231"></a>
-<span class="sourceLineNo">1232</span>            connection.callSender.remove(cts);<a name="line.1232"></a>
-<span class="sourceLineNo">1233</span>          }<a name="line.1233"></a>
-<span class="sourceLineNo">1234</span>        });<a name="line.1234"></a>
-<span class="sourceLineNo">1235</span>        if (pcrc.isCanceled()) {<a name="line.1235"></a>
-<span class="sourceLineNo">1236</span>          // To finish if the call was cancelled before we set the notification (race condition)<a name="line.1236"></a>
-<span class="sourceLineNo">1237</span>          call.callComplete();<a name="line.1237"></a>
-<span class="sourceLineNo">1238</span>          return new Pair&lt;Message, CellScanner&gt;(call.response, call.cells);<a name="line.1238"></a>
-<span class="sourceLineNo">1239</span>        }<a name="line.1239"></a>
-<span class="sourceLineNo">1240</span>    } else {<a name="line.1240"></a>
-<span class="sourceLineNo">1241</span>      cts = null;<a name="line.1241"></a>
-<span class="sourceLineNo">1242</span>      connection.tracedWriteRequest(call, pcrc.getPriority(), Trace.currentSpan());<a name="line.1242"></a>
-<span class="sourceLineNo">1243</span>    }<a name="line.1243"></a>
-<span class="sourceLineNo">1244</span><a name="line.1244"></a>
-<span class="sourceLineNo">1245</span>    while (!call.done) {<a name="line.1245"></a>
-<span class="sourceLineNo">1246</span>      if (call.checkAndSetTimeout()) {<a name="line.1246"></a>
-<span class="sourceLineNo">1247</span>        if (cts != null) connection.callSender.remove(cts);<a name="line.1247"></a>
-<span class="sourceLineNo">1248</span>        break;<a name="line.1248"></a>
-<span class="sourceLineNo">1249</span>      }<a name="line.1249"></a>
-<span class="sourceLineNo">1250</span>      if (connection.shouldCloseConnection.get()) {<a name="line.1250"></a>
-<span class="sourceLineNo">1251</span>        throw new ConnectionClosingException("Call id=" + call.id +<a name="line.1251"></a>
-<span class="sourceLineNo">1252</span>            " on server " + addr + " aborted: connection is closing");<a name="line.1252"></a>
-<span class="sourceLineNo">1253</span>      }<a name="line.1253"></a>
-<span class="sourceLineNo">1254</span>      try {<a name="line.1254"></a>
-<span class="sourceLineNo">1255</span>        synchronized (call) {<a name="line.1255"></a>
-<span class="sourceLineNo">1256</span>          if (call.done) break;<a name="line.1256"></a>
-<span class="sourceLineNo">1257</span>          call.wait(Math.min(call.remainingTime(), 1000) + 1);<a name="line.1257"></a>
-<span class="sourceLineNo">1258</span>        }<a name="line.1258"></a>
-<span class="sourceLineNo">1259</span>      } catch (InterruptedException e) {<a name="line.1259"></a>
-<span class="sourceLineNo">1260</span>        call.setException(new InterruptedIOException());<a name="line.1260"></a>
-<span class="sourceLineNo">1261</span>        if (cts != null) connection.callSender.remove(cts);<a name="line.1261"></a>
-<span class="sourceLineNo">1262</span>        throw e;<a name="line.1262"></a>
-<span class="sourceLineNo">1263</span>      }<a name="line.1263"></a>
-<span class="sourceLineNo">1264</span>    }<a name="line.1264"></a>
-<span class="sourceLineNo">1265</span><a name="line.1265"></a>
-<span class="sourceLineNo">1266</span>    if (call.error != null) {<a name="line.1266"></a>
-<span class="sourceLineNo">1267</span>      if (call.error instanceof RemoteException) {<a name="line.1267"></a>
-<span class="sourceLineNo">1268</span>        call.error.fillInStackTrace();<a name="line.1268"></a>
-<span class="sourceLineNo">1269</span>        throw call.error;<a name="line.1269"></a>
-<span class="sourceLineNo">1270</span>      }<a name="line.1270"></a>
-<span class="sourceLineNo">1271</span>      // local exception<a name="line.1271"></a>
-<span class="sourceLineNo">1272</span>      throw wrapException(addr, call.error);<a name="line.1272"></a>
-<span class="sourceLineNo">1273</span>    }<a name="line.1273"></a>
-<span class="sourceLineNo">1274</span><a name="line.1274"></a>
-<span class="sourceLineNo">1275</span>    return new Pair&lt;Message, CellScanner&gt;(call.response, call.cells);<a name="line.1275"></a>
-<span class="sourceLineNo">1276</span>  }<a name="line.1276"></a>
-<span class="sourceLineNo">1277</span><a name="line.1277"></a>
-<span class="sourceLineNo">1278</span><a name="line.1278"></a>
-<span class="sourceLineNo">1279</span>  /**<a name="line.1279"></a>
-<span class="sourceLineNo">1280</span>   * Interrupt the connections to the given ip:port server. This should be called if the server<a name="line.1280"></a>
-<span class="sourceLineNo">1281</span>   *  is known as actually dead. This will not prevent current operation to be retried, and,<a name="line.1281"></a>
-<span class="sourceLineNo">1282</span>   *  depending on their own behavior, they may retry on the same server. This can be a feature,<a name="line.1282"></a>
-<span class="sourceLineNo">1283</span>   *  for example at startup. In any case, they're likely to get connection refused (if the<a name="line.1283"></a>
-<span class="sourceLineNo">1284</span>   *  process died) or no route to host: i.e. their next retries should be faster and with a<a name="line.1284"></a>
-<span class="sourceLineNo">1285</span>   *  safe exception.<a name="line.1285"></a>
-<span class="sourceLineNo">1286</span>   */<a name="line.1286"></a>
-<span class="sourceLineNo">1287</span>  @Override<a name="line.1287"></a>
-<span class="sourceLineNo">1288</span>  public void cancelConnections(ServerName sn) {<a name="line.1288"></a>
-<span class="sourceLineNo">1289</span>    synchronized (connections) {<a name="line.1289"></a>
-<span class="sourceLineNo">1290</span>      for (Connection connection : connections.values()) {<a name="line.1290"></a>
-<span class="sourceLineNo">1291</span>        if (connection.isAlive() &amp;&amp;<a name="line.1291"></a>
-<span class="sourceLineNo">1292</span>            connection.getRemoteAddress().getPort() == sn.getPort() &amp;&amp;<a name="line.1292"></a>
-<span class="sourceLineNo">1293</span>            connection.getRemoteAddress().getHostName().equals(sn.getHostname())) {<a name="line.1293"></a>
-<span class="sourceLineNo">1294</span>          LOG.info("The server on " + sn.toString() +<a name="line.1294"></a>
-<span class="sourceLineNo">1295</span>              " is dead - stopping the connection " + connection.remoteId);<a name="line.1295"></a>
-<span class="sourceLineNo">1296</span>          connection.interrupt(); // We're interrupting a Reader. It means we want it to finish.<a name="line.1296"></a>
-<span class="sourceLineNo">1297</span>                                  // This will close the connection as well.<a name="line.1297"></a>
-<span class="sourceLineNo">1298</span>        }<a name="line.1298"></a>
-<span class="sourceLineNo">1299</span>      }<a name="line.1299"></a>
-<span class="sourceLineNo">1300</span>    }<a name="line.1300"></a>
-<span class="sourceLineNo">1301</span>  }<a name="line.1301"></a>
-<span class="sourceLineNo">1302</span><a name="line.1302"></a>
-<span class="sourceLineNo">1303</span>  /**<a name="line.1303"></a>
-<span class="sourceLineNo">1304</span>   *  Get a connection from the pool, or create a new one and add it to the<a name="line.1304"></a>
-<span class="sourceLineNo">1305</span>   * pool. Connections to a given host/port are reused.<a name="line.1305"></a>
-<span class="sourceLineNo">1306</span>   */<a name="line.1306"></a>
-<span class="sourceLineNo">1307</span>  protected Connection getConnection(User ticket, Call call, InetSocketAddress addr)<a name="line.1307"></a>
-<span class="sourceLineNo">1308</span>  throws IOException {<a name="line.1308"></a>
-<span class="sourceLineNo">1309</span>    if (!running.get()) throw new StoppedRpcClientException();<a name="line.1309"></a>
-<span class="sourceLineNo">1310</span>    Connection connection;<a name="line.1310"></a>
-<span class="sourceLineNo">1311</span>    ConnectionId remoteId =<a name="line.1311"></a>
-<span class="sourceLineNo">1312</span>      new ConnectionId(ticket, call.md.getService().getName(), addr);<a name="line.1312"></a>
-<span class="sourceLineNo">1313</span>    synchronized (connections) {<a name="line.1313"></a>
-<span class="sourceLineNo">1314</span>      connection = connections.get(remoteId);<a name="line.1314"></a>
-<span class="sourceLineNo">1315</span>      if (connection == null) {<a name="line.1315"></a>
-<span class="sourceLineNo">1316</span>        connection = createConnection(remoteId, this.codec, this.compressor);<a name="line.1316"></a>
-<span class="sourceLineNo">1317</span>        connections.put(remoteId, connection);<a name="line.1317"></a>
-<span class="sourceLineNo">1318</span>      }<a name="line.1318"></a>
-<span class="sourceLineNo">1319</span>    }<a name="line.1319"></a>
-<span class="sourceLineNo">1320</span><a name="line.1320"></a>
-<span class="sourceLineNo">1321</span>    return connection;<a name="line.1321"></a>
-<span class="sourceLineNo">1322</span>  }<a name="line.1322"></a>
-<span class="sourceLineNo">1323</span>}<a name="line.1323"></a>
+<span class="sourceLineNo">1226</span>    final Connection connection = getConnection(ticket, call, addr);<a name="line.1226"></a>
+<span class="sourceLineNo">1227</span><a name="line.1227"></a>
+<span class="sourceLineNo">1228</span>    final CallFuture cts;<a name="line.1228"></a>
+<span class="sourceLineNo">1229</span>    if (connection.callSender != null) {<a name="line.1229"></a>
+<span class="sourceLineNo">1230</span>      cts = connection.callSender.sendCall(call, pcrc.getPriority(), Trace.currentSpan());<a name="line.1230"></a>
+<span class="sourceLineNo">1231</span>        pcrc.notifyOnCancel(new RpcCallback&lt;Object&gt;() {<a name="line.1231"></a>
+<span class="sourceLineNo">1232</span>          @Override<a name="line.1232"></a>
+<span class="sourceLineNo">1233</span>          public void run(Object parameter) {<a name="line.1233"></a>
+<span class="sourceLineNo">1234</span>            connection.callSender.remove(cts);<a name="line.1234"></a>
+<span class="sourceLineNo">1235</span>          }<a name="line.1235"></a>
+<span class="sourceLineNo">1236</span>        });<a name="line.1236"></a>
+<span class="sourceLineNo">1237</span>        if (pcrc.isCanceled()) {<a name="line.1237"></a>
+<span class="sourceLineNo">1238</span>          // To finish if the call was cancelled before we set the notification (race condition)<a name="line.1238"></a>
+<span class="sourceLineNo">1239</span>          call.callComplete();<a name="line.1239"></a>
+<span class="sourceLineNo">1240</span>          return new Pair&lt;Message, CellScanner&gt;(call.response, call.cells);<a name="line.1240"></a>
+<span class="sourceLineNo">1241</span>        }<a name="line.1241"></a>
+<span class="sourceLineNo">1242</span>    } else {<a name="line.1242"></a>
+<span class="sourceLineNo">1243</span>      cts = null;<a name="line.1243"></a>
+<span class="sourceLineNo">1244</span>      connection.tracedWriteRequest(call, pcrc.getPriority(), Trace.currentSpan());<a name="line.1244"></a>
+<span class="sourceLineNo">1245</span>    }<a name="line.1245"></a>
+<span class="sourceLineNo">1246</span><a name="line.1246"></a>
+<span class="sourceLineNo">1247</span>    while (!call.done) {<a name="line.1247"></a>
+<span class="sourceLineNo">1248</span>      if (call.checkAndSetTimeout()) {<a name="line.1248"></a>
+<span class="sourceLineNo">1249</span>        if (cts != null) connection.callSender.remove(cts);<a name="line.1249"></a>
+<span class="sourceLineNo">1250</span>        break;<a name="line.1250"></a>
+<span class="sourceLineNo">1251</span>      }<a name="line.1251"></a>
+<span class="sourceLineNo">1252</span>      if (connection.shouldCloseConnection.get()) {<a name="line.1252"></a>
+<span class="sourceLineNo">1253</span>        throw new ConnectionClosingException("Call id=" + call.id +<a name="line.1253"></a>
+<span class="sourceLineNo">1254</span>            " on server " + addr + " aborted: connection is closing");<a name="line.1254"></a>
+<span class="sourceLineNo">1255</span>      }<a name="line.1255"></a>
+<span class="sourceLineNo">1256</span>      try {<a name="line.1256"></a>
+<span class="sourceLineNo">1257</span>        synchronized (call) {<a name="line.1257"></a>
+<span class="sourceLineNo">1258</span>          if (call.done) break;<a name="line.1258"></a>
+<span class="sourceLineNo">1259</span>          call.wait(Math.min(call.remainingTime(), 1000) + 1);<a name="line.1259"></a>
+<span class="sourceLineNo">1260</span>        }<a name="line.1260"></a>
+<span class="sourceLineNo">1261</span>      } catch (InterruptedException e) {<a name="line.1261"></a>
+<span class="sourceLineNo">1262</span>        call.setException(new InterruptedIOException());<a name="line.1262"></a>
+<span class="sourceLineNo">1263</span>        if (cts != null) connection.callSender.remove(cts);<a name="line.1263"></a>
+<span class="sourceLineNo">1264</span>        throw e;<a name="line.1264"></a>
+<span class="sourceLineNo">1265</span>      }<a name="line.1265"></a>
+<span class="sourceLineNo">1266</span>    }<a name="line.1266"></a>
+<span class="sourceLineNo">1267</span><a name="line.1267"></a>
+<span class="sourceLineNo">1268</span>    if (call.error != null) {<a name="line.1268"></a>
+<span class="sourceLineNo">1269</span>      if (call.error instanceof RemoteException) {<a name="line.1269"></a>
+<span class="sourceLineNo">1270</span>        call.error.fillInStackTrace();<a name="line.1270"></a>
+<span class="sourceLineNo">1271</span>        throw call.error;<a name="line.1271"></a>
+<span class="sourceLineNo">1272</span>      }<a name="line.1272"></a>
+<span class="sourceLineNo">1273</span>      // local exception<a name="line.1273"></a>
+<span class="sourceLineNo">1274</span>      throw wrapException(addr, call.error);<a name="line.1274"></a>
+<span class="sourceLineNo">1275</span>    }<a name="line.1275"></a>
+<span class="sourceLineNo">1276</span><a name="line.1276"></a>
+<span class="sourceLineNo">1277</span>    return new Pair&lt;Message, CellScanner&gt;(call.response, call.cells);<a name="line.1277"></a>
+<span class="sourceLineNo">1278</span>  }<a name="line.1278"></a>
+<span class="sourceLineNo">1279</span><a name="line.1279"></a>
+<span class="sourceLineNo">1280</span><a name="line.1280"></a>
+<span class="sourceLineNo">1281</span>  /**<a name="line.1281"></a>
+<span class="sourceLineNo">1282</span>   * Interrupt the connections to the given ip:port server. This should be called if the server<a name="line.1282"></a>
+<span class="sourceLineNo">1283</span>   *  is known as actually dead. This will not prevent current operation to be retried, and,<a name="line.1283"></a>
+<span class="sourceLineNo">1284</span>   *  depending on their own behavior, they may retry on the same server. This can be a feature,<a name="line.1284"></a>
+<span class="sourceLineNo">1285</span>   *  for example at startup. In any case, they're likely to get connection refused (if the<a name="line.1285"></a>
+<span class="sourceLineNo">1286</span>   *  process died) or no route to host: i.e. their next retries should be faster and with a<a name="line.1286"></a>
+<span class="sourceLineNo">1287</span>   *  safe exception.<a name="line.1287"></a>
+<span class="sourceLineNo">1288</span>   */<a name="line.1288"></a>
+<span class="sourceLineNo">1289</span>  @Override<a name="line.1289"></a>
+<span class="sourceLineNo">1290</span>  public void cancelConnections(ServerName sn) {<a name="line.1290"></a>
+<span class="sourceLineNo">1291</span>    synchronized (connections) {<a name="line.1291"></a>
+<span class="sourceLineNo">1292</span>      for (Connection connection : connections.values()) {<a name="line.1292"></a>
+<span class="sourceLineNo">1293</span>        if (connection.isAlive() &amp;&amp;<a name="line.1293"></a>
+<span class="sourceLineNo">1294</span>            connection.getRemoteAddress().getPort() == sn.getPort() &amp;&amp;<a name="line.1294"></a>
+<span class="sourceLineNo">1295</span>            connection.getRemoteAddress().getHostName().equals(sn.getHostname())) {<a name="line.1295"></a>
+<span class="sourceLineNo">1296</span>          LOG.info("The server on " + sn.toString() +<a name="line.1296"></a>
+<span class="sourceLineNo">1297</span>              " is dead - stopping the connection " + connection.remoteId);<a name="line.1297"></a>
+<span class="sourceLineNo">1298</span>          connection.interrupt(); // We're interrupting a Reader. It means we want it to finish.<a name="line.1298"></a>
+<span class="sourceLineNo">1299</span>                                  // This will close the connection as well.<a name="line.1299"></a>
+<span class="sourceLineNo">1300</span>        }<a name="line.1300"></a>
+<span class="sourceLineNo">1301</span>      }<a name="line.1301"></a>
+<span class="sourceLineNo">1302</span>    }<a name="line.1302"></a>
+<span class="sourceLineNo">1303</span>  }<a name="line.1303"></a>
+<span class="sourceLineNo">1304</span><a name="line.1304"></a>
+<span class="sourceLineNo">1305</span>  /**<a name="line.1305"></a>
+<span class="sourceLineNo">1306</span>   *  Get a connection from the pool, or create a new one and add it to the<a name="line.1306"></a>
+<span class="sourceLineNo">1307</span>   * pool. Connections to a given host/port are reused.<a name="line.1307"></a>
+<span class="sourceLineNo">1308</span>   */<a name="line.1308"></a>
+<span class="sourceLineNo">1309</span>  protected Connection getConnection(User ticket, Call call, InetSocketAddress addr)<a name="line.1309"></a>
+<span class="sourceLineNo">1310</span>  throws IOException {<a name="line.1310"></a>
+<span class="sourceLineNo">1311</span>    if (!running.get()) throw new StoppedRpcClientException();<a name="line.1311"></a>
+<span class="sourceLineNo">1312</span>    Connection connection;<a name="line.1312"></a>
+<span class="sourceLineNo">1313</span>    ConnectionId remoteId =<a name="line.1313"></a>
+<span class="sourceLineNo">1314</span>      new ConnectionId(ticket, call.md.getService().getName(), addr);<a name="line.1314"></a>
+<span class="sourceLineNo">1315</span>    synchronized (connections) {<a name="line.1315"></a>
+<span class="sourceLineNo">1316</span>      connection = connections.get(remoteId);<a name="line.1316"></a>
+<span class="sourceLineNo">1317</span>      if (connection == null) {<a name="line.1317"></a>
+<span class="sourceLineNo">1318</span>        connection = createConnection(remoteId, this.codec, this.compressor);<a name="line.1318"></a>
+<span class="sourceLineNo">1319</span>        connections.put(remoteId, connection);<a name="line.1319"></a>
+<span class="sourceLineNo">1320</span>      }<a name="line.1320"></a>
+<span class="sourceLineNo">1321</span>    }<a name="line.1321"></a>
+<span class="sourceLineNo">1322</span><a name="line.1322"></a>
+<span class="sourceLineNo">1323</span>    return connection;<a name="line.1323"></a>
+<span class="sourceLineNo">1324</span>  }<a name="line.1324"></a>
+<span class="sourceLineNo">1325</span>}<a name="line.1325"></a>