Posted to dev@flume.apache.org by "Hari Shreedharan (JIRA)" <ji...@apache.org> on 2014/02/05 20:46:14 UTC

[jira] [Commented] (FLUME-2307) Old log data is not cleaned up

    [ https://issues.apache.org/jira/browse/FLUME-2307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13892482#comment-13892482 ] 

Hari Shreedharan commented on FLUME-2307:
-----------------------------------------

On reading the code, I think I know what is happening here:
* A bunch of takes happen from the queue -> the events end up in the inflightTakes file, which means log-X won't be deleted until the transaction gets committed or rolled back.
* On commit or rollback, the first thing we do is grab the lock - which fails (lock timeout), so the rollback never happens and the transaction is stuck in limbo, never committed or rolled back.
* This means log-X would never get deleted, so any log-Y with Y > X would never get deleted either.

I don't see a way around it, since we need to make sure the rollback completes so the events are put back into the queue. Otherwise we would essentially lose the data until a restart (on restart, since the events are in the inflights, they get put back into the channel, so the data becomes "available" again).
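
To make the failure mode concrete, here is a minimal, self-contained Java sketch. It is not the real FileChannel code: the lock, the queue and the "pinned log file" set below are simplified stand-ins for the shared log write lock, the channel queue and the inflightTakes bookkeeping. It shows how a rollback that cannot acquire the lock within the timeout leaves the transaction open, so the log file referenced by the in-flight takes is never eligible for removal:

import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class StuckRollbackSketch {

    // Stand-ins for the shared log write lock, the channel queue and the
    // inflightTakes bookkeeping that pins log files referenced by open takes.
    static final ReentrantReadWriteLock logLock = new ReentrantReadWriteLock();
    static final Queue<String> queue = new ArrayDeque<>();
    static final Set<Integer> pinnedLogFiles = new HashSet<>();

    // Take an event: it becomes "in flight" and pins the log file it came from.
    static String take(int logFileId) {
        String event = queue.poll();
        if (event != null) {
            pinnedLogFiles.add(logFileId);
        }
        return event;
    }

    // Rollback needs the log write lock (compare doRollback in the traces below).
    static void rollback(int logFileId, String event, long timeoutMs) throws Exception {
        if (!logLock.writeLock().tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
            // Lock timeout: the rollback never runs, so the pin is never released.
            throw new Exception("Failed to obtain lock for writing to the log.");
        }
        try {
            queue.add(event);                  // put the event back into the queue
            pinnedLogFiles.remove(logFileId);  // unpin -> the log file may be removed later
        } finally {
            logLock.writeLock().unlock();
        }
    }

    // Checkpoint-time cleanup can only remove a log file that nothing pins.
    static boolean canRemoveOldLog(int logFileId) {
        return !pinnedLogFiles.contains(logFileId);
    }

    public static void main(String[] args) throws Exception {
        queue.add("event-1");
        String e = take(42);                   // take from log-42 -> log-42 is pinned

        // Simulate another thread holding the lock (e.g. a slow checkpoint under heavy I/O).
        Thread holder = new Thread(() -> {
            logLock.writeLock().lock();
            try {
                Thread.sleep(500);
            } catch (InterruptedException ignored) {
            } finally {
                logLock.writeLock().unlock();
            }
        });
        holder.start();
        Thread.sleep(50);                      // let the other thread grab the lock first

        try {
            rollback(42, e, 100);              // times out -> throws, transaction stuck in limbo
        } catch (Exception ex) {
            System.out.println("rollback failed: " + ex.getMessage());
        }

        holder.join();
        // Even after the lock is free again, nothing retries the rollback,
        // so log-42 (and every later log file) stays on disk.
        System.out.println("log-42 removable? " + canRemoveOldLog(42));  // false
    }
}

In the sketch, once tryLock() times out the exception propagates out of rollback() and nothing ever retries it, which mirrors the stack traces below: the pin on log-42 stays forever and canRemoveOldLog(42) keeps returning false, just as the old files are never removed on the real channel.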

> Old log data is not cleaned up
> ------------------------------
>
>                 Key: FLUME-2307
>                 URL: https://issues.apache.org/jira/browse/FLUME-2307
>             Project: Flume
>          Issue Type: Bug
>          Components: Channel
>    Affects Versions: v1.4.0
>            Reporter: Steve Zesch
>
> I've observed Flume failing to clean up old log data in FileChannels. The amount of old log data can range anywhere from tens to hundreds of GB. I was able to confirm that the channels were in fact empty.
> This behavior always occurs after lock timeouts when attempting to put, take, rollback, or commit to a FileChannel. Once the timeout occurs, Flume stops cleaning up the old files.
> I was able to confirm that the Log's writeCheckpoint method was still being called and successfully obtaining a lock from tryLockExclusive(), but I was not able to confirm removeOldLogs being called. The application log did not include "Removing old file: log-xyz" for the old files, which the Log class would output if they were correctly being removed.
> I suspect the lock timeouts were due to high I/O load at the time.
> Some stack traces:
> org.apache.flume.ChannelException: Failed to obtain lock for writing to the log. Try increasing the log write timeout value. [channel=fileChannel]
>         at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doPut(FileChannel.java:478)
>         at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
>         at org.apache.flume.channel.BasicChannelSemantics.put(BasicChannelSemantics.java:80)
>         at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:189)
> org.apache.flume.ChannelException: Failed to obtain lock for writing to the log. Try increasing the log write timeout value. [channel=fileChannel]
>         at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doCommit(FileChannel.java:594)
>         at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
>         at dataxu.flume.plugins.avro.AsyncAvroSink.process(AsyncAvroSink.java:548)
>         at dataxu.flume.plugins.ClassLoaderFlumeSink.process(ClassLoaderFlumeSink.java:33)
>         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:619)
> org.apache.flume.ChannelException: Failed to obtain lock for writing to the log. Try increasing the log write timeout value. [channel=fileChannel]
>         at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doRollback(FileChannel.java:621)
>         at org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:168)
>         at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:194)
>         at dataxu.flume.plugins.avro.AvroSource.appendBatch(AvroSource.java:209)
>         at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>         at java.lang.reflect.Method.invoke(Method.java:597)
>         at org.apache.avro.ipc.specific.SpecificResponder.respond(SpecificResponder.java:91)
>         at org.apache.avro.ipc.Responder.respond(Responder.java:151)
>         at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
>         at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
>         at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
>         at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>         at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:792)
>         at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
>         at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:321)
>         at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:303)
>         at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:220)
>         at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
>         at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
>         at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
>         at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
>         at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
>         at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:94)
>         at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:364)
>         at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:238)
>         at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:38)
>         at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>         at java.lang.Thread.run(Thread.java:619)
> Channel Config:
> agent.channels.fileChannel.type = file
> agent.channels.fileChannel.checkpointDir = /var/log/flume-ng/channels/fileChannel/checkpoint
> agent.channels.fileChannel.dataDirs = /var/log/flume-ng/channels/fileChannel/data
> agent.channels.fileChannel.capacity = 100000000
> agent.channels.fileChannel.transactionCapacity = 100000000
> agent.channels.fileChannel.maxFileSize = 104857600 



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)