Posted to common-issues@hadoop.apache.org by "Wenzhe Zhou (Jira)" <ji...@apache.org> on 2022/03/28 22:00:00 UTC

[jira] [Commented] (HADOOP-15720) rpcTimeout may not have been applied correctly

    [ https://issues.apache.org/jira/browse/HADOOP-15720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17513673#comment-17513673 ] 

Wenzhe Zhou commented on HADOOP-15720:
--------------------------------------

Hi Hadoop team, is anyone actively working on this issue? Any ETA?

> rpcTimeout may not have been applied correctly
> ----------------------------------------------
>
>                 Key: HADOOP-15720
>                 URL: https://issues.apache.org/jira/browse/HADOOP-15720
>             Project: Hadoop Common
>          Issue Type: Bug
>          Components: common
>            Reporter: Yongjun Zhang
>            Priority: Major
>
> org.apache.hadoop.ipc.Client sends multiple RPC calls to the server synchronously over the same connection, as in the following synchronized code block:
> {code:java}
>       synchronized (sendRpcRequestLock) {
>         Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
>           @Override
>           public void run() {
>             try {
>               synchronized (Connection.this.out) {
>                 if (shouldCloseConnection.get()) {
>                   return;
>                 }
>                 
>                 if (LOG.isDebugEnabled()) {
>                   LOG.debug(getName() + " sending #" + call.id
>                       + " " + call.rpcRequest);
>                 }
>          
>                 byte[] data = d.getData();
>                 int totalLength = d.getLength();
>                 out.writeInt(totalLength); // Total Length
>                 out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest
>                 out.flush();
>               }
>             } catch (IOException e) {
>               // exception at this point would leave the connection in an
>               // unrecoverable state (eg half a call left on the wire).
>               // So, close the connection, killing any outstanding calls
>               markClosed(e);
>             } finally {
>               //the buffer is just an in-memory buffer, but it is still polite to
>               // close early
>               IOUtils.closeStream(d);
>             }
>           }
>         });
>       
>         try {
>           senderFuture.get();
>         } catch (ExecutionException e) {
>           Throwable cause = e.getCause();
>           
>           // cause should only be a RuntimeException as the Runnable above
>           // catches IOException
>           if (cause instanceof RuntimeException) {
>             throw (RuntimeException) cause;
>           } else {
>             throw new RuntimeException("unexpected checked exception", cause);
>           }
>         }
>       }
> {code}
> It then waits for the result asynchronously via
> {code:java}
>     /* Receive a response.
>      * Because only one receiver, so no synchronization on in.
>      */
>     private void receiveRpcResponse() {
>       if (shouldCloseConnection.get()) {
>         return;
>       }
>       touch();
>       
>       try {
>         int totalLen = in.readInt();
>         RpcResponseHeaderProto header = 
>             RpcResponseHeaderProto.parseDelimitedFrom(in);
>         checkResponse(header);
>         int headerLen = header.getSerializedSize();
>         headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);
>         int callId = header.getCallId();
>         if (LOG.isDebugEnabled())
>           LOG.debug(getName() + " got value #" + callId);
>         Call call = calls.get(callId);
>         RpcStatusProto status = header.getStatus();
> ......
> {code}
> However, we can see that the {{call}} objects looked up by {{receiveRpcResponse()}} above may come back in any order.
> The following code
> {code:java}
>         int totalLen = in.readInt();
> {code}
> eventually calls one of the following two methods, where rpcTimeout is checked:
> {code:java}
>       /** Read a byte from the stream.
>        * Send a ping if timeout on read. Retries if no failure is detected
>        * until a byte is read.
>        * @throws IOException for any IO problem other than socket timeout
>        */
>       @Override
>       public int read() throws IOException {
>         int waiting = 0;
>         do {
>           try {
>             return super.read();
>           } catch (SocketTimeoutException e) {
>             waiting += soTimeout;
>             handleTimeout(e, waiting);
>           }
>         } while (true);
>       }
>       /** Read bytes into a buffer starting from offset <code>off</code>
>        * Send a ping if timeout on read. Retries if no failure is detected
>        * until a byte is read.
>        * 
>        * @return the total number of bytes read; -1 if the connection is closed.
>        */
>       @Override
>       public int read(byte[] buf, int off, int len) throws IOException {
>         int waiting = 0;
>         do {
>           try {
>             return super.read(buf, off, len);
>           } catch (SocketTimeoutException e) {
>             waiting += soTimeout;
>             handleTimeout(e, waiting);
>           }
>         } while (true);
>       }
> {code}
> But the waiting time is initialized to 0 on every one of the above read calls, so each read can take up to rpcTimeout, and the real time before a call times out appears to be cumulative.
> For example, suppose the client issues call1 and call2 and then waits for the results: if call1 takes (rpcTimeout - 1), there is no timeout; if call2 then takes another (rpcTimeout - 1), there is again no timeout, yet call2 has effectively taken 2*(rpcTimeout - 1), which can be much larger than rpcTimeout and should have timed out.
> In the worst case, an RPC may take indefinitely long and never time out.
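
The arithmetic in the example above can be checked with a small simulation. This is only a sketch under stated assumptions, not Hadoop code: the class and method names are hypothetical, time is simulated rather than measured, and the timeout rule (give up once the per-read counter reaches rpcTimeout) is an assumed model of what handleTimeout does.

```java
// Hypothetical model, not Hadoop code: all times are simulated milliseconds.
final class PerReadTimeoutSketch {

    /**
     * Models one blocking read whose data arrives after dataArrivesAfterMs.
     * The per-read counter starts at 0, mirroring "int waiting = 0" in the
     * quoted read() methods. Returns the simulated time spent in this read,
     * or -1 if the read timed out.
     */
    static long simulateRead(long dataArrivesAfterMs, long soTimeoutMs, long rpcTimeoutMs) {
        long waiting = 0;
        // A socket timeout fires each time soTimeoutMs passes without data.
        while (dataArrivesAfterMs - waiting > soTimeoutMs) {
            waiting += soTimeoutMs;
            // Assumed timeout rule: give up once this read alone has waited rpcTimeoutMs.
            if (rpcTimeoutMs > 0 && waiting >= rpcTimeoutMs) {
                return -1;
            }
        }
        return dataArrivesAfterMs;
    }

    /**
     * All calls are sent at time 0 and their responses are read one after
     * another. Returns how long the last call waited in total, or -1 if any
     * individual read timed out.
     */
    static long totalWaitOfLastCall(long[] readDurationsMs, long soTimeoutMs, long rpcTimeoutMs) {
        long elapsed = 0;
        for (long duration : readDurationsMs) {
            long spent = simulateRead(duration, soTimeoutMs, rpcTimeoutMs);
            if (spent < 0) {
                return -1;
            }
            elapsed += spent;
        }
        return elapsed;
    }

    public static void main(String[] args) {
        long rpcTimeoutMs = 10_000;
        long soTimeoutMs = 1_000;
        long[] reads = {rpcTimeoutMs - 1, rpcTimeoutMs - 1}; // call1, then call2
        System.out.println("call2 waited " + totalWaitOfLastCall(reads, soTimeoutMs, rpcTimeoutMs)
            + " ms with rpcTimeout=" + rpcTimeoutMs + " ms");
    }
}
```

In this model neither read times out, although the second call has waited almost twice rpcTimeout by the time its response is read, because the counter restarts for every read.
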
> It seems more accurate to remember the time at which an RPC is sent to the server, and then check for a timeout here:
> {code:java}
>   public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
>       ConnectionId remoteId, int serviceClass,
>       AtomicBoolean fallbackToSimpleAuth) throws IOException {
>     final Call call = createCall(rpcKind, rpcRequest);
>     Connection connection = getConnection(remoteId, call, serviceClass,
>       fallbackToSimpleAuth);
>     try {
>       connection.sendRpcRequest(call);                 // send the rpc request
>     } catch (RejectedExecutionException e) {
>       throw new IOException("connection has been closed", e);
>     } catch (InterruptedException e) {
>       Thread.currentThread().interrupt();
>       LOG.warn("interrupted waiting to send rpc request to server", e);
>       throw new IOException(e);
>     }
>     synchronized (call) {
>       while (!call.done) {
>         try {
>           call.wait();                           // wait for the result
>         } catch (InterruptedException ie) {
>           Thread.currentThread().interrupt();
>           throw new InterruptedIOException("Call interrupted");
>         } // <= should check how long it has waited here, and time out if rpcTimeout has been reached
>       }  
>       if (call.error != null) {
>         if (call.error instanceof RemoteException) {
>           call.error.fillInStackTrace();
>           throw call.error;
>         } else { // local exception
>           InetSocketAddress address = connection.getRemoteAddress();
>           throw NetUtils.wrapException(address.getHostName(),
>                   address.getPort(),
>                   NetUtils.getHostname(),
>                   0,
>                   call.error);
>         }
>       } else {
>         return call.getRpcResponse();
>       }
>     }
>   }
> {code}
> Basically, we should change the {{call.wait()}} invocation highlighted above from
> {code:java}
>     public final void wait() throws InterruptedException
> {code}
> to
> {code:java}
> public final void wait(long timeout, int nanos) throws InterruptedException
> {code}
> and pass rpcTimeout as the parameter value. (Note that this ignores the time needed to send the RPC to the server; ideally that should be included too, so that rpcTimeout means what it is intended to mean.)
> Hi [~daryn] and [~kihwal], would you please take a look at my analysis above and let me know if I have misunderstood anything?
> Thanks a lot.
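
For reference, the change proposed in the description (replacing the untimed wait() with a timed wait bounded by rpcTimeout) could look roughly like the sketch below. It is a minimal illustration under stated assumptions, not a patch against org.apache.hadoop.ipc.Client: TimedCallSketch, its nested Call class, awaitDone and completesWithin are hypothetical names, and a timeout of 0 or less is assumed to mean "no timeout". The wait sits in a loop against a fixed deadline because Object.wait can return early (spurious wakeups, or a notify for another reason), so a single timed wait is not enough.

```java
// Hypothetical sketch, not Hadoop code: a per-call deadline around Object.wait.
final class TimedCallSketch {

    /** Stand-in for Client.Call; only the completion flag matters here. */
    static final class Call {
        private boolean done; // guarded by this Call's monitor

        synchronized void complete() {
            done = true;
            notifyAll();
        }

        /**
         * Waits until complete() is called or timeoutMs has elapsed since this
         * method was entered. Returns true if the call completed, false on
         * timeout. A timeoutMs of 0 or less is assumed to mean "wait forever".
         */
        synchronized boolean awaitDone(long timeoutMs) throws InterruptedException {
            if (timeoutMs <= 0) {
                while (!done) {
                    wait();
                }
                return true;
            }
            final long deadlineNanos = System.nanoTime() + timeoutMs * 1_000_000L;
            while (!done) {
                long remainingNanos = deadlineNanos - System.nanoTime();
                if (remainingNanos <= 0) {
                    return false; // the deadline passed before the response arrived
                }
                // wait(millis, nanos) requires nanos in the range 0..999999.
                wait(remainingNanos / 1_000_000L, (int) (remainingNanos % 1_000_000L));
            }
            return true;
        }
    }

    /** Demo helper: a responder thread completes the call after completeAfterMs. */
    static boolean completesWithin(long completeAfterMs, long timeoutMs) {
        Call call = new Call();
        Thread responder = new Thread(() -> {
            try {
                Thread.sleep(completeAfterMs);
            } catch (InterruptedException e) {
                return;
            }
            call.complete();
        });
        responder.setDaemon(true); // do not keep the JVM alive after a timeout
        responder.start();
        try {
            return call.awaitDone(timeoutMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("fast response completes: " + completesWithin(10, 2_000));
        System.out.println("slow response completes: " + completesWithin(2_000, 50));
    }
}
```

In this sketch the clock starts when awaitDone is entered, which matches the simplification noted in the description. Capturing the start time before the request is sent and passing the deadline in would also count the send time against rpcTimeout.
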



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
