You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flume.apache.org by "Hari Shreedharan (JIRA)" <ji...@apache.org> on 2012/11/29 19:34:58 UTC

[jira] [Created] (FLUME-1748) HDFS Sink can end up in a bad state if the Callables are interrupted.

Hari Shreedharan created FLUME-1748:
---------------------------------------

             Summary: HDFS Sink can end up in a bad state if the Callables are interrupted.
                 Key: FLUME-1748
                 URL: https://issues.apache.org/jira/browse/FLUME-1748
             Project: Flume
          Issue Type: Bug
            Reporter: Hari Shreedharan


If one or more HDFS Sink writes/flush/close end up taking too long, the thread is interrupted by the future.cancel(true) call. The Hadoop RPC client re-sets the interrupt flag on a thread which is interrupted. Also if the thread is interrupted after the RPC call, but before the call() method returns, the interrupt flag stays on the thread. A future HDFS file open call would lead to an exception of this sort:

{code}
[SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:457)  - HDFS IO error
java.io.IOException: Failed on local exception: java.nio.channels.ClosedByInterruptException; Host Details : local host is: "random.example.com/10.20.81.108"; destination host is: "random2.example.com":8020; 
	at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:759)
	at org.apache.hadoop.ipc.Client.call(Client.java:1164)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
	at $Proxy9.create(Unknown Source)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
	at java.lang.reflect.Method.invoke(Method.java:597)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
	at $Proxy9.create(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:192)
	at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1298)
	at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1317)
	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1215)
	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1173)
	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:272)
	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:261)
	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:78)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:805)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:685)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:674)
	at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:60)
	at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:210)
	at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
	at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
	at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
	at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
	at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
	at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:365)
	at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:730)
	at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:728)
	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
	at java.lang.Thread.run(Thread.java:662)
Caused by: java.nio.channels.ClosedByInterruptException
	at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:511)
	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:193)
	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:523)
	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:488)
	at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:476)
	at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:570)
	at org.apache.hadoop.ipc.Client$Connection.access$1700(Client.java:220)
	at org.apache.hadoop.ipc.Client.getConnection(Client.java:1213)
	at org.apache.hadoop.ipc.Client.call(Client.java:1140)
	... 36 more
{code}

The relevant code that re-sets the interrupt flag is in http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?revision=1410024&view=markup

The method is Client#call(RPC.RpcKind, Writable,ConnectionId). Note that the InterruptedException is caught and the interrupt flag is re-set.

{code}
  public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
      ConnectionId remoteId) throws InterruptedException, IOException {
    Call call = new Call(rpcKind, rpcRequest);
    Connection connection = getConnection(remoteId, call);
    connection.sendParam(call);                 // send the parameter
    boolean interrupted = false;
    synchronized (call) {
      while (!call.done) {
        try {
          call.wait();                           // wait for the result
        } catch (InterruptedException ie) {
          // save the fact that we were interrupted
          interrupted = true;
        }
      }

      if (interrupted) {
        // set the interrupt flag now that we are done waiting
        Thread.currentThread().interrupt();
      }

      if (call.error != null) {
        if (call.error instanceof RemoteException) {
          call.error.fillInStackTrace();
          throw call.error;
        } else { // local exception
          InetSocketAddress address = connection.getRemoteAddress();
          throw NetUtils.wrapException(address.getHostName(),
                  address.getPort(),
                  NetUtils.getHostname(),
                  0,
                  call.error);
        }
      } else {
        return call.getRpcResult();
      }
    }
  }
{code}

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (FLUME-1748) HDFS Sink can end up in a bad state if the Callables are interrupted.

Posted by "Hari Shreedharan (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/FLUME-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hari Shreedharan updated FLUME-1748:
------------------------------------

    Attachment: FLUME-1748.patch

Simple patch, not run unit tests. Will update later if necessary. Let me know if you don't agree with this.
                
