Posted to user@flume.apache.org by "Bhaskar V. Karambelkar" <bh...@gmail.com> on 2012/08/24 00:24:19 UTC

At flume shutdown only one HDFSSink is closed properly

I have 3 HDFS sinks all writing to the same namenode, but different paths.

e.g. sink1 = hdfs://namenode/path1
sink2 = hdfs://namenode/path2
etc.

When Flume is shut down (kill <flume-pid>), the file for the first sink is
closed correctly and renamed to remove the .tmp extension,
but closing the second file throws the following exception, and its
.tmp extension is not removed.
I see this very consistently: with more than one HDFS sink, only the first
one is closed properly and renamed; the rest all throw an exception
when being closed and are not renamed to remove the .tmp extension.

2012-08-23 19:51:39,837 WARN hdfs.BucketWriter: failed to close() HDFSWriter for file (hdfs://hadoop-namnode:9000/flume/avro/2012/08/23/l/avro_.1345751470750.tmp). Exception follows.
java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
    at org.apache.hadoop.hdfs.DFSClient.access$1100(DFSClient.java:74)
    at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3667)
    at org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
    at org.apache.flume.sink.hdfs.HDFSDataStream.close(HDFSDataStream.java:103)
    at org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:250)
    at org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:48)
    at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:236)
    at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:233)
    at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125)
    at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:233)
    at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
    at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:744)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
2012-08-23 19:51:39,840 WARN hdfs.HDFSEventSink: Exception while closing hdfs://hadoop-namenode:9000/flume/avro/2012/08/23/avro_. Exception follows.
java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
    at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:873)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:513)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:768)
    at org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:369)
    at org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:269)
    at org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:48)
    at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:236)
    at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:233)
    at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:1125)
    at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:233)
    at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
    at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:744)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)

Re: HostName intercepter not working -- Anything i am missing

Posted by Mike Percy <mp...@apache.org>.
It's worth trying the AVRO_EVENT serializer, which comes out of the box
and writes the data in Avro container format (including all headers).

Regards,
Mike

On Wed, Aug 29, 2012 at 2:11 AM, Hari Shreedharan <hshreedharan@cloudera.com
> wrote:

> Hi,
>
> The interceptor is probably working as expected. The reason for this is
> that the serializer that the FILE_ROLL sink uses by default is the BodyText
> serializer which simply serializes the body. The hostname is in the header
> though, so you need to write a serializer that can write the header out in
> the format you want. See :
> http://flume.apache.org/FlumeUserGuide.html#file-roll-sink. You can write
> your own serializer and then supply it through the configuration.
>
>
> Thanks
> Hari
> --
> Hari Shreedharan
>
>
> On Wednesday, August 29, 2012 at 1:57 AM, Ashutosh Panigrahi wrote:
>
> > Hi All,
> >
> > I am a newbie to flume. I have started using it for log aggregation in
> my company.
> >
> > I have been desperately trying to make the hostname interceptor work for
> me , but i am not getting the hostName in the final aggregated log file.
> >
> > The conf file at my AGGREGATOR machine is
> >
> > # Describe/configure source1
> > agent1.sources.source1.type = avro
> > agent1.sources.source1.bind = A.B.C.D
> > agent1.sources.source1.port = 44444
> >
> > agent1.sources.source1.interceptors = host_int inter1
> > agent1.sources.source1.interceptors.host_int.type = host
> > agent1.sources.source1.interceptors.host_int.hostHeader = hostname
> >
> > agent1.sources.source1.interceptors.inter1.type = timestamp
> >
> >
> > # Describe sink1
> > agent1.sinks.sink1.type = FILE_ROLL
> > agent1.sinks.sink1.sink.directory=/home/ashutosh/flumeLogs
> > agent1.sinks.sink1.sink.rollInterval=0
> >
> >
> >
> > The configuration file at my agents are .
> >
> > agent1.sources.source1.type = exec
> > agent1.sources.source1.command = tail -F /mnt1/logs/LOGGER.log
> >
> >
> > My expectation was that in the Aggregator machine, whatever logs i get,
> will have the hostname in the beginning. But i am getting the actual string
> which was generated by agent.
> >
> > Please help.
> >
> >
> > Regards
> > Ashutosh
>
>
>
>

Re: HostName intercepter not working -- Anything i am missing

Posted by Hari Shreedharan <hs...@cloudera.com>.
Hi, 

The interceptor is probably working as expected. By default, the FILE_ROLL sink uses the BodyText serializer, which writes out only the event body. The hostname is in the event header, though, so you need a serializer that writes the header out in the format you want. See http://flume.apache.org/FlumeUserGuide.html#file-roll-sink. You can write your own serializer and then supply it through the configuration.
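
To make the header/body split concrete, here is a minimal sketch of the formatting step such a serializer would perform. It deliberately does not use the Flume API: the class HostPrefixSketch and its methods are hypothetical names for illustration, and a real serializer would have to implement Flume's serializer interface as described in the user guide. The header key "hostname" is taken from the hostHeader setting in the configuration posted in this thread.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

// Hypothetical illustration only; this is NOT a Flume EventSerializer.
public class HostPrefixSketch {

    // Header key set by the host interceptor in the posted config (hostHeader = hostname).
    public static final String HOST_HEADER = "hostname";

    /** Writes "<host> <body>\n" to out, or just "<body>\n" if the header is absent. */
    public static void write(Map<String, String> headers, byte[] body, OutputStream out)
            throws IOException {
        String host = headers.get(HOST_HEADER);
        if (host != null) {
            out.write(host.getBytes(StandardCharsets.UTF_8));
            out.write(' ');
        }
        out.write(body);
        out.write('\n');
    }

    /** Convenience wrapper that returns the serialized line as a String. */
    public static String render(Map<String, String> headers, String body) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            write(headers, body.getBytes(StandardCharsets.UTF_8), out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, String> headers = Collections.singletonMap(HOST_HEADER, "host-a");
        System.out.print(render(headers, "a log line"));
    }
}
```

A body-only serializer corresponds to the branch where the header is ignored, which is why the aggregated file shows only the string produced by the agent.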


Thanks
Hari
-- 
Hari Shreedharan


On Wednesday, August 29, 2012 at 1:57 AM, Ashutosh Panigrahi wrote:

> Hi All,
> 
> I am a newbie to flume. I have started using it for log aggregation in my company.
> 
> I have been desperately trying to make the hostname interceptor work for me , but i am not getting the hostName in the final aggregated log file.
> 
> The conf file at my AGGREGATOR machine is 
> 
> # Describe/configure source1
> agent1.sources.source1.type = avro
> agent1.sources.source1.bind = A.B.C.D
> agent1.sources.source1.port = 44444
> 
> agent1.sources.source1.interceptors = host_int inter1
> agent1.sources.source1.interceptors.host_int.type = host
> agent1.sources.source1.interceptors.host_int.hostHeader = hostname
> 
> agent1.sources.source1.interceptors.inter1.type = timestamp
> 
> 
> # Describe sink1
> agent1.sinks.sink1.type = FILE_ROLL
> agent1.sinks.sink1.sink.directory=/home/ashutosh/flumeLogs
> agent1.sinks.sink1.sink.rollInterval=0
> 
> 
> 
> The configuration file at my agents are . 
> 
> agent1.sources.source1.type = exec
> agent1.sources.source1.command = tail -F /mnt1/logs/LOGGER.log
> 
> 
> My expectation was that in the Aggregator machine, whatever logs i get, will have the hostname in the beginning. But i am getting the actual string which was generated by agent.
> 
> Please help.
> 
> 
> Regards
> Ashutosh




HostName intercepter not working -- Anything i am missing

Posted by Ashutosh Panigrahi <ap...@sprinklr.com>.
Hi All,

I am a newbie to Flume. I have started using it for log aggregation in my company.

I have been desperately trying to make the hostname interceptor work for me, but I am not getting the hostname in the final aggregated log file.

The conf file on my AGGREGATOR machine is:

# Describe/configure source1
agent1.sources.source1.type = avro
agent1.sources.source1.bind = A.B.C.D
agent1.sources.source1.port = 44444

agent1.sources.source1.interceptors = host_int inter1
agent1.sources.source1.interceptors.host_int.type = host
agent1.sources.source1.interceptors.host_int.hostHeader = hostname

agent1.sources.source1.interceptors.inter1.type = timestamp


# Describe sink1
agent1.sinks.sink1.type = FILE_ROLL
agent1.sinks.sink1.sink.directory=/home/ashutosh/flumeLogs
agent1.sinks.sink1.sink.rollInterval=0


The configuration file on my agents is:

agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /mnt1/logs/LOGGER.log


My expectation was that on the aggregator machine, every log line I receive would have the hostname at the beginning. But I am getting only the original string that was generated by the agent.

Please help.


Regards
Ashutosh

Re: At flume shutdown only one HDFSSink is closed properly

Posted by Mike Percy <mp...@apache.org>.
On Mon, Aug 27, 2012 at 7:53 AM, Bhaskar V. Karambelkar <bhaskarvk@gmail.com
> wrote:

> Hi Mike,
> There are indeed Jira issues for this.
> A flume jira issue , which was fixed in 1.2.0.  FLUME-1163.
>

Thanks Bhaskar, I just marked that as a duplicate of
https://issues.apache.org/jira/browse/FLUME-1219, which has the relevant
patch.

> But that is only half the story, the hadoop jira issue for supporting the
> 'fs.automatic.close' flag
> is logged at
> https://issues.apache.org/jira/browse/HADOOP-4829
>
> There is even a mail thread about it on this mailing list
>
> https://mail-archives.apache.org/mod_mbox/flume-user/201208.mbox/%3CCAFukC%3D5fTnQTofaBpXUFPd%2Bqr%3DjU1G7iEpnwPX3VVtZ3Kg6VZA%40mail.gmail.com%3E
>
> Hadoop 1.x don't have the 4829 patch, so we can't use Hadoop 1.x with
> Flume if we want proper
> renaming of .tmp files at flume shutdown.
>
> CDH4, as well as CDH3u5 do have that patch pulled in, so they are fine.
>

Bummer, I did not realize that Hadoop 1.0.3 did not have the relevant
functionality. It would be nice to get that patch backported. If there is
another way around this, I'm open to suggestions.

Regards,
Mike

Re: At flume shutdown only one HDFSSink is closed properly

Posted by "Bhaskar V. Karambelkar" <bh...@gmail.com>.
Hi Mike,
There are indeed JIRA issues for this.
The Flume JIRA issue, FLUME-1163, was fixed in 1.2.0.

But that is only half the story. The Hadoop JIRA issue for supporting the
'fs.automatic.close' flag
is logged at
https://issues.apache.org/jira/browse/HADOOP-4829

There is even a mail thread about it on this mailing list:
https://mail-archives.apache.org/mod_mbox/flume-user/201208.mbox/%3CCAFukC%3D5fTnQTofaBpXUFPd%2Bqr%3DjU1G7iEpnwPX3VVtZ3Kg6VZA%40mail.gmail.com%3E

Hadoop 1.x doesn't have the HADOOP-4829 patch, so we can't use Hadoop 1.x with Flume
if we want proper
renaming of .tmp files at Flume shutdown.

CDH4 and CDH3u5 do have that patch pulled in, so they are fine.

thanks
Bhaskar

On Fri, Aug 24, 2012 at 3:40 PM, Mike Percy <mp...@apache.org> wrote:

> Thanks for the additional info Bhaskar. So is this a known issue in
> vanilla Hadoop 1.0.3? If so do you have a JIRA number?
>
> Regards,
> Mike
>
>
> On Fri, Aug 24, 2012 at 12:13 PM, Bhaskar V. Karambelkar <
> bhaskarvk@gmail.com> wrote:
>
>> oops, this is just the same Hadoop's FileSystem.close() shutdown hook
>> issue. I was getting the exception no matter whether I had 1 HDFS sink or
>> more.
>> I was using hadoop vanilla 1.0.3, and looks like that one doesn't respect
>> the fs.automatic.close option.
>> Switched to CDH3u5, and no more problems, all the HDFS sinks correctly
>> rename the file on shutdown.
>>
>> In conclusion, the vanilla hadoop 1.x series is not an option for flume.
>>
>> Go with Hadoop 2.x or CDH3u5, CDH4
>>
>> thanks
>> Bhaskar
>>
>>
>> On Thu, Aug 23, 2012 at 9:35 PM, Mike Percy <mp...@apache.org> wrote:
>>
>>> Hmm... this likely happens because Hadoop statically caches the
>>> FileSystem object, so as it turns out, the multiple Sinks are sharing the
>>> same FileSystem objects.
>>>
>>> I think the only reason we need to explicitly close the FileSystem
>>> objects is to support the deleteOnExit feature. We are explicitly closing
>>> them because we removed the automatic shutdown hook typically installed by
>>> Hadoop to invoke FileSystem.close(), since it was interfering with the .tmp
>>> rolling. I wonder if we can get away with never closing them in our
>>> case... I'm not sure if we need the deleteOnExit() functionality implicitly
>>> for any reason, or if there are other more important reasons behind why the
>>> FileSystem objects should be closed.
>>>
>>> Regards,
>>> Mike

Re: At flume shutdown only one HDFSSink is closed properly

Posted by Mike Percy <mp...@apache.org>.
Thanks for the additional info Bhaskar. So is this a known issue in vanilla
Hadoop 1.0.3? If so do you have a JIRA number?

Regards,
Mike

On Fri, Aug 24, 2012 at 12:13 PM, Bhaskar V. Karambelkar <
bhaskarvk@gmail.com> wrote:

> oops, this is just the same Hadoop's FileSystem.close() shutdown hook
> issue. I was getting the exception no matter whether I had 1 HDFS sink or
> more.
> I was using hadoop vanilla 1.0.3, and looks like that one doesn't respect
> the fs.automatic.close option.
> Switched to CDH3u5, and no more problems, all the HDFS sinks correctly
> rename the file on shutdown.
>
> In conclusion, the vanilla hadoop 1.x series is not an option for flume.
>
> Go with Hadoop 2.x or CDH3u5, CDH4
>
> thanks
> Bhaskar
>
>
> On Thu, Aug 23, 2012 at 9:35 PM, Mike Percy <mp...@apache.org> wrote:
>
>> Hmm... this likely happens because Hadoop statically caches the
>> FileSystem object, so as it turns out, the multiple Sinks are sharing the
>> same FileSystem objects.
>>
>> I think the only reason we need to explicitly close the FileSystem
>> objects is to support the deleteOnExit feature. We are explicitly closing
>> them because we removed the automatic shutdown hook typically installed by
>> Hadoop to invoke FileSystem.close(), since it was interfering with the .tmp
>> rolling. I wonder if we can get away with never closing them in our
>> case... I'm not sure if we need the deleteOnExit() functionality implicitly
>> for any reason, or if there are other more important reasons behind why the
>> FileSystem objects should be closed.
>>
>> Regards,
>> Mike

Re: At flume shutdown only one HDFSSink is closed properly

Posted by "Bhaskar V. Karambelkar" <bh...@gmail.com>.
Oops, this is just the same Hadoop FileSystem.close() shutdown hook
issue. I was getting the exception no matter whether I had one HDFS sink or
more.
I was using vanilla Hadoop 1.0.3, and it looks like that release doesn't respect
the fs.automatic.close option.
I switched to CDH3u5 and have no more problems: all the HDFS sinks correctly
rename the file on shutdown.

In conclusion, the vanilla Hadoop 1.x series is not an option for Flume.

Go with Hadoop 2.x, CDH3u5, or CDH4.

thanks
Bhaskar

On Thu, Aug 23, 2012 at 9:35 PM, Mike Percy <mp...@apache.org> wrote:

> Hmm... this likely happens because Hadoop statically caches the FileSystem
> object, so as it turns out, the multiple Sinks are sharing the same
> FileSystem objects.
>
> I think the only reason we need to explicitly close the FileSystem objects
> is to support the deleteOnExit feature. We are explicitly closing them
> because we removed the automatic shutdown hook typically installed by
> Hadoop to invoke FileSystem.close(), since it was interfering with the .tmp
> rolling. I wonder if we can get away with never closing them in our
> case... I'm not sure if we need the deleteOnExit() functionality implicitly
> for any reason, or if there are other more important reasons behind why the
> FileSystem objects should be closed.
>
> Regards,
> Mike

Re: At flume shutdown only one HDFSSink is closed properly

Posted by Mike Percy <mp...@apache.org>.
Hmm... this likely happens because Hadoop statically caches the FileSystem
object, so as it turns out, the multiple Sinks are sharing the same
FileSystem objects.
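
As a rough illustration of that hypothesis, here is a toy model of a shared, cached handle. It is not Hadoop code: FakeFileSystem and FsCache are hypothetical stand-ins, and the cache key (the namenode authority) is an assumption made for the sketch. It only shows why closing a handle obtained by one sink would make a later rename by another sink fail with "Filesystem closed".

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Toy model only; FakeFileSystem and FsCache are hypothetical, not Hadoop classes.
public class SharedFsCacheSketch {

    /** Stand-in for a filesystem client that refuses work once closed. */
    public static final class FakeFileSystem {
        private boolean open = true;

        public void rename(String from, String to) throws IOException {
            if (!open) {
                throw new IOException("Filesystem closed");
            }
            // A real client would ask the namenode to perform the rename here.
        }

        public void close() {
            open = false;
        }
    }

    /** Hands out one instance per authority, so callers share the same object. */
    public static final class FsCache {
        private final Map<String, FakeFileSystem> cache = new HashMap<>();

        public synchronized FakeFileSystem get(String authority) {
            return cache.computeIfAbsent(authority, a -> new FakeFileSystem());
        }
    }

    /** Returns true if the second sink's rename fails once the first sink has closed. */
    public static boolean secondSinkFailsAfterFirstCloses() {
        FsCache cache = new FsCache();
        FakeFileSystem sink1Fs = cache.get("namenode:9000");
        FakeFileSystem sink2Fs = cache.get("namenode:9000"); // same object as sink1Fs
        try {
            sink1Fs.rename("/path1/a.tmp", "/path1/a");
            sink1Fs.close(); // sink1 is finished, but sink2 holds the same handle
            sink2Fs.rename("/path2/b.tmp", "/path2/b");
            return false;
        } catch (IOException e) {
            return "Filesystem closed".equals(e.getMessage());
        }
    }

    public static void main(String[] args) {
        System.out.println(secondSinkFailsAfterFirstCloses());
    }
}
```

The same failure would appear if anything else, such as a shutdown hook, closed the shared handle before the sinks finished renaming their files.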

I think the only reason we need to explicitly close the FileSystem objects
is to support the deleteOnExit feature. We are explicitly closing them
because we removed the automatic shutdown hook typically installed by
Hadoop to invoke FileSystem.close(), since it was interfering with the .tmp
rolling. I wonder if we can get away with never closing them in our
case... I'm not sure if we need the deleteOnExit() functionality implicitly
for any reason, or if there are other more important reasons behind why the
FileSystem objects should be closed.

Regards,
Mike
