You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-issues@hadoop.apache.org by "Haibo Yan (JIRA)" <ji...@apache.org> on 2018/10/17 06:41:00 UTC

[jira] [Commented] (HADOOP-15530) RPC could stuck at senderFuture.get()

    [ https://issues.apache.org/jira/browse/HADOOP-15530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653057#comment-16653057 ] 

Haibo Yan commented on HADOOP-15530:
------------------------------------

It looks like main issue here is the thread in sendParamsExecutor either has some unexpected behavior or maybe never have data back(stuck), for solving this deadlock, we could cancel the future when timeouts, this can temporarily solve the deadlock. But we need consider the nice logging for why the thread stuck

> RPC could stuck at senderFuture.get()
> -------------------------------------
>
>                 Key: HADOOP-15530
>                 URL: https://issues.apache.org/jira/browse/HADOOP-15530
>             Project: Hadoop Common
>          Issue Type: Bug
>          Components: common
>            Reporter: Yongjun Zhang
>            Assignee: Yongjun Zhang
>            Priority: Major
>
> In Client.java, sendRpcRequest does the following
> {code}
>    /** Initiates a rpc call by sending the rpc request to the remote server.
>      * Note: this is not called from the Connection thread, but by other
>      * threads.
>      * @param call - the rpc request
>      */
>     public void sendRpcRequest(final Call call)
>         throws InterruptedException, IOException {
>       if (shouldCloseConnection.get()) {
>         return;
>       }
>       // Serialize the call to be sent. This is done from the actual
>       // caller thread, rather than the sendParamsExecutor thread,
>       // so that if the serialization throws an error, it is reported
>       // properly. This also parallelizes the serialization.
>       //
>       // Format of a call on the wire:
>       // 0) Length of rest below (1 + 2)
>       // 1) RpcRequestHeader  - is serialized Delimited hence contains length
>       // 2) RpcRequest
>       //
>       // Items '1' and '2' are prepared here. 
>       RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
>           call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
>           clientId);
>       final ResponseBuffer buf = new ResponseBuffer();
>       header.writeDelimitedTo(buf);
>       RpcWritable.wrap(call.rpcRequest).writeTo(buf);
>       synchronized (sendRpcRequestLock) {
>         Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
>           @Override
>           public void run() {
>             try {
>               synchronized (ipcStreams.out) {
>                 if (shouldCloseConnection.get()) {
>                   return;
>                 }
>                 if (LOG.isDebugEnabled()) {
>                   LOG.debug(getName() + " sending #" + call.id
>                       + " " + call.rpcRequest);
>                 }
>                 // RpcRequestHeader + RpcRequest
>                 ipcStreams.sendRequest(buf.toByteArray());
>                 ipcStreams.flush();
>               }
>             } catch (IOException e) {
>               // exception at this point would leave the connection in an
>               // unrecoverable state (eg half a call left on the wire).
>               // So, close the connection, killing any outstanding calls
>               markClosed(e);
>             } finally {
>               //the buffer is just an in-memory buffer, but it is still polite to
>               // close early
>               IOUtils.closeStream(buf);
>             }
>           }
>         });
>         try {
>           senderFuture.get();
>         } catch (ExecutionException e) {
>           Throwable cause = e.getCause();
>           // cause should only be a RuntimeException as the Runnable above
>           // catches IOException
>           if (cause instanceof RuntimeException) {
>             throw (RuntimeException) cause;
>           } else {
>             throw new RuntimeException("unexpected checked exception", cause);
>           }
>         }
>       }
>     }
> {code}
> It's observed that the call can be stuck at {{senderFuture.get();}} with the following stack
> {code}
> "Thread-13" #40 prio=5 os_prio=0 tid=0x000000000fb0d000 nid=0xf189c waiting on condition [0x00007f697c582000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000006187e5ec0> (a java.util.concurrent.FutureTask)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
>         at java.util.concurrent.FutureTask.get(FutureTask.java:191)
>         at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1088)
>         - locked <0x00000006215c1e08> (a java.lang.Object)
>         at org.apache.hadoop.ipc.Client.call(Client.java:1483)
>         at org.apache.hadoop.ipc.Client.call(Client.java:1441)
>         at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
>         at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
>         at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:266)
>         at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
>         at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
>         at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
>         at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1323)
>         at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1310)
>         at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1298)
>         at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:309)
>         at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:275)
>         - locked <0x00000006187e5530> (a java.lang.Object)
>         at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:267)
>         at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1629)
>         at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:338)
>         at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:334)
>         at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>         at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:334)
> {code}
> Given that we support rpcTimeOut, we could chose the second method of Future below:
> {code}
>   /**
>      * Waits if necessary for the computation to complete, and then
>      * retrieves its result.
>      *
>      * @return the computed result
>      * @throws CancellationException if the computation was cancelled
>      * @throws ExecutionException if the computation threw an
>      * exception
>      * @throws InterruptedException if the current thread was interrupted
>      * while waiting
>      */
>     V get() throws InterruptedException, ExecutionException;
>     /**
>      * Waits if necessary for at most the given time for the computation
>      * to complete, and then retrieves its result, if available.
>      *
>      * @param timeout the maximum time to wait
>      * @param unit the time unit of the timeout argument
>      * @return the computed result
>      * @throws CancellationException if the computation was cancelled
>      * @throws ExecutionException if the computation threw an
>      * exception
>      * @throws InterruptedException if the current thread was interrupted
>      * while waiting
>      * @throws TimeoutException if the wait timed out
>      */
>     V get(long timeout, TimeUnit unit)
>         throws InterruptedException, ExecutionException, TimeoutException;
> {code}
> In theory, since the RPC at client is serialized, we could just use the main thread to do the execution, instead of using a threadpool to create new thread. This can be discussed in a separate jira.
> And why the RPC is not processed and returned by NN is another topic (HADOOP-15538).
>                                               



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org