> HDFS Sink can end up in a bad state if the Callables are interrupted.
> ---------------------------------------------------------------------
>
>                 Key: FLUME-1748
>                 URL: https://issues.apache.org/jira/browse/FLUME-1748
>             Project: Flume
>          Issue Type: Bug
>            Reporter: Hari Shreedharan
>         Attachments: FLUME-1748.patch
>
>
> If one or more HDFS Sink writes/flush/close end up taking too long, the thread is interrupted by the future.cancel(true) call. The Hadoop RPC client re-sets the interrupt flag on a thread which is interrupted. Also if the thread is interrupted after the RPC call, but before the call() method returns, the interrupt flag stays on the thread. A future HDFS file open call would lead to an exception of this sort:
> {code}
> [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:457)  - HDFS IO error
> java.io.IOException: Failed on local exception: java.nio.channels.ClosedByInterruptException; Host Details : local host is: "random.example.com/10.20.81.108"; destination host is: "random2.example.com":8020; 
> 	at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:759)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1164)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
> 	at $Proxy9.create(Unknown Source)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> 	at java.lang.reflect.Method.invoke(Method.java:597)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
> 	at $Proxy9.create(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:192)
> 	at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1298)
> 	at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1317)
> 	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1215)
> 	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1173)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:272)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:261)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:78)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:805)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:685)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:674)
> 	at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:60)
> 	at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:210)
> 	at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
> 	at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
> 	at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
> 	at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
> 	at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
> 	at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:365)
> 	at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:730)
> 	at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:728)
> 	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> 	at java.lang.Thread.run(Thread.java:662)
> Caused by: java.nio.channels.ClosedByInterruptException
> 	at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:511)
> 	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:193)
> 	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:523)
> 	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:488)
> 	at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:476)
> 	at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:570)
> 	at org.apache.hadoop.ipc.Client$Connection.access$1700(Client.java:220)
> 	at org.apache.hadoop.ipc.Client.getConnection(Client.java:1213)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1140)
> 	... 36 more
> {code}
> The relevant code that re-sets the interrupt flag is in http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?revision=1410024&view=markup
> The method is Client#call(RPC.RpcKind, Writable,ConnectionId). Note that the InterruptedException is caught and the interrupt flag is re-set.
> {code}
>   public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
>       ConnectionId remoteId) throws InterruptedException, IOException {
>     Call call = new Call(rpcKind, rpcRequest);
>     Connection connection = getConnection(remoteId, call);
>     connection.sendParam(call);                 // send the parameter
>     boolean interrupted = false;
>     synchronized (call) {
>       while (!call.done) {
>         try {
>           call.wait();                           // wait for the result
>         } catch (InterruptedException ie) {
>           // save the fact that we were interrupted
>           interrupted = true;
>         }
>       }
>       if (interrupted) {
>         // set the interrupt flag now that we are done waiting
>         Thread.currentThread().interrupt();
>       }
>       if (call.error != null) {
>         if (call.error instanceof RemoteException) {
>           call.error.fillInStackTrace();
>           throw call.error;
>         } else { // local exception
>           InetSocketAddress address = connection.getRemoteAddress();
>           throw NetUtils.wrapException(address.getHostName(),
>                   address.getPort(),
>                   NetUtils.getHostname(),
>                   0,
>                   call.error);
>         }
>       } else {
>         return call.getRpcResult();
>       }
>     }
>   }
> {code}

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (FLUME-1748) HDFS Sink can end up in a bad state if the Callables are interrupted.

Posted by "Hari Shreedharan (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/FLUME-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13507037#comment-13507037 ] 

Hari Shreedharan commented on FLUME-1748:
-----------------------------------------

Seems like a previous Hadoop RPC using the same bucket writer taking too long can cause this issue, because several threads end up being blocked on the same method. This eventually will cause the HDFS sink threads to be interrupted by the future.cancel call -- but they still end up going into the RPC calls only to be hit by a bunch of exceptions.
                
