You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-dev@hadoop.apache.org by "Yongjun Zhang (JIRA)" <ji...@apache.org> on 2018/09/05 04:07:00 UTC

[jira] [Created] (HADOOP-15720) rpcTimeout may not have been applied correctly

Yongjun Zhang created HADOOP-15720:
--------------------------------------

             Summary: rpcTimeout may not have been applied correctly
                 Key: HADOOP-15720
                 URL: https://issues.apache.org/jira/browse/HADOOP-15720
             Project: Hadoop Common
          Issue Type: Bug
          Components: common
            Reporter: Yongjun Zhang


org.apache.hadoop.ipc.Client send multiple RPC calls to server synchronously via the same connection as in the following synchronized code block:
{code:java}
      synchronized (sendRpcRequestLock) {
        Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
          @Override
          public void run() {
            try {
              synchronized (Connection.this.out) {
                if (shouldCloseConnection.get()) {
                  return;
                }
                
                if (LOG.isDebugEnabled()) {
                  LOG.debug(getName() + " sending #" + call.id
                      + " " + call.rpcRequest);
                }
         
                byte[] data = d.getData();
                int totalLength = d.getLength();
                out.writeInt(totalLength); // Total Length
                out.write(data, 0, totalLength);// RpcRequestHeader + RpcRequest
                out.flush();
              }
            } catch (IOException e) {
              // exception at this point would leave the connection in an
              // unrecoverable state (eg half a call left on the wire).
              // So, close the connection, killing any outstanding calls
              markClosed(e);
            } finally {
              //the buffer is just an in-memory buffer, but it is still polite to
              // close early
              IOUtils.closeStream(d);
            }
          }
        });
      
        try {
          senderFuture.get();
        } catch (ExecutionException e) {
          Throwable cause = e.getCause();
          
          // cause should only be a RuntimeException as the Runnable above
          // catches IOException
          if (cause instanceof RuntimeException) {
            throw (RuntimeException) cause;
          } else {
            throw new RuntimeException("unexpected checked exception", cause);
          }
        }
      }
{code}
And it then waits for result asynchronously via
{code:java}
    /* Receive a response.
     * Because only one receiver, so no synchronization on in.
     */
    private void receiveRpcResponse() {
      if (shouldCloseConnection.get()) {
        return;
      }
      touch();
      
      try {
        int totalLen = in.readInt();
        RpcResponseHeaderProto header = 
            RpcResponseHeaderProto.parseDelimitedFrom(in);
        checkResponse(header);

        int headerLen = header.getSerializedSize();
        headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);

        int callId = header.getCallId();
        if (LOG.isDebugEnabled())
          LOG.debug(getName() + " got value #" + callId);

        Call call = calls.get(callId);
        RpcStatusProto status = header.getStatus();
......
{code}
However, we can see that the {{call}} returned by {{receiveRpcResonse()}} above may be in any order.

The following code
{code:java}
        int totalLen = in.readInt();
{code}
eventually calls one of the following two methods, where rpcTimeOut is checked against:
{code:java}
      /** Read a byte from the stream.
       * Send a ping if timeout on read. Retries if no failure is detected
       * until a byte is read.
       * @throws IOException for any IO problem other than socket timeout
       */
      @Override
      public int read() throws IOException {
        int waiting = 0;
        do {
          try {
            return super.read();
          } catch (SocketTimeoutException e) {
            waiting += soTimeout;
            handleTimeout(e, waiting);
          }
        } while (true);
      }

      /** Read bytes into a buffer starting from offset <code>off</code>
       * Send a ping if timeout on read. Retries if no failure is detected
       * until a byte is read.
       * 
       * @return the total number of bytes read; -1 if the connection is closed.
       */
      @Override
      public int read(byte[] buf, int off, int len) throws IOException {
        int waiting = 0;
        do {
          try {
            return super.read(buf, off, len);
          } catch (SocketTimeoutException e) {
            waiting += soTimeout;
            handleTimeout(e, waiting);
          }
        } while (true);
      }
{code}
But the waiting time is always initialized to 0 for each of the above read calls, so each call can take up to rpcTimeout. And the real time to time out a call appears to be accumulative.

For example, if the client issue call1, call2, then it waits for result, if the first call1 took (rpcTimeout - 1), thus no time out, the second took (rpcTimeout -1), thus no timeout, but it effectively took 2*(rpcTimeout -1) which could be much bigger than rpcTimeout and should time out.

It seems more accurate to remember the time that an RPC is sent to the server, and then check time out here:
{code:java}
  public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
      ConnectionId remoteId, int serviceClass,
      AtomicBoolean fallbackToSimpleAuth) throws IOException {
    final Call call = createCall(rpcKind, rpcRequest);
    Connection connection = getConnection(remoteId, call, serviceClass,
      fallbackToSimpleAuth);
    try {
      connection.sendRpcRequest(call);                 // send the rpc request
    } catch (RejectedExecutionException e) {
      throw new IOException("connection has been closed", e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      LOG.warn("interrupted waiting to send rpc request to server", e);
      throw new IOException(e);
    }

    synchronized (call) {
      while (!call.done) {
        try {
          call.wait();                           // wait for the result
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw new InterruptedIOException("Call interrupted");
        } <=should check how long it has waited here, time out if rpcTimeout has been reached
      }  

      if (call.error != null) {
        if (call.error instanceof RemoteException) {
          call.error.fillInStackTrace();
          throw call.error;
        } else { // local exception
          InetSocketAddress address = connection.getRemoteAddress();
          throw NetUtils.wrapException(address.getHostName(),
                  address.getPort(),
                  NetUtils.getHostname(),
                  0,
                  call.error);
        }
      } else {
        return call.getRpcResponse();
      }
    }
  }
{code}
basically we should change the call highlighted above from
{code:java}
    public final void wait() throws InterruptedException
{code}
to
{code:java}
public final void wait(long timeout, int nanos) throws InterruptedException
{code}
and apply rpcTimeout here.

Hi [~daryn] and [~kihwal], would you please help taking a look at my above analysis to see if I have any misunderstanding here?

Thanks a lot.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-dev-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-dev-help@hadoop.apache.org