Posted to user@flume.apache.org by Ashish Tadose <as...@pubmatic.com> on 2013/05/24 12:38:49 UTC

HDFS Sink stops writing events because HDFSWriter failed to append and close a file

Hi All,

We are facing this issue in production flume setup.

The issue starts when the HDFS sink's BucketWriter fails to append a batch 
to a file because of a Hadoop datanode problem.
It then tries to close that file, but close() throws an exception as well, 
and the HDFS sink does not remove that BucketWriter instance. 
[The cause may be that the number of BucketWriter instances is below *maxOpenFiles*.]

What makes this worse is that the HDFS sink never lets go of that 
BucketWriter instance and keeps trying to append to and close that file 
indefinitely, which disrupts all event processing towards HDFS.
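
To make the failure mode concrete, here is a rough Java sketch of the 
behaviour we would expect from the sink (illustrative names only, not 
Flume's actual source): even when close() throws, the dead writer should 
be evicted from the sink's cache so that the next batch opens a fresh file.

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    // Rough sketch, not Flume source: a sink keeping one writer per file path.
    public class StuckWriterSketch {

        interface Writer {
            void append(byte[] event) throws IOException;
            void close() throws IOException;
        }

        private final Map<String, Writer> writers = new HashMap<String, Writer>();

        void appendEvent(String path, byte[] event) throws IOException {
            Writer w = writers.get(path);
            if (w == null) {
                return;  // sketch only: assume the writer was opened elsewhere
            }
            try {
                w.append(event);
            } catch (IOException appendFailure) {
                try {
                    w.close();  // on a dead DFSOutputStream this throws again
                } catch (IOException closeFailure) {
                    // stream is already unusable; nothing more to do with it
                } finally {
                    // Evict unconditionally. Without this step the broken writer
                    // stays cached and every later batch re-hits the same dead
                    // stream, which matches what we observe in production.
                    writers.remove(path);
                }
                throw appendFailure;  // let the channel hold and retry the batch
            }
        }
    }

As far as we can tell from the logs below, the eviction step is what never 
happens in our setup.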

*Logs from flume agent*
2013-05-20 02:32:40,669 (ResponseProcessor for block 
blk_8857674139042547711_2143611) [WARN - 
org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$ResponseProcessor.run(DFSClient.java:3015)] 
DFSOutputStream ResponseProcessor exception  for block 
blk_8857674139042547711_2143611java.net.SocketTimeoutException: 69000 
millis timeout while waiting for channel to be ready for read. ch : 
java.nio.channels.SocketChannel[connected local=/<datanode_ip>:41129 
remote=/<datanode2_ip>:60010]
         at 
org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
         at 
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:155)
         at 
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:128)
         at java.io.DataInputStream.readFully(DataInputStream.java:178)
         at java.io.DataInputStream.readLong(DataInputStream.java:399)
         at 
org.apache.hadoop.hdfs.protocol.DataTransferProtocol$PipelineAck.readFields(DataTransferProtocol.java:124)
         at 
org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$ResponseProcessor.run(DFSClient.java:2967)

2013-05-20 02:32:56,321 (DataStreamer for file 
/flume/data/Flume_raw_1_.1368960029025.lzo.tmp block 
blk_8857674139042547711_2143611) [WARN - 
org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.processDatanodeError(DFSClient.java:3051)] 
*Error Recovery for block blk_8857674139042547711_2143611 bad 
datanode[0] <datanode2_ip>:60010*
2013-05-20 02:32:56,322 (DataStreamer for file 
/flume/data/Flume_raw_1_.1368960029025.lzo.tmp block 
blk_8857674139042547711_2143611) [WARN - 
org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.processDatanodeError(DFSClient.java:3102)] 
Error Recovery for block blk_8857674139042547711_2143611 in pipeline 
<datanode2_ip>:60010, <datanode3_ip>:60010, <datanode4_ip>:60010: bad 
datanode <datanode2_ip>:60010
2013-05-20 02:32:56,538 (Log-BackgroundWorker-fileChannel) [INFO - 
org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:109)] 
Start checkpoint for 
/home/flume/flume_channel/checkpointDir15/checkpoint, elements to sync = 
42000
2013-05-20 02:32:56,634 (Log-BackgroundWorker-fileChannel) [INFO - 
org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint(EventQueueBackingStoreFile.java:117)] 
Updating checkpoint metadata: logWriteOrderID: 1370969225649, queueSize: 
140156, queueHead: 72872626
2013-05-20 02:32:56,722 (Log-BackgroundWorker-fileChannel) [INFO - 
org.apache.flume.channel.file.LogFileV3$MetaDataWriter.markCheckpoint(LogFileV3.java:85)] 
Updating log-543.meta currentPosition = 743847065, logWriteOrderID = 
1370969225649
2013-05-20 02:32:56,759 (Log-BackgroundWorker-fileChannel) [INFO - 
org.apache.flume.channel.file.Log.writeCheckpoint(Log.java:886)] Updated 
checkpoint for file: /home/flume/flume_channel/dataDir15/log-543 
position: 743847065 logWriteOrderID: 1370969225649
2013-05-20 02:33:56,331 (DataStreamer for file 
/flume/data/Flume_raw_1_.1368960029025.lzo.tmp block 
blk_8857674139042547711_2143611) [WARN - 
org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.processDatanodeError(DFSClient.java:3123)] 
Failed recovery attempt #0 from primary datanode <datanode3_ip>:60010
java.net.SocketTimeoutException: Call to /<datanode3_ip>:60021 failed on 
socket timeout exception: java.net.SocketTimeoutException: 60000 millis 
timeout while waiting for channel to be ready for read. ch : 
java.nio.channels.SocketChannel[connected local=/<datanode_ip>:41526 
remote=/<datanode3_ip>:60021]
..
..
..
2013-05-20 02:36:00,249 (DataStreamer for file 
/flume/data/Flume_raw_1_.1368960029025.lzo.tmp block 
blk_8857674139042547711_2143611) [WARN - 
org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.processDatanodeError(DFSClient.java:3123)] 
Failed recovery attempt #0 from primary datanode <datanode2_ip>:60010
org.apache.hadoop.ipc.RemoteException: 
org.apache.hadoop.ipc.RemoteException: java.io.IOException: 
blk_8857674139042547711_2143611 is already commited, storedBlock == null.
         at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.nextGenerationStampForBlock(FSNamesystem.java:5388)
         at 
org.apache.hadoop.hdfs.server.namenode.NameNode.nextGenerationStamp(NameNode.java:748)
         at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
         at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
         at java.lang.reflect.Method.invoke(Method.java:597)
         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:563)
         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1388)
         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1384)
         at java.security.AccessController.doPrivileged(Native Method)
         at javax.security.auth.Subject.doAs(Subject.java:396)
         at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1382)

         at org.apache.hadoop.ipc.Client.call(Client.java:1070)
         at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
         at $Proxy5.nextGenerationStamp(Unknown Source)
         at 
org.apache.hadoop.hdfs.server.datanode.DataNode.syncBlock(DataNode.java:1999)
         at 
org.apache.hadoop.hdfs.server.datanode.DataNode.recoverBlock(DataNode.java:1967)
         at 
org.apache.hadoop.hdfs.server.datanode.DataNode.recoverBlock(DataNode.java:2047)
         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
         at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
         at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
         at java.lang.reflect.Method.invoke(Method.java:597)
         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:563)
         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1388)
         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1384)
         at java.security.AccessController.doPrivileged(Native Method)
         at javax.security.auth.Subject.doAs(Subject.java:396)
         at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
2013-05-20 02:36:00,249 (DataStreamer for file 
/flume/data/Flume_raw_1_.1368960029025.lzo.tmp block 
blk_8857674139042547711_2143611) [WARN - 
org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.processDatanodeError(DFSClient.java:3123)] 
Failed recovery attempt #0 from primary datanode <datanode2_ip>:60010
org.apache.hadoop.ipc.RemoteException: 
org.apache.hadoop.ipc.RemoteException: java.io.IOException: 
blk_8857674139042547711_2143611 is already commited, storedBlock == null.
         at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.nextGenerationStampForBlock(FSNamesystem.java:5388)
         at 
org.apache.hadoop.hdfs.server.namenode.NameNode.nextGenerationStamp(NameNode.java:748)
         at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
         at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
         at java.lang.reflect.Method.invoke(Method.java:597)
         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:563)
         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1388)
         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1384)
         at java.security.AccessController.doPrivileged(Native Method)
         at javax.security.auth.Subject.doAs(Subject.java:396)
         at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1382)

         at org.apache.hadoop.ipc.Client.call(Client.java:1070)
         at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
         at $Proxy5.nextGenerationStamp(Unknown Source)
         at 
org.apache.hadoop.hdfs.server.datanode.DataNode.syncBlock(DataNode.java:1999)
         at 
org.apache.hadoop.hdfs.server.datanode.DataNode.recoverBlock(DataNode.java:1967)
         at 
org.apache.hadoop.hdfs.server.datanode.DataNode.recoverBlock(DataNode.java:2047)
         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
         at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
         at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
         at java.lang.reflect.Method.invoke(Method.java:597)
         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:563)
         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1388)
         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1384)
         at java.security.AccessController.doPrivileged(Native Method)
         at javax.security.auth.Subject.doAs(Subject.java:396)
         at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1382)

         at org.apache.hadoop.ipc.Client.call(Client.java:1070)
         at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
         at $Proxy7.recoverBlock(Unknown Source)
         at 
org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.processDatanodeError(DFSClient.java:3121)
         at 
org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.access$2100(DFSClient.java:2589)
         at 
org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:2793)
2013-05-20 02:36:00,249 (DataStreamer for file 
/flume/data/Flume_raw_1_.1368960029025.lzo.tmp block 
blk_8857674139042547711_2143611) [WARN - 
org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.processDatanodeError(DFSClient.java:3162)] 
Error Recovery for block blk_8857674139042547711_2143611 failed because 
recovery from primary datanode <datanode2_ip>:60010 failed 1 times.  
Pipeline was <datanode2_ip>:60010. Will retry...
..
..
..
2013-05-20 02:36:16,933 (hdfs-hdfsSink1-call-runner-8) [WARN - 
org.apache.flume.sink.hdfs.BucketWriter.*append*(BucketWriter.java:378)] 
Caught IOException writing to HDFSWriter (write beyond end of stream). 
*Closing file 
(hdfs://<namenode_ip>:64310/flume/data/Flume_raw_1_.1368960029025.lzo.tmp) 
and rethrowing exception.*
2013-05-20 02:36:16,934 (hdfs-hdfsSink1-call-runner-8) [WARN - 
org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:384)] 
Caught IOException while closing file 
(hdfs://<namenode_ip>:64310/flume/data/Flume_raw_1_.1368960029025.lzo.tmp). 
*Exception follows.*
*java.io.IOException: DFSOutputStream is closed*
         at 
org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3669)
         at 
org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
         at 
org.apache.flume.sink.hdfs.HDFSCompressedDataStream.sync(HDFSCompressedDataStream.java:96)
         at 
org.apache.flume.sink.hdfs.BucketWriter.doFlush(BucketWriter.java:345)
         at 
org.apache.flume.sink.hdfs.BucketWriter.access$500(BucketWriter.java:53)
         at 
org.apache.flume.sink.hdfs.BucketWriter$4.run(BucketWriter.java:310)
         at 
org.apache.flume.sink.hdfs.BucketWriter$4.run(BucketWriter.java:308)
         at 
org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:143)
         at 
org.apache.flume.sink.hdfs.BucketWriter.flush(BucketWriter.java:308)
         at 
org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:257)
         at 
org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:382)
         at 
org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:729)
         at 
org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:727)
         at 
java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
         at java.util.concurrent.FutureTask.run(FutureTask.java:138)
         at 
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
         at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
         at java.lang.Thread.run(Thread.java:662)
2013-05-20 02:36:16,934 (SinkRunner-PollingRunner-DefaultSinkProcessor) 
[WARN - 
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:456)] HDFS 
IO error
java.io.IOException: write beyond end of stream
         at 
com.hadoop.compression.lzo.LzopOutputStream.write(LzopOutputStream.java:127)
         at java.io.OutputStream.write(OutputStream.java:58)
         at 
org.apache.flume.sink.hdfs.HDFSCompressedDataStream.append(HDFSCompressedDataStream.java:81)
         at 
org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:376)
         at 
org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:729)
         at 
org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:727)
         at 
java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
         at 
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
         at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
         at java.lang.Thread.run(Thread.java:662)
2013-05-20 02:36:16,934 (SinkRunner-PollingRunner-DefaultSinkProcessor) 
[WARN - 
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:456)] *HDFS 
IO error*
*java.io.IOException: write beyond end of stream*
         at 
com.hadoop.compression.lzo.LzopOutputStream.write(LzopOutputStream.java:127)
         at java.io.OutputStream.write(OutputStream.java:58)
         at 
org.apache.flume.sink.hdfs.HDFSCompressedDataStream.append(HDFSCompressedDataStream.java:81)
         at 
org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:376)
         at 
org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:729)
         at 
org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:727)
         at 
java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
         at java.util.concurrent.FutureTask.run(FutureTask.java:138)
         at 
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
         at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
         at java.lang.Thread.run(Thread.java:662)
2013-05-20 02:36:20,988 (hdfs-hdfsSink1-call-runner-7) [WARN - 
org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:378)] *Caught 
IOException writing to HDFSWriter (Error Recovery for block 
blk_8857674139042547711_2143611 failed  because recovery from primary 
datanode <datanode2_ip>:60010 failed 6 times. Pipeline was 
<datanode2_ip>:60010. Aborting...). Closing file 
(hdfs://<namenode_ip>:64310/flume/data/Flume_raw_1_.1368960029025.lzo.tmp) 
and rethrowing exception.*
..
..
..
Then it keeps repeating these logs.

*Flume agent HDFS sink properties*

agent.sinks.hdfsSink1.type = hdfs
agent.sinks.hdfsSink1.hdfs.path = hdfs://<namenode_ip>:64310/flume/tracker_data
#agent.sinks.hdfsSink1.hdfs.fileType = DataStream
agent.sinks.hdfsSink1.hdfs.fileType = CompressedStream
agent.sinks.hdfsSink1.hdfs.filePrefix = Flume_raw_1_
agent.sinks.hdfsSink1.hdfs.rollSize = 0
agent.sinks.hdfsSink1.hdfs.codeC = LZopCodec
agent.sinks.hdfsSink1.hdfs.rollCount = 0
agent.sinks.hdfsSink1.hdfs.batchSize = 5000
agent.sinks.hdfsSink1.hdfs.rollInterval = 60
agent.sinks.hdfsSink1.channel = fileChannel
agent.sinks.hdfsSink1.hdfs.callTimeout = 240000
agent.sinks.hdfsSink1.hdfs.idleTimeout = 300
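
One related knob: *maxOpenFiles* (mentioned above) bounds the sink's cache 
of BucketWriter instances. We have not set it, so it is at its default; if 
we wanted to set it explicitly, it would look like the line below (the 
property is documented for the Flume HDFS sink; the value is illustrative):

agent.sinks.hdfsSink1.hdfs.maxOpenFiles = 5000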

Thanks,
Ashish


Re: HDFS Sink stops writing events because HDFSWriter failed to append and close a file

Posted by Ashish Tadose <as...@pubmatic.com>.
Hi Jeff,

We are using flume-ng version 1.3.

-Ashish


On Wednesday 29 May 2013 03:08 AM, Jeff Lord wrote:
> Hi Ashish,
>
> What version of flume are you running?
>
> flume-ng version
>
> -Jeff


Re: HDFS Sink stops writing events because HDFSWriter failed to append and close a file

Posted by Jeff Lord <jl...@cloudera.com>.
Hi Ashish,

What version of flume are you running?

flume-ng version

-Jeff