> HDFS Sink can end up in a bad state if the Callables are interrupted.
> ---------------------------------------------------------------------
>
>                 Key: FLUME-1748
>                 URL: https://issues.apache.org/jira/browse/FLUME-1748
>             Project: Flume
>          Issue Type: Bug
>            Reporter: Hari Shreedharan
>
> If one or more HDFS Sink writes/flush/close end up taking too long, the thread is interrupted by the future.cancel(true) call. The Hadoop RPC client re-sets the interrupt flag on a thread which is interrupted. Also if the thread is interrupted after the RPC call, but before the call() method returns, the interrupt flag stays on the thread. A future HDFS file open call would lead to an exception of this sort:
> {code}
> [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:457)  - HDFS IO error
> java.io.IOException: Failed on local exception: java.nio.channels.ClosedByInterruptException; Host Details : local host is: "random.example.com/10.20.81.108"; destination host is: "random2.example.com":8020; 
> 	at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:759)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1164)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
> 	at $Proxy9.create(Unknown Source)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> 	at java.lang.reflect.Method.invoke(Method.java:597)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
> 	at $Proxy9.create(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:192)
> 	at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1298)
> 	at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1317)
> 	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1215)
> 	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1173)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:272)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:261)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:78)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:805)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:685)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:674)
> 	at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:60)
> 	at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:210)
> 	at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
> 	at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
> 	at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
> 	at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
> 	at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
> 	at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:365)
> 	at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:730)
> 	at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:728)
> 	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> 	at java.lang.Thread.run(Thread.java:662)
> Caused by: java.nio.channels.ClosedByInterruptException
> 	at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:511)
> 	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:193)
> 	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:523)
> 	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:488)
> 	at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:476)
> 	at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:570)
> 	at org.apache.hadoop.ipc.Client$Connection.access$1700(Client.java:220)
> 	at org.apache.hadoop.ipc.Client.getConnection(Client.java:1213)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1140)
> 	... 36 more
> {code}
> The relevant code that re-sets the interrupt flag is in http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?revision=1410024&view=markup
> The method is Client#call(RPC.RpcKind, Writable,ConnectionId). Note that the InterruptedException is caught and the interrupt flag is re-set.
> {code}
>   public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
>       ConnectionId remoteId) throws InterruptedException, IOException {
>     Call call = new Call(rpcKind, rpcRequest);
>     Connection connection = getConnection(remoteId, call);
>     connection.sendParam(call);                 // send the parameter
>     boolean interrupted = false;
>     synchronized (call) {
>       while (!call.done) {
>         try {
>           call.wait();                           // wait for the result
>         } catch (InterruptedException ie) {
>           // save the fact that we were interrupted
>           interrupted = true;
>         }
>       }
>       if (interrupted) {
>         // set the interrupt flag now that we are done waiting
>         Thread.currentThread().interrupt();
>       }
>       if (call.error != null) {
>         if (call.error instanceof RemoteException) {
>           call.error.fillInStackTrace();
>           throw call.error;
>         } else { // local exception
>           InetSocketAddress address = connection.getRemoteAddress();
>           throw NetUtils.wrapException(address.getHostName(),
>                   address.getPort(),
>                   NetUtils.getHostname(),
>                   0,
>                   call.error);
>         }
>       } else {
>         return call.getRpcResult();
>       }
>     }
>   }
> {code}

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (FLUME-1748) HDFS Sink should check if the thread is interrupted before performing any HDFS operations

Posted by "Hari Shreedharan (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/FLUME-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hari Shreedharan updated FLUME-1748:
------------------------------------

    Summary: HDFS Sink should check if the thread is interrupted before performing any HDFS operations  (was: HDFS Sink can end up in a bad state if the Callables are interrupted.)
    
> HDFS Sink should check if the thread is interrupted before performing any HDFS operations
> -----------------------------------------------------------------------------------------
>
>                 Key: FLUME-1748
>                 URL: https://issues.apache.org/jira/browse/FLUME-1748
>             Project: Flume
>          Issue Type: Bug
>            Reporter: Hari Shreedharan
>            Assignee: Hari Shreedharan
>         Attachments: FLUME-1748.patch
>
>
> If one or more HDFS Sink writes/flush/close end up taking too long, the thread is interrupted by the future.cancel(true) call. The Hadoop RPC client re-sets the interrupt flag on a thread which is interrupted. Also if the thread is interrupted after the RPC call, but before the call() method returns, the interrupt flag stays on the thread. A future HDFS file open call would lead to an exception of this sort:
> {code}
> [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:457)  - HDFS IO error
> java.io.IOException: Failed on local exception: java.nio.channels.ClosedByInterruptException; Host Details : local host is: "random.example.com/10.20.81.108"; destination host is: "random2.example.com":8020; 
> 	at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:759)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1164)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
> 	at $Proxy9.create(Unknown Source)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> 	at java.lang.reflect.Method.invoke(Method.java:597)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
> 	at $Proxy9.create(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:192)
> 	at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1298)
> 	at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1317)
> 	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1215)
> 	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1173)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:272)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:261)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:78)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:805)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:685)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:674)
> 	at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:60)
> 	at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:210)
> 	at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
> 	at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
> 	at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
> 	at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
> 	at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
> 	at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:365)
> 	at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:730)
> 	at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:728)
> 	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> 	at java.lang.Thread.run(Thread.java:662)
> Caused by: java.nio.channels.ClosedByInterruptException
> 	at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:511)
> 	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:193)
> 	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:523)
> 	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:488)
> 	at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:476)
> 	at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:570)
> 	at org.apache.hadoop.ipc.Client$Connection.access$1700(Client.java:220)
> 	at org.apache.hadoop.ipc.Client.getConnection(Client.java:1213)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1140)
> 	... 36 more
> {code}
> The relevant code that re-sets the interrupt flag is in http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?revision=1410024&view=markup
> The method is Client#call(RPC.RpcKind, Writable,ConnectionId). Note that the InterruptedException is caught and the interrupt flag is re-set.
> {code}
>   public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
>       ConnectionId remoteId) throws InterruptedException, IOException {
>     Call call = new Call(rpcKind, rpcRequest);
>     Connection connection = getConnection(remoteId, call);
>     connection.sendParam(call);                 // send the parameter
>     boolean interrupted = false;
>     synchronized (call) {
>       while (!call.done) {
>         try {
>           call.wait();                           // wait for the result
>         } catch (InterruptedException ie) {
>           // save the fact that we were interrupted
>           interrupted = true;
>         }
>       }
>       if (interrupted) {
>         // set the interrupt flag now that we are done waiting
>         Thread.currentThread().interrupt();
>       }
>       if (call.error != null) {
>         if (call.error instanceof RemoteException) {
>           call.error.fillInStackTrace();
>           throw call.error;
>         } else { // local exception
>           InetSocketAddress address = connection.getRemoteAddress();
>           throw NetUtils.wrapException(address.getHostName(),
>                   address.getPort(),
>                   NetUtils.getHostname(),
>                   0,
>                   call.error);
>         }
>       } else {
>         return call.getRpcResult();
>       }
>     }
>   }
> {code}

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (FLUME-1748) HDFS Sink should check if the thread is interrupted before performing any HDFS operations

Posted by "Hari Shreedharan (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/FLUME-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13508123#comment-13508123 ] 

Hari Shreedharan commented on FLUME-1748:
-----------------------------------------

Adding some context:

Because all of the HDFS write operations happen from synchronized calls, they block on the object monitor to write to HDFS. Since Hadoop RPC does not respond to interrupts until the call actually completes, the timeouts from Flume are pretty much useless. So if one of the calls takes way longer than the timeout, the futures for other writes may be cancelled before they acquire the monitor itself - therefore these threads end up having their interrupt flag set. The IO operations usually check the interrupt status and fail the operation if the flag is set (usually throwing a ClosedByInterruptedException or something). 

The patch only tries to throw an exception which makes this clear.
                
> HDFS Sink should check if the thread is interrupted before performing any HDFS operations
> -----------------------------------------------------------------------------------------
>
>                 Key: FLUME-1748
>                 URL: https://issues.apache.org/jira/browse/FLUME-1748
>             Project: Flume
>          Issue Type: Bug
>            Reporter: Hari Shreedharan
>            Assignee: Hari Shreedharan
>         Attachments: FLUME-1748.patch
>
>
> If one or more HDFS Sink writes/flush/close end up taking too long, the thread is interrupted by the future.cancel(true) call. The Hadoop RPC client re-sets the interrupt flag on a thread which is interrupted. Also if the thread is interrupted after the RPC call, but before the call() method returns, the interrupt flag stays on the thread. A future HDFS file open call would lead to an exception of this sort:
> {code}
> [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:457)  - HDFS IO error
> java.io.IOException: Failed on local exception: java.nio.channels.ClosedByInterruptException; Host Details : local host is: "random.example.com/10.20.81.108"; destination host is: "random2.example.com":8020; 
> 	at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:759)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1164)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
> 	at $Proxy9.create(Unknown Source)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> 	at java.lang.reflect.Method.invoke(Method.java:597)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
> 	at $Proxy9.create(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:192)
> 	at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1298)
> 	at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1317)
> 	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1215)
> 	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1173)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:272)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:261)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:78)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:805)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:685)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:674)
> 	at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:60)
> 	at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:210)
> 	at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
> 	at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
> 	at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
> 	at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
> 	at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
> 	at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:365)
> 	at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:730)
> 	at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:728)
> 	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> 	at java.lang.Thread.run(Thread.java:662)
> Caused by: java.nio.channels.ClosedByInterruptException
> 	at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:511)
> 	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:193)
> 	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:523)
> 	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:488)
> 	at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:476)
> 	at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:570)
> 	at org.apache.hadoop.ipc.Client$Connection.access$1700(Client.java:220)
> 	at org.apache.hadoop.ipc.Client.getConnection(Client.java:1213)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1140)
> 	... 36 more
> {code}
> The relevant code that re-sets the interrupt flag is in http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?revision=1410024&view=markup
> The method is Client#call(RPC.RpcKind, Writable,ConnectionId). Note that the InterruptedException is caught and the interrupt flag is re-set.
> {code}
>   public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
>       ConnectionId remoteId) throws InterruptedException, IOException {
>     Call call = new Call(rpcKind, rpcRequest);
>     Connection connection = getConnection(remoteId, call);
>     connection.sendParam(call);                 // send the parameter
>     boolean interrupted = false;
>     synchronized (call) {
>       while (!call.done) {
>         try {
>           call.wait();                           // wait for the result
>         } catch (InterruptedException ie) {
>           // save the fact that we were interrupted
>           interrupted = true;
>         }
>       }
>       if (interrupted) {
>         // set the interrupt flag now that we are done waiting
>         Thread.currentThread().interrupt();
>       }
>       if (call.error != null) {
>         if (call.error instanceof RemoteException) {
>           call.error.fillInStackTrace();
>           throw call.error;
>         } else { // local exception
>           InetSocketAddress address = connection.getRemoteAddress();
>           throw NetUtils.wrapException(address.getHostName(),
>                   address.getPort(),
>                   NetUtils.getHostname(),
>                   0,
>                   call.error);
>         }
>       } else {
>         return call.getRpcResult();
>       }
>     }
>   }
> {code}

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (FLUME-1748) HDFS Sink should check if the thread is interrupted before performing any HDFS operations

Posted by "Hudson (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/FLUME-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13508423#comment-13508423 ] 

Hudson commented on FLUME-1748:
-------------------------------

Integrated in flume-trunk #336 (See [https://builds.apache.org/job/flume-trunk/336/])
    FLUME-1748: HDFS Sink should check if the thread is interrupted before performing any HDFS operations (Revision aa549c4f27db848cb8900533fd0f16562d971aa2)

     Result = SUCCESS
brock : http://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git&a=commit&h=aa549c4f27db848cb8900533fd0f16562d971aa2
Files : 
* flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java

                
> HDFS Sink should check if the thread is interrupted before performing any HDFS operations
> -----------------------------------------------------------------------------------------
>
>                 Key: FLUME-1748
>                 URL: https://issues.apache.org/jira/browse/FLUME-1748
>             Project: Flume
>          Issue Type: Bug
>            Reporter: Hari Shreedharan
>            Assignee: Hari Shreedharan
>             Fix For: v1.4.0
>
>         Attachments: FLUME-1748-1.patch, FLUME-1748.patch
>
>
> If one or more HDFS Sink writes/flush/close end up taking too long, the thread is interrupted by the future.cancel(true) call. The Hadoop RPC client re-sets the interrupt flag on a thread which is interrupted. Also if the thread is interrupted after the RPC call, but before the call() method returns, the interrupt flag stays on the thread. A future HDFS file open call would lead to an exception of this sort:
> {code}
> [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:457)  - HDFS IO error
> java.io.IOException: Failed on local exception: java.nio.channels.ClosedByInterruptException; Host Details : local host is: "random.example.com/10.20.81.108"; destination host is: "random2.example.com":8020; 
> 	at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:759)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1164)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
> 	at $Proxy9.create(Unknown Source)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> 	at java.lang.reflect.Method.invoke(Method.java:597)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
> 	at $Proxy9.create(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:192)
> 	at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1298)
> 	at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1317)
> 	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1215)
> 	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1173)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:272)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:261)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:78)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:805)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:685)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:674)
> 	at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:60)
> 	at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:210)
> 	at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
> 	at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
> 	at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
> 	at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
> 	at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
> 	at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:365)
> 	at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:730)
> 	at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:728)
> 	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> 	at java.lang.Thread.run(Thread.java:662)
> Caused by: java.nio.channels.ClosedByInterruptException
> 	at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:511)
> 	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:193)
> 	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:523)
> 	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:488)
> 	at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:476)
> 	at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:570)
> 	at org.apache.hadoop.ipc.Client$Connection.access$1700(Client.java:220)
> 	at org.apache.hadoop.ipc.Client.getConnection(Client.java:1213)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1140)
> 	... 36 more
> {code}
> The relevant code that re-sets the interrupt flag is in http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?revision=1410024&view=markup
> The method is Client#call(RPC.RpcKind, Writable,ConnectionId). Note that the InterruptedException is caught and the interrupt flag is re-set.
> {code}
>   public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
>       ConnectionId remoteId) throws InterruptedException, IOException {
>     Call call = new Call(rpcKind, rpcRequest);
>     Connection connection = getConnection(remoteId, call);
>     connection.sendParam(call);                 // send the parameter
>     boolean interrupted = false;
>     synchronized (call) {
>       while (!call.done) {
>         try {
>           call.wait();                           // wait for the result
>         } catch (InterruptedException ie) {
>           // save the fact that we were interrupted
>           interrupted = true;
>         }
>       }
>       if (interrupted) {
>         // set the interrupt flag now that we are done waiting
>         Thread.currentThread().interrupt();
>       }
>       if (call.error != null) {
>         if (call.error instanceof RemoteException) {
>           call.error.fillInStackTrace();
>           throw call.error;
>         } else { // local exception
>           InetSocketAddress address = connection.getRemoteAddress();
>           throw NetUtils.wrapException(address.getHostName(),
>                   address.getPort(),
>                   NetUtils.getHostname(),
>                   0,
>                   call.error);
>         }
>       } else {
>         return call.getRpcResult();
>       }
>     }
>   }
> {code}

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (FLUME-1748) HDFS Sink should check if the thread is interrupted before performing any HDFS operations

Posted by "Hari Shreedharan (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/FLUME-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hari Shreedharan updated FLUME-1748:
------------------------------------

    Attachment: FLUME-1748-1.patch

Added spaces in the error message.
                
> HDFS Sink should check if the thread is interrupted before performing any HDFS operations
> -----------------------------------------------------------------------------------------
>
>                 Key: FLUME-1748
>                 URL: https://issues.apache.org/jira/browse/FLUME-1748
>             Project: Flume
>          Issue Type: Bug
>            Reporter: Hari Shreedharan
>            Assignee: Hari Shreedharan
>         Attachments: FLUME-1748-1.patch, FLUME-1748.patch
>
>
> If one or more HDFS Sink writes/flush/close end up taking too long, the thread is interrupted by the future.cancel(true) call. The Hadoop RPC client re-sets the interrupt flag on a thread which is interrupted. Also if the thread is interrupted after the RPC call, but before the call() method returns, the interrupt flag stays on the thread. A future HDFS file open call would lead to an exception of this sort:
> {code}
> [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:457)  - HDFS IO error
> java.io.IOException: Failed on local exception: java.nio.channels.ClosedByInterruptException; Host Details : local host is: "random.example.com/10.20.81.108"; destination host is: "random2.example.com":8020; 
> 	at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:759)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1164)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
> 	at $Proxy9.create(Unknown Source)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> 	at java.lang.reflect.Method.invoke(Method.java:597)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
> 	at $Proxy9.create(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:192)
> 	at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1298)
> 	at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1317)
> 	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1215)
> 	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1173)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:272)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:261)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:78)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:805)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:685)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:674)
> 	at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:60)
> 	at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:210)
> 	at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
> 	at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
> 	at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
> 	at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
> 	at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
> 	at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:365)
> 	at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:730)
> 	at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:728)
> 	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> 	at java.lang.Thread.run(Thread.java:662)
> Caused by: java.nio.channels.ClosedByInterruptException
> 	at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:511)
> 	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:193)
> 	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:523)
> 	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:488)
> 	at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:476)
> 	at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:570)
> 	at org.apache.hadoop.ipc.Client$Connection.access$1700(Client.java:220)
> 	at org.apache.hadoop.ipc.Client.getConnection(Client.java:1213)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1140)
> 	... 36 more
> {code}
> The relevant code that re-sets the interrupt flag is in http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?revision=1410024&view=markup
> The method is Client#call(RPC.RpcKind, Writable,ConnectionId). Note that the InterruptedException is caught and the interrupt flag is re-set.
> {code}
>   public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
>       ConnectionId remoteId) throws InterruptedException, IOException {
>     Call call = new Call(rpcKind, rpcRequest);
>     Connection connection = getConnection(remoteId, call);
>     connection.sendParam(call);                 // send the parameter
>     boolean interrupted = false;
>     synchronized (call) {
>       while (!call.done) {
>         try {
>           call.wait();                           // wait for the result
>         } catch (InterruptedException ie) {
>           // save the fact that we were interrupted
>           interrupted = true;
>         }
>       }
>       if (interrupted) {
>         // set the interrupt flag now that we are done waiting
>         Thread.currentThread().interrupt();
>       }
>       if (call.error != null) {
>         if (call.error instanceof RemoteException) {
>           call.error.fillInStackTrace();
>           throw call.error;
>         } else { // local exception
>           InetSocketAddress address = connection.getRemoteAddress();
>           throw NetUtils.wrapException(address.getHostName(),
>                   address.getPort(),
>                   NetUtils.getHostname(),
>                   0,
>                   call.error);
>         }
>       } else {
>         return call.getRpcResult();
>       }
>     }
>   }
> {code}

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (FLUME-1748) HDFS Sink can end up in a bad state if the Callables are interrupted.

Posted by "Brock Noland (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/FLUME-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13508083#comment-13508083 ] 

Brock Noland commented on FLUME-1748:
-------------------------------------

The error messages are missing spaces.
                
> HDFS Sink can end up in a bad state if the Callables are interrupted.
> ---------------------------------------------------------------------
>
>                 Key: FLUME-1748
>                 URL: https://issues.apache.org/jira/browse/FLUME-1748
>             Project: Flume
>          Issue Type: Bug
>            Reporter: Hari Shreedharan
>            Assignee: Hari Shreedharan
>         Attachments: FLUME-1748.patch
>
>
> If one or more HDFS Sink writes/flush/close end up taking too long, the thread is interrupted by the future.cancel(true) call. The Hadoop RPC client re-sets the interrupt flag on a thread which is interrupted. Also if the thread is interrupted after the RPC call, but before the call() method returns, the interrupt flag stays on the thread. A future HDFS file open call would lead to an exception of this sort:
> {code}
> [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:457)  - HDFS IO error
> java.io.IOException: Failed on local exception: java.nio.channels.ClosedByInterruptException; Host Details : local host is: "random.example.com/10.20.81.108"; destination host is: "random2.example.com":8020; 
> 	at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:759)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1164)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
> 	at $Proxy9.create(Unknown Source)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> 	at java.lang.reflect.Method.invoke(Method.java:597)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
> 	at $Proxy9.create(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:192)
> 	at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1298)
> 	at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1317)
> 	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1215)
> 	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1173)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:272)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:261)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:78)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:805)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:685)
> 	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:674)
> 	at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:60)
> 	at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:210)
> 	at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:53)
> 	at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:172)
> 	at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:170)
> 	at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
> 	at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:170)
> 	at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:365)
> 	at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:730)
> 	at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:728)
> 	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> 	at java.lang.Thread.run(Thread.java:662)
> Caused by: java.nio.channels.ClosedByInterruptException
> 	at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:184)
> 	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:511)
> 	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:193)
> 	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:523)
> 	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:488)
> 	at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:476)
> 	at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:570)
> 	at org.apache.hadoop.ipc.Client$Connection.access$1700(Client.java:220)
> 	at org.apache.hadoop.ipc.Client.getConnection(Client.java:1213)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1140)
> 	... 36 more
> {code}
> The relevant code that re-sets the interrupt flag is in http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?revision=1410024&view=markup
> The method is Client#call(RPC.RpcKind, Writable,ConnectionId). Note that the InterruptedException is caught and the interrupt flag is re-set.
> {code}
>   public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
>       ConnectionId remoteId) throws InterruptedException, IOException {
>     Call call = new Call(rpcKind, rpcRequest);
>     Connection connection = getConnection(remoteId, call);
>     connection.sendParam(call);                 // send the parameter
>     boolean interrupted = false;
>     synchronized (call) {
>       while (!call.done) {
>         try {
>           call.wait();                           // wait for the result
>         } catch (InterruptedException ie) {
>           // save the fact that we were interrupted
>           interrupted = true;
>         }
>       }
>       if (interrupted) {
>         // set the interrupt flag now that we are done waiting
>         Thread.currentThread().interrupt();
>       }
>       if (call.error != null) {
>         if (call.error instanceof RemoteException) {
>           call.error.fillInStackTrace();
>           throw call.error;
>         } else { // local exception
>           InetSocketAddress address = connection.getRemoteAddress();
>           throw NetUtils.wrapException(address.getHostName(),
>                   address.getPort(),
>                   NetUtils.getHostname(),
>                   0,
>                   call.error);
>         }
>       } else {
>         return call.getRpcResult();
>       }
>     }
>   }
> {code}

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira