Posted to user@flume.apache.org by Harish Mandala <mv...@gmail.com> on 2012/09/25 00:01:04 UTC

HDFS Event Sink problems

Hello,


I’m having some trouble with the HDFS Event Sink. I’m using the latest
version of Flume NG, checked out today.


I am using curloader to hit “MycustomSource”, which essentially takes in
HTTP messages and splits their content into two “kinds” of Flume events
(differentiated by a header key-value pair; a sketch of the tagging follows
below). The first kind is sent to hdfs-sink1 and the second kind to
hdfs-sink2 by a multiplexing selector, as outlined in the configuration
below. There’s also an hdfs-sink3, which can be ignored at present.
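
Roughly, the tagging in the source looks like this (a simplified sketch with
illustrative names, not the actual source code; the "Type" header is what the
multiplexing selector in the config keys on):

import java.util.HashMap;
import java.util.Map;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class TypedEvents {
    // Wrap one piece of HTTP message content as a Flume event carrying a
    // "Type" header, e.g. "type1" or "type2", for the multiplexing selector.
    static Event typedEvent(byte[] body, String type) {
        Map<String, String> headers = new HashMap<String, String>();
        headers.put("Type", type);
        return EventBuilder.withBody(body, headers);
    }
}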

I can’t really understand what’s going on. It seems related to some of the
race condition issues outlined here:

https://issues.apache.org/jira/browse/FLUME-1219
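
If it is a race like that, one thing I can imagine contributing (an assumption
on my part, not something I have confirmed for this bug) is that Hadoop's
FileSystem.get() returns a shared, JVM-wide cached instance per URI and
config, so a close() issued from one sink's shutdown path would make every
other user of that filesystem start throwing "Filesystem closed". A minimal
sketch of that behavior:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class FsCacheDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        URI uri = URI.create("hdfs://localhost:54310");
        FileSystem fs1 = FileSystem.get(uri, conf);
        FileSystem fs2 = FileSystem.get(uri, conf);
        System.out.println(fs1 == fs2); // true: same cached instance
        fs1.close(); // later use of fs2 throws java.io.IOException: Filesystem closed
    }
}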


Please let me know if you need more information.


The following is my conf file. It is followed by flume.log.


#### flume.conf ####

agent1.channels = ch1 ch2 ch3
agent1.sources = mycustom-source1
agent1.sinks = hdfs-sink1 hdfs-sink2 hdfs-sink3

# Define memory channels ch1, ch2 and ch3 on agent1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 200000
agent1.channels.ch1.transactionCapacity = 20000

agent1.channels.ch2.type = memory
agent1.channels.ch2.capacity = 1000000
agent1.channels.ch2.transactionCapacity = 100000

agent1.channels.ch3.type = memory
agent1.channels.ch3.capacity = 10000
agent1.channels.ch3.transactionCapacity = 5000

# Define the custom source mycustom-source1 on agent1, tell it to bind to
# 127.0.0.1:1234, and connect it to channels ch1, ch2 and ch3.
agent1.sources.mycustom-source1.channels = ch1 ch2 ch3
agent1.sources.mycustom-source1.type = org.apache.flume.source.MycustomSource
agent1.sources.mycustom-source1.bind = 127.0.0.1
agent1.sources.mycustom-source1.port = 1234
agent1.sources.mycustom-source1.serialization_method = json
#agent1.sources.mycustom-source1.schema_filepath = /home/ubuntu/Software/flume/trunk/conf/AvroEventSchema.avpr

# Define the HDFS sinks
agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://localhost:54310/user/flumeDump1
agent1.sinks.hdfs-sink1.hdfs.filePrefix = events
agent1.sinks.hdfs-sink1.hdfs.batchSize = 20000
agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
agent1.sinks.hdfs-sink1.hdfs.maxOpenFiles = 10000
agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
agent1.sinks.hdfs-sink1.hdfs.rollInterval = 0
agent1.sinks.hdfs-sink1.hdfs.rollCount = 20000
agent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 20

agent1.sinks.hdfs-sink2.channel = ch2
agent1.sinks.hdfs-sink2.type = hdfs
agent1.sinks.hdfs-sink2.hdfs.path = hdfs://localhost:54310/user/flumeDump2
agent1.sinks.hdfs-sink2.hdfs.filePrefix = events
agent1.sinks.hdfs-sink2.hdfs.batchSize = 100000
agent1.sinks.hdfs-sink2.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink2.hdfs.writeFormat = Text
agent1.sinks.hdfs-sink2.hdfs.maxOpenFiles = 10000
agent1.sinks.hdfs-sink2.hdfs.rollSize = 0
agent1.sinks.hdfs-sink2.hdfs.rollInterval = 0
agent1.sinks.hdfs-sink2.hdfs.rollCount = 100000
agent1.sinks.hdfs-sink2.hdfs.threadsPoolSize = 20

agent1.sinks.hdfs-sink3.channel = ch3
agent1.sinks.hdfs-sink3.type = hdfs
agent1.sinks.hdfs-sink3.hdfs.path = hdfs://localhost:54310/user/flumeDump3
agent1.sinks.hdfs-sink3.hdfs.filePrefix = events
agent1.sinks.hdfs-sink3.hdfs.batchSize = 1000
agent1.sinks.hdfs-sink3.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink3.hdfs.writeFormat = Text
agent1.sinks.hdfs-sink3.hdfs.maxOpenFiles = 10000
agent1.sinks.hdfs-sink3.hdfs.rollSize = 0
agent1.sinks.hdfs-sink3.hdfs.rollInterval = 0
agent1.sinks.hdfs-sink3.hdfs.rollCount = 1000
agent1.sinks.hdfs-sink3.hdfs.threadsPoolSize = 20

# Route events to channels by their "Type" header
agent1.sources.mycustom-source1.selector.type = multiplexing
agent1.sources.mycustom-source1.selector.header = Type
agent1.sources.mycustom-source1.selector.mapping.type1 = ch1
agent1.sources.mycustom-source1.selector.mapping.type2 = ch2
agent1.sources.mycustom-source1.selector.mapping.type3 = ch3
agent1.sources.mycustom-source1.selector.default = ch1

#### end of conf file ####



Here are the errors from flume.log.


24 Sep 2012 21:32:13,569 WARN  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout:366)  - Unexpected Exception null
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:257)
    at java.util.concurrent.FutureTask.get(FutureTask.java:119)
    at org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:339)
    at org.apache.flume.sink.hdfs.HDFSEventSink.flush(HDFSEventSink.java:732)
    at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:430)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:679)

24 Sep 2012 21:32:13,572 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:450)  - process failed
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:257)
    at java.util.concurrent.FutureTask.get(FutureTask.java:119)
    at org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:339)
    at org.apache.flume.sink.hdfs.HDFSEventSink.flush(HDFSEventSink.java:732)
    at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:430)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:679)

24 Sep 2012 21:32:13,572 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.InterruptedException
    at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:454)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:679)
Caused by: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:257)
    at java.util.concurrent.FutureTask.get(FutureTask.java:119)
    at org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:339)
    at org.apache.flume.sink.hdfs.HDFSEventSink.flush(HDFSEventSink.java:732)
    at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:430)
    ... 3 more

24 Sep 2012 21:32:16,350 WARN  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:446)  - HDFS IO error
java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
    at org.apache.hadoop.hdfs.DFSClient.access$1200(DFSClient.java:74)
    at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3664)
    at org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
    at org.apache.flume.sink.hdfs.HDFSDataStream.sync(HDFSDataStream.java:95)
    at org.apache.flume.sink.hdfs.BucketWriter.doFlush(BucketWriter.java:298)
    at org.apache.flume.sink.hdfs.BucketWriter.access$500(BucketWriter.java:50)
    at org.apache.flume.sink.hdfs.BucketWriter$4.run(BucketWriter.java:287)
    at org.apache.flume.sink.hdfs.BucketWriter$4.run(BucketWriter.java:284)
    at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
    at org.apache.flume.sink.hdfs.BucketWriter.flush(BucketWriter.java:284)
    at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:735)
    at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:732)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:679)

24 Sep 2012 21:32:18,573 INFO  [node-shutdownHook] (org.apache.flume.sink.hdfs.HDFSEventSink.stop:465)  - Closing hdfs://localhost:54310/user/flumeDump2//events

24 Sep 2012 21:32:18,575 WARN  [hdfs-hdfs-sink2-call-runner-5] (org.apache.flume.sink.hdfs.BucketWriter.doClose:259)  - failed to close() HDFSWriter for file (hdfs://localhost:54310/user/flumeDump2//events.1348522332892.tmp). Exception follows.
java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
    at org.apache.hadoop.hdfs.DFSClient.access$1200(DFSClient.java:74)
    at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3664)
    at org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
    at org.apache.flume.sink.hdfs.HDFSDataStream.close(HDFSDataStream.java:103)
    at org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:256)
    at org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
    at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
    at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
    at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
    at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
    at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
    at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:679)

24 Sep 2012 21:32:18,576 WARN  [node-shutdownHook] (org.apache.flume.sink.hdfs.HDFSEventSink.stop:470)  - Exception while closing hdfs://localhost:54310/user/flumeDump2//events. Exception follows.
java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
    at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:873)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:513)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:768)
    at org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:375)
    at org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:275)
    at org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
    at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
    at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
    at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
    at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
    at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
    at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:679)

24 Sep 2012 21:32:18,589 INFO  [node-shutdownHook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87)  - Component type: SINK, name: hdfs-sink2 stopped

24 Sep 2012 21:32:18,590 INFO  [node-shutdownHook] (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:82)  - Stopping Sink hdfs-sink1

24 Sep 2012 21:32:18,590 INFO  [lifecycleSupervisor-1-4] (org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run:215)  - Component has already been stopped SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@49dc423f counterGroup:{ name:null counters:{runner.backoffs.consecutive=4, runner.backoffs=4, runner.deliveryErrors=1} } }

24 Sep 2012 21:32:18,591 INFO  [node-shutdownHook] (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156)  - Stopping component: SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@1b815bfb counterGroup:{ name:null counters:{runner.backoffs.consecutive=5, runner.backoffs=5} } }

24 Sep 2012 21:32:18,592 INFO  [node-shutdownHook] (org.apache.flume.sink.hdfs.HDFSEventSink.stop:465)  - Closing hdfs://localhost:54310/user/flumeDump1//events

24 Sep 2012 21:32:18,594 WARN  [hdfs-hdfs-sink1-call-runner-3] (org.apache.flume.sink.hdfs.BucketWriter.doClose:259)  - failed to close() HDFSWriter for file (hdfs://localhost:54310/user/flumeDump1//events.1348522332892.tmp). Exception follows.
java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
    at org.apache.hadoop.hdfs.DFSClient.access$1200(DFSClient.java:74)
    at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3664)
    at org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
    at org.apache.flume.sink.hdfs.HDFSDataStream.close(HDFSDataStream.java:103)
    at org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:256)
    at org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
    at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
    at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
    at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
    at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
    at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
    at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:679)

24 Sep 2012 21:32:18,595 WARN  [node-shutdownHook] (org.apache.flume.sink.hdfs.HDFSEventSink.stop:470)  - Exception while closing hdfs://localhost:54310/user/flumeDump1//events. Exception follows.
java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
    at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:873)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:513)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:768)
    at org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:375)
    at org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:275)
    at org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
    at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
    at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
    at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
    at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
    at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
    at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:679)

24 Sep 2012 21:32:18,600 INFO  [node-shutdownHook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87)  - Component type: SINK, name: hdfs-sink1 stopped

24 Sep 2012 21:32:18,600 INFO  [node-shutdownHook] (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:92)  - Stopping Channel ch3

24 Sep 2012 21:32:18,601 INFO  [node-shutdownHook] (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156)  - Stopping component: org.apache.flume.channel.MemoryChannel{name: ch3}

24 Sep 2012 21:32:18,601 INFO  [node-shutdownHook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87)  - Component type: CHANNEL, name: ch3 stopped

24 Sep 2012 21:32:18,601 INFO  [node-shutdownHook] (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:92)  - Stopping Channel ch2

24 Sep 2012 21:32:18,601 INFO  [node-shutdownHook] (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156)  - Stopping component: org.apache.flume.channel.MemoryChannel{name: ch2}

24 Sep 2012 21:32:18,601 INFO  [node-shutdownHook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87)  - Component type: CHANNEL, name: ch2 stopped

24 Sep 2012 21:32:18,601 INFO  [node-shutdownHook] (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:92)  - Stopping Channel ch1

24 Sep 2012 21:32:18,601 INFO  [node-shutdownHook] (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156)  - Stopping component: org.apache.flume.channel.MemoryChannel{name: ch1}

24 Sep 2012 21:32:18,602 INFO  [node-shutdownHook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87)  - Component type: CHANNEL, name: ch1 stopped

24 Sep 2012 21:32:18,602 INFO  [node-shutdownHook] (org.apache.flume.lifecycle.LifecycleSupervisor.stop:78)  - Stopping lifecycle supervisor 8

24 Sep 2012 21:32:18,604 INFO  [node-shutdownHook] (org.apache.flume.conf.file.AbstractFileConfigurationProvider.stop:91)  - Configuration provider stopping


Thanks,

Harish

Re: HDFS Event Sink problems

Posted by Harish Mandala <mv...@gmail.com>.
Thanks for the patch. Yes, it could be. I found some config file tweaks
which seemed to stop the issue from manifesting. I didn't look into it too
deeply; I intend to return to it later.

-Harish

On Tue, Sep 25, 2012 at 5:00 PM, Mike Percy <mp...@apache.org> wrote:

> Harish,
> What did you find on your side? Could it be related to
> https://issues.apache.org/jira/browse/FLUME-1610? I am looking at that
> issue right now.
>
> Regards,
> Mike
>
>
> On Tue, Sep 25, 2012 at 12:17 PM, Harish Mandala <mv...@gmail.com> wrote:
>
>> Thanks, but I understood why this is happening.
>>
>> On Mon, Sep 24, 2012 at 6:01 PM, Harish Mandala <mv...@gmail.com> wrote:
>>
>>> [original message quoted in full; trimmed]

Re: HDFS Event Sink problems

Posted by Mike Percy <mp...@apache.org>.
Harish,
What did you find on your side? Could it be related to
https://issues.apache.org/jira/browse/FLUME-1610? I am looking at that
issue right now.

Regards,
Mike

On Tue, Sep 25, 2012 at 12:17 PM, Harish Mandala <mv...@gmail.com> wrote:

> Thanks, but I understood why this is happening.
>
> On Mon, Sep 24, 2012 at 6:01 PM, Harish Mandala <mv...@gmail.com>wrote:
>
>> Hello,
>>
>>
>> I’m having some trouble with the HDFS Event Sink. I’m using the latest
>> version of flume NG, checked out today.
>>
>>
>> I am using curloader to hit “MycustomSource”, which essentially takes in
>> HTTP messages, and splits the content into 2 “kinds” of flume events
>> (differentiated by header key-value). The first kind is sent to hdfs-sink1,
>> and the second kind to hdfs-sink2 by a multiplexing selector as outlined in
>> the configuration below. There’s also an hdfs-sink3 which can be ignored at
>> present.
>>
>> I can’t really understand what’s going on. It seems related to some of
>> the race condition issues outlined here:
>>
>> https://issues.apache.org/jira/browse/FLUME-1219
>>
>>
>> Please let me know if you need more information.
>>
>>
>> The following is my conf file. It is followed by flume.log.
>>
>>
>> #### flume.conf ####
>>
>> agent1.channels = ch1 ch2 ch3
>>
>> agent1.sources = mycustom-source1
>>
>> agent1.sinks = hdfs-sink1 hdfs-sink2 hdfs-sink3
>>
>> # Define a memory channel called ch1 on agent1
>>
>> agent1.channels.ch1.type = memory
>>
>> agent1.channels.ch1.capacity = 200000
>>
>> agent1.channels.ch1.transactionCapacity = 20000
>>
>> agent1.channels.ch2.type = memory
>>
>> agent1.channels.ch2.capacity = 1000000
>>
>> agent1.channels.ch2.transactionCapacity = 100000
>>
>> agent1.channels.ch3.type = memory
>>
>> agent1.channels.ch3.capacity = 10000
>>
>> agent1.channels.ch3.transactionCapacity = 5000
>>
>>
>>
>> #agent1.channels.ch2.type = memory
>>
>> #agent1.channels.ch3.type = memory
>>
>>
>>
>> # Define an Mycustom custom source called mycustom-source1 on agent1 and
>> tell it
>>
>> # to bind to 0.0.0.0:41414. Connect it to channel ch1.
>>
>> agent1.sources.mycustom-source1.channels = ch1 ch2 ch3
>>
>> agent1.sources.mycustom-source1.type =
>> org.apache.flume.source.MycustomSource
>>
>> agent1.sources.mycustom-source1.bind = 127.0.0.1
>>
>> agent1.sources.mycustom-source1.port = 1234
>>
>> agent1.sources.mycustom-source1.serialization_method = json
>>
>> #agent1.sources.mycustom-source1.schema_filepath =
>> /home/ubuntu/Software/flume/trunk/conf/AvroEventSchema.avpr
>>
>>
>>
>> # Define an HDFS sink
>>
>> agent1.sinks.hdfs-sink1.channel = ch1
>>
>> agent1.sinks.hdfs-sink1.type = hdfs
>>
>> agent1.sinks.hdfs-sink1.hdfs.path = hdfs://localhost:54310/user/flumeDump1
>>
>> agent1.sinks.hdfs-sink1.hdfs.filePrefix = events
>>
>> agent1.sinks.hdfs-sink1.hdfs.batchSize = 20000
>>
>> agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
>>
>> agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
>>
>> agent1.sinks.hdfs-sink1.hdfs.maxOpenFiles = 10000
>>
>> agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
>>
>> agent1.sinks.hdfs-sink1.hdfs.rollInterval = 0
>>
>> agent1.sinks.hdfs-sink1.hdfs.rollCount = 20000
>>
>> agent1.sinks.hdfs-sink1.hdfs.hdfs.threadsPoolSize = 20
>>
>>
>>
>> agent1.sinks.hdfs-sink2.channel = ch2
>>
>> agent1.sinks.hdfs-sink2.type = hdfs
>>
>> agent1.sinks.hdfs-sink2.hdfs.path = hdfs://localhost:54310/user/flumeDump2
>>
>> agent1.sinks.hdfs-sink2.hdfs.filePrefix = events
>>
>> agent1.sinks.hdfs-sink2.hdfs.batchSize = 100000
>>
>> agent1.sinks.hdfs-sink2.hdfs.fileType = DataStream
>>
>> agent1.sinks.hdfs-sink2.hdfs.writeFormat = Text
>>
>> agent1.sinks.hdfs-sink2.hdfs.maxOpenFiles = 10000
>>
>> agent1.sinks.hdfs-sink2.hdfs.rollSize = 0
>>
>> agent1.sinks.hdfs-sink2.hdfs.rollInterval = 0
>>
>> agent1.sinks.hdfs-sink2.hdfs.rollCount = 100000
>>
>> agent1.sinks.hdfs-sink2.hdfs.hdfs.threadsPoolSize = 20
>>
>>
>>
>> agent1.sinks.hdfs-sink3.channel = ch3
>>
>> agent1.sinks.hdfs-sink3.type = hdfs
>>
>> agent1.sinks.hdfs-sink3.hdfs.path = hdfs://localhost:54310/user/flumeDump3
>>
>> agent1.sinks.hdfs-sink3.hdfs.filePrefix = events
>>
>> agent1.sinks.hdfs-sink3.hdfs.batchSize = 1000
>>
>> agent1.sinks.hdfs-sink3.hdfs.fileType = DataStream
>>
>> agent1.sinks.hdfs-sink3.hdfs.writeFormat = Text
>>
>> agent1.sinks.hdfs-sink3.hdfs.maxOpenFiles = 10000
>>
>> agent1.sinks.hdfs-sink3.hdfs.rollSize = 0
>>
>> agent1.sinks.hdfs-sink3.hdfs.rollInterval = 0
>>
>> agent1.sinks.hdfs-sink3.hdfs.rollCount = 1000
>>
>> agent1.sinks.hdfs-sink3.hdfs.hdfs.threadsPoolSize = 20
>>
>>
>>
>> agent1.sources.mycustom-source1.selector.type = multiplexing
>>
>> agent1.sources.mycustom-source1.selector.header = Type
>>
>> agent1.sources.mycustom-source1.selector.mapping.type1 = ch1
>>
>> agent1.sources.mycustom-source1.selector.mapping.type2 = ch2
>>
>> agent1.sources.mycustom-source1.selector.mapping.type3 = ch3
>>
>> agent1.sources.mycustom-source1.selector.default = ch1
>>
>>
>>
>> #### end of conf file ####
>>
>>
>>
>> Here are  the errors from flume.log.
>>
>>
>> 24 Sep 2012 21:32:13,569 WARN  [SinkRunner-PollingRunner-DefaultSinkProcessor]
>> (org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout:366)  -
>> Unexpected Exception null
>>
>> java.lang.InterruptedException
>>
>>                at
>> java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
>>
>>                at
>> java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:257)
>>
>>                at
>> java.util.concurrent.FutureTask.get(FutureTask.java:119)
>>
>>                at
>> org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:339)
>>
>>                at
>> org.apache.flume.sink.hdfs.HDFSEventSink.flush(HDFSEventSink.java:732)
>>
>>                at
>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:430)
>>
>>                at
>> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>
>>                at
>> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>
>>                at java.lang.Thread.run(Thread.java:679)
>>
>> 24 Sep 2012 21:32:13,572 ERROR
>> [SinkRunner-PollingRunner-DefaultSinkProcessor]
>> (org.apache.flume.sink.hdfs.HDFSEventSink.process:450)  - process failed
>>
>> java.lang.InterruptedException
>>
>>                at
>> java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
>>
>>                at
>> java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:257)
>>
>>                at
>> java.util.concurrent.FutureTask.get(FutureTask.java:119)
>>
>>                at
>> org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:339)
>>
>>                at
>> org.apache.flume.sink.hdfs.HDFSEventSink.flush(HDFSEventSink.java:732)
>>
>>                at
>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:430)
>>
>>                at
>> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>
>>                at
>> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>
>>                at java.lang.Thread.run(Thread.java:679)
>>
>> 24 Sep 2012 21:32:13,572 ERROR
>> [SinkRunner-PollingRunner-DefaultSinkProcessor]
>> (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver
>> event. Exception follows.
>>
>> org.apache.flume.EventDeliveryException: java.lang.InterruptedException
>>
>>                at
>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:454)
>>
>>                at
>> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>
>>                at
>> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>
>>                at java.lang.Thread.run(Thread.java:679)
>>
>> Caused by: java.lang.InterruptedException
>>
>>                at
>> java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
>>
>>                at
>> java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:257)
>>
>>                at
>> java.util.concurrent.FutureTask.get(FutureTask.java:119)
>>
>>                at
>> org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:339)
>>
>>                at
>> org.apache.flume.sink.hdfs.HDFSEventSink.flush(HDFSEventSink.java:732)
>>
>>                at
>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:430)
>>
>>                ... 3 more
>>
>> 24 Sep 2012 21:32:16,350 WARN  [SinkRunner-PollingRunner-DefaultSinkProcessor]
>> (org.apache.flume.sink.hdfs.HDFSEventSink.process:446)  - HDFS IO error
>>
>> java.io.IOException: Filesystem closed
>>
>>                at
>> org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
>>
>>                at
>> org.apache.hadoop.hdfs.DFSClient.access$1200(DFSClient.java:74)
>>
>>                at
>> org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3664)
>>
>>                at
>> org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
>>
>>                at
>> org.apache.flume.sink.hdfs.HDFSDataStream.sync(HDFSDataStream.java:95)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter.doFlush(BucketWriter.java:298)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter.access$500(BucketWriter.java:50)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter$4.run(BucketWriter.java:287)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter$4.run(BucketWriter.java:284)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter.flush(BucketWriter.java:284)
>>
>>                at
>> org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:735)
>>
>>                at
>> org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:732)
>>
>>                at
>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>>
>>                at
>> java.util.concurrent.FutureTask.run(FutureTask.java:166)
>>
>>                at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>
>>                at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>
>>                at java.lang.Thread.run(Thread.java:679)
>>
>> 24 Sep 2012 21:32:18,573 INFO  [node-shutdownHook]
>> (org.apache.flume.sink.hdfs.HDFSEventSink.stop:465)  - Closing
>> hdfs://localhost:54310/user/flumeDump2//events
>>
>> 24 Sep 2012 21:32:18,575 WARN  [hdfs-hdfs-sink2-call-runner-5]
>> (org.apache.flume.sink.hdfs.BucketWriter.doClose:259)  - failed to
>> close() HDFSWriter for file
>> (hdfs://localhost:54310/user/flumeDump2//events.1348522332892.tmp).
>> Exception follows.
>>
>> java.io.IOException: Filesystem closed
>>
>>                at
>> org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
>>
>>                at
>> org.apache.hadoop.hdfs.DFSClient.access$1200(DFSClient.java:74)
>>
>>                at
>> org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3664)
>>
>>                at
>> org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
>>
>>                at
>> org.apache.flume.sink.hdfs.HDFSDataStream.close(HDFSDataStream.java:103)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:256)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
>>
>>                at
>> org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
>>
>>                at
>> org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
>>
>>                at
>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>>
>>                at
>> java.util.concurrent.FutureTask.run(FutureTask.java:166)
>>
>>                at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>
>>                at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>
>>                at java.lang.Thread.run(Thread.java:679)
>>
>> 24 Sep 2012 21:32:18,576 WARN  [node-shutdownHook]
>> (org.apache.flume.sink.hdfs.HDFSEventSink.stop:470)  - Exception while
>> closing hdfs://localhost:54310/user/flumeDump2//events. Exception follows.
>>
>> java.io.IOException: Filesystem closed
>>
>>                at
>> org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
>>
>>                at
>> org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:873)
>>
>>                at
>> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:513)
>>
>>                at
>> org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:768)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:375)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:275)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
>>
>>                at
>> org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
>>
>>                at
>> org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
>>
>>                at
>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>>
>>                at
>> java.util.concurrent.FutureTask.run(FutureTask.java:166)
>>
>>                at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>
>>                at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>
>>                at java.lang.Thread.run(Thread.java:679)
>>
>> 24 Sep 2012 21:32:18,589 INFO  [node-shutdownHook]
>> (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87)  -
>> Component type: SINK, name: hdfs-sink2 stopped
>>
>> 24 Sep 2012 21:32:18,590 INFO  [node-shutdownHook]
>> (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:82)
>> - Stopping Sink hdfs-sink1
>>
>> 24 Sep 2012 21:32:18,590 INFO  [lifecycleSupervisor-1-4]
>> (org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run:215)
>> - Component has already been stopped SinkRunner: {
>> policy:org.apache.flume.sink.DefaultSinkProcessor@49dc423f counterGroup:{
>> name:null counters:{runner.backoffs.consecutive=4, runner.backoffs=4,
>> runner.deliveryErrors=1} } }
>>
>> 24 Sep 2012 21:32:18,591 INFO  [node-shutdownHook]
>> (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156)  -
>> Stopping component: SinkRunner: {
>> policy:org.apache.flume.sink.DefaultSinkProcessor@1b815bfb counterGroup:{
>> name:null counters:{runner.backoffs.consecutive=5, runner.backoffs=5} } }
>>
>> 24 Sep 2012 21:32:18,592 INFO  [node-shutdownHook]
>> (org.apache.flume.sink.hdfs.HDFSEventSink.stop:465)  - Closing
>> hdfs://localhost:54310/user/flumeDump1//events
>>
>> 24 Sep 2012 21:32:18,594 WARN  [hdfs-hdfs-sink1-call-runner-3]
>> (org.apache.flume.sink.hdfs.BucketWriter.doClose:259)  - failed to
>> close() HDFSWriter for file
>> (hdfs://localhost:54310/user/flumeDump1//events.1348522332892.tmp).
>> Exception follows.
>>
>> java.io.IOException: Filesystem closed
>>
>>                at
>> org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
>>
>>                at
>> org.apache.hadoop.hdfs.DFSClient.access$1200(DFSClient.java:74)
>>
>>                at
>> org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3664)
>>
>>                at
>> org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
>>
>>                at
>> org.apache.flume.sink.hdfs.HDFSDataStream.close(HDFSDataStream.java:103)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:256)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
>>
>>                at
>> org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
>>
>>                at
>> org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
>>
>>                at
>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>>
>>                at
>> java.util.concurrent.FutureTask.run(FutureTask.java:166)
>>
>>                at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>
>>                at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>
>>                at java.lang.Thread.run(Thread.java:679)
>>
>> 24 Sep 2012 21:32:18,595 WARN  [node-shutdownHook]
>> (org.apache.flume.sink.hdfs.HDFSEventSink.stop:470)  - Exception while
>> closing hdfs://localhost:54310/user/flumeDump1//events. Exception follows.
>>
>> java.io.IOException: Filesystem closed
>>
>>                at
>> org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
>>
>>                at
>> org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:873)
>>
>>                at
>> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:513)
>>
>>                at
>> org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:768)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:375)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:275)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
>>
>>                at
>> org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
>>
>>                at
>> org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
>>
>>                at
>> org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
>>
>>                at
>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>>
>>                at
>> java.util.concurrent.FutureTask.run(FutureTask.java:166)
>>
>>                at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>
>>                at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>
>>                at java.lang.Thread.run(Thread.java:679)
>>
>> 24 Sep 2012 21:32:18,600 INFO  [node-shutdownHook]
>> (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87)  -
>> Component type: SINK, name: hdfs-sink1 stopped
>>
>> 24 Sep 2012 21:32:18,600 INFO  [node-shutdownHook]
>> (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:92)
>> - Stopping Channel ch3
>>
>> 24 Sep 2012 21:32:18,601 INFO  [node-shutdownHook]
>> (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156)  -
>> Stopping component: org.apache.flume.channel.MemoryChannel{name: ch3}
>>
>> 24 Sep 2012 21:32:18,601 INFO  [node-shutdownHook]
>> (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87)  -
>> Component type: CHANNEL, name: ch3 stopped
>>
>> 24 Sep 2012 21:32:18,601 INFO  [node-shutdownHook]
>> (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:92)
>> - Stopping Channel ch2
>>
>> 24 Sep 2012 21:32:18,601 INFO  [node-shutdownHook]
>> (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156)  -
>> Stopping component: org.apache.flume.channel.MemoryChannel{name: ch2}
>>
>> 24 Sep 2012 21:32:18,601 INFO  [node-shutdownHook]
>> (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87)  -
>> Component type: CHANNEL, name: ch2 stopped
>>
>> 24 Sep 2012 21:32:18,601 INFO  [node-shutdownHook]
>> (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:92)
>> - Stopping Channel ch1
>>
>> 24 Sep 2012 21:32:18,601 INFO  [node-shutdownHook]
>> (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156)  -
>> Stopping component: org.apache.flume.channel.MemoryChannel{name: ch1}
>>
>> 24 Sep 2012 21:32:18,602 INFO  [node-shutdownHook]
>> (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87)  -
>> Component type: CHANNEL, name: ch1 stopped
>>
>> 24 Sep 2012 21:32:18,602 INFO  [node-shutdownHook]
>> (org.apache.flume.lifecycle.LifecycleSupervisor.stop:78)  - Stopping
>> lifecycle supervisor 8
>>
>> 24 Sep 2012 21:32:18,604 INFO  [node-shutdownHook]
>> (org.apache.flume.conf.file.AbstractFileConfigurationProvider.stop:91)  -
>> Configuration provider stopping
>>
>>
>> Thanks,
>>
>> Harish
>>
>
>

Re: HDFS Event Sink problems

Posted by Harish Mandala <mv...@gmail.com>.
Thanks, but I've figured out why this is happening.

On Mon, Sep 24, 2012 at 6:01 PM, Harish Mandala <mv...@gmail.com> wrote:

> Hello,
>
>
> I’m having some trouble with the HDFS Event Sink. I’m using the latest
> version of flume NG, checked out today.
>
>
> I am using curloader to hit “MycustomSource”, which essentially takes in
> HTTP messages, and splits the content into 2 “kinds” of flume events
> (differentiated by header key-value). The first kind is sent to hdfs-sink1,
> and the second kind to hdfs-sink2 by a multiplexing selector as outlined in
> the configuration below. There’s also an hdfs-sink3 which can be ignored at
> present.
>
> I can’t really understand what’s going on. It seems related to some of the
> race condition issues outlined here:
>
> https://issues.apache.org/jira/browse/FLUME-1219
>
>
> Please let me know if you need more information.
>
>
> The following is my conf file. It is followed by flume.log.
>
>
> #### flume.conf ####
>
> agent1.channels = ch1 ch2 ch3
>
> agent1.sources = mycustom-source1
>
> agent1.sinks = hdfs-sink1 hdfs-sink2 hdfs-sink3
>
> # Define a memory channel called ch1 on agent1
>
> agent1.channels.ch1.type = memory
>
> agent1.channels.ch1.capacity = 200000
>
> agent1.channels.ch1.transactionCapacity = 20000
>
> agent1.channels.ch2.type = memory
>
> agent1.channels.ch2.capacity = 1000000
>
> agent1.channels.ch2.transactionCapacity = 100000
>
> agent1.channels.ch3.type = memory
>
> agent1.channels.ch3.capacity = 10000
>
> agent1.channels.ch3.transactionCapacity = 5000
>
>
>
> #agent1.channels.ch2.type = memory
>
> #agent1.channels.ch3.type = memory
>
>
>
> # Define an Mycustom custom source called mycustom-source1 on agent1 and
> tell it
>
> # to bind to 0.0.0.0:41414. Connect it to channel ch1.
>
> agent1.sources.mycustom-source1.channels = ch1 ch2 ch3
>
> agent1.sources.mycustom-source1.type =
> org.apache.flume.source.MycustomSource
>
> agent1.sources.mycustom-source1.bind = 127.0.0.1
>
> agent1.sources.mycustom-source1.port = 1234
>
> agent1.sources.mycustom-source1.serialization_method = json
>
> #agent1.sources.mycustom-source1.schema_filepath =
> /home/ubuntu/Software/flume/trunk/conf/AvroEventSchema.avpr
>
>
>
> # Define an HDFS sink
>
> agent1.sinks.hdfs-sink1.channel = ch1
>
> agent1.sinks.hdfs-sink1.type = hdfs
>
> agent1.sinks.hdfs-sink1.hdfs.path = hdfs://localhost:54310/user/flumeDump1
>
> agent1.sinks.hdfs-sink1.hdfs.filePrefix = events
>
> agent1.sinks.hdfs-sink1.hdfs.batchSize = 20000
>
> agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
>
> agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
>
> agent1.sinks.hdfs-sink1.hdfs.maxOpenFiles = 10000
>
> agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
>
> agent1.sinks.hdfs-sink1.hdfs.rollInterval = 0
>
> agent1.sinks.hdfs-sink1.hdfs.rollCount = 20000
>
> agent1.sinks.hdfs-sink1.hdfs.hdfs.threadsPoolSize = 20
>
>
>
> agent1.sinks.hdfs-sink2.channel = ch2
>
> agent1.sinks.hdfs-sink2.type = hdfs
>
> agent1.sinks.hdfs-sink2.hdfs.path = hdfs://localhost:54310/user/flumeDump2
>
> agent1.sinks.hdfs-sink2.hdfs.filePrefix = events
>
> agent1.sinks.hdfs-sink2.hdfs.batchSize = 100000
>
> agent1.sinks.hdfs-sink2.hdfs.fileType = DataStream
>
> agent1.sinks.hdfs-sink2.hdfs.writeFormat = Text
>
> agent1.sinks.hdfs-sink2.hdfs.maxOpenFiles = 10000
>
> agent1.sinks.hdfs-sink2.hdfs.rollSize = 0
>
> agent1.sinks.hdfs-sink2.hdfs.rollInterval = 0
>
> agent1.sinks.hdfs-sink2.hdfs.rollCount = 100000
>
> agent1.sinks.hdfs-sink2.hdfs.hdfs.threadsPoolSize = 20
>
>
>
> agent1.sinks.hdfs-sink3.channel = ch3
>
> agent1.sinks.hdfs-sink3.type = hdfs
>
> agent1.sinks.hdfs-sink3.hdfs.path = hdfs://localhost:54310/user/flumeDump3
>
> agent1.sinks.hdfs-sink3.hdfs.filePrefix = events
>
> agent1.sinks.hdfs-sink3.hdfs.batchSize = 1000
>
> agent1.sinks.hdfs-sink3.hdfs.fileType = DataStream
>
> agent1.sinks.hdfs-sink3.hdfs.writeFormat = Text
>
> agent1.sinks.hdfs-sink3.hdfs.maxOpenFiles = 10000
>
> agent1.sinks.hdfs-sink3.hdfs.rollSize = 0
>
> agent1.sinks.hdfs-sink3.hdfs.rollInterval = 0
>
> agent1.sinks.hdfs-sink3.hdfs.rollCount = 1000
>
> agent1.sinks.hdfs-sink3.hdfs.hdfs.threadsPoolSize = 20
>
>
>
> agent1.sources.mycustom-source1.selector.type = multiplexing
>
> agent1.sources.mycustom-source1.selector.header = Type
>
> agent1.sources.mycustom-source1.selector.mapping.type1 = ch1
>
> agent1.sources.mycustom-source1.selector.mapping.type2 = ch2
>
> agent1.sources.mycustom-source1.selector.mapping.type3 = ch3
>
> agent1.sources.mycustom-source1.selector.default = ch1
>
>
>
> #### end of conf file ####
>
>
>
> Here are  the errors from flume.log.
>
>
> 24 Sep 2012 21:32:13,569 WARN  [SinkRunner-PollingRunner-DefaultSinkProcessor]
> (org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout:366)  -
> Unexpected Exception null
>
> java.lang.InterruptedException
>
>                at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
>
>                at
> java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:257)
>
>                at java.util.concurrent.FutureTask.get(FutureTask.java:119)
>
>                at
> org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:339)
>
>                at
> org.apache.flume.sink.hdfs.HDFSEventSink.flush(HDFSEventSink.java:732)
>
>                at
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:430)
>
>                at
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>
>                at
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>
>                at java.lang.Thread.run(Thread.java:679)
>
> 24 Sep 2012 21:32:13,572 ERROR
> [SinkRunner-PollingRunner-DefaultSinkProcessor]
> (org.apache.flume.sink.hdfs.HDFSEventSink.process:450)  - process failed
>
> java.lang.InterruptedException
>
>                at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
>
>                at
> java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:257)
>
>                at java.util.concurrent.FutureTask.get(FutureTask.java:119)
>
>                at
> org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:339)
>
>                at
> org.apache.flume.sink.hdfs.HDFSEventSink.flush(HDFSEventSink.java:732)
>
>                at
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:430)
>
>                at
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>
>                at
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>
>                at java.lang.Thread.run(Thread.java:679)
>
> 24 Sep 2012 21:32:13,572 ERROR
> [SinkRunner-PollingRunner-DefaultSinkProcessor]
> (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver
> event. Exception follows.
>
> org.apache.flume.EventDeliveryException: java.lang.InterruptedException
>
>                at
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:454)
>
>                at
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>
>                at
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>
>                at java.lang.Thread.run(Thread.java:679)
>
> Caused by: java.lang.InterruptedException
>
>                at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
>
>                at
> java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:257)
>
>                at java.util.concurrent.FutureTask.get(FutureTask.java:119)
>
>                at
> org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:339)
>
>                at
> org.apache.flume.sink.hdfs.HDFSEventSink.flush(HDFSEventSink.java:732)
>
>                at
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:430)
>
>                ... 3 more
>
> 24 Sep 2012 21:32:16,350 WARN  [SinkRunner-PollingRunner-DefaultSinkProcessor]
> (org.apache.flume.sink.hdfs.HDFSEventSink.process:446)  - HDFS IO error
>
> java.io.IOException: Filesystem closed
>
>                at
> org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
>
>                at
> org.apache.hadoop.hdfs.DFSClient.access$1200(DFSClient.java:74)
>
>                at
> org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3664)
>
>                at
> org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
>
>                at
> org.apache.flume.sink.hdfs.HDFSDataStream.sync(HDFSDataStream.java:95)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter.doFlush(BucketWriter.java:298)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter.access$500(BucketWriter.java:50)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter$4.run(BucketWriter.java:287)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter$4.run(BucketWriter.java:284)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter.flush(BucketWriter.java:284)
>
>                at
> org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:735)
>
>                at
> org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:732)
>
>                at
> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>
>                at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>
>                at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>
>                at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>
>                at java.lang.Thread.run(Thread.java:679)
>
> 24 Sep 2012 21:32:18,573 INFO  [node-shutdownHook]
> (org.apache.flume.sink.hdfs.HDFSEventSink.stop:465)  - Closing
> hdfs://localhost:54310/user/flumeDump2//events
>
> 24 Sep 2012 21:32:18,575 WARN  [hdfs-hdfs-sink2-call-runner-5]
> (org.apache.flume.sink.hdfs.BucketWriter.doClose:259)  - failed to
> close() HDFSWriter for file
> (hdfs://localhost:54310/user/flumeDump2//events.1348522332892.tmp).
> Exception follows.
>
> java.io.IOException: Filesystem closed
>
>                at
> org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
>
>                at
> org.apache.hadoop.hdfs.DFSClient.access$1200(DFSClient.java:74)
>
>                at
> org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3664)
>
>                at
> org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
>
>                at
> org.apache.flume.sink.hdfs.HDFSDataStream.close(HDFSDataStream.java:103)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:256)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
>
>                at
> org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
>
>                at
> org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
>
>                at
> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>
>                at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>
>                at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>
>                at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>
>                at java.lang.Thread.run(Thread.java:679)
>
> 24 Sep 2012 21:32:18,576 WARN  [node-shutdownHook]
> (org.apache.flume.sink.hdfs.HDFSEventSink.stop:470)  - Exception while
> closing hdfs://localhost:54310/user/flumeDump2//events. Exception follows.
>
> java.io.IOException: Filesystem closed
>
>                at
> org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
>
>                at
> org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:873)
>
>                at
> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:513)
>
>                at
> org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:768)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:375)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:275)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
>
>                at
> org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
>
>                at
> org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
>
>                at
> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>
>                at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>
>                at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>
>                at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>
>                at java.lang.Thread.run(Thread.java:679)
>
> 24 Sep 2012 21:32:18,589 INFO  [node-shutdownHook]
> (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87)  -
> Component type: SINK, name: hdfs-sink2 stopped
>
> 24 Sep 2012 21:32:18,590 INFO  [node-shutdownHook]
> (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:82)
> - Stopping Sink hdfs-sink1
>
> 24 Sep 2012 21:32:18,590 INFO  [lifecycleSupervisor-1-4]
> (org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run:215)  -
> Component has already been stopped SinkRunner: {
> policy:org.apache.flume.sink.DefaultSinkProcessor@49dc423f counterGroup:{
> name:null counters:{runner.backoffs.consecutive=4, runner.backoffs=4,
> runner.deliveryErrors=1} } }
>
> 24 Sep 2012 21:32:18,591 INFO  [node-shutdownHook]
> (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156)  -
> Stopping component: SinkRunner: {
> policy:org.apache.flume.sink.DefaultSinkProcessor@1b815bfb counterGroup:{
> name:null counters:{runner.backoffs.consecutive=5, runner.backoffs=5} } }
>
> 24 Sep 2012 21:32:18,592 INFO  [node-shutdownHook]
> (org.apache.flume.sink.hdfs.HDFSEventSink.stop:465)  - Closing
> hdfs://localhost:54310/user/flumeDump1//events
>
> 24 Sep 2012 21:32:18,594 WARN  [hdfs-hdfs-sink1-call-runner-3]
> (org.apache.flume.sink.hdfs.BucketWriter.doClose:259)  - failed to
> close() HDFSWriter for file
> (hdfs://localhost:54310/user/flumeDump1//events.1348522332892.tmp).
> Exception follows.
>
> java.io.IOException: Filesystem closed
>
>                at
> org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
>
>                at
> org.apache.hadoop.hdfs.DFSClient.access$1200(DFSClient.java:74)
>
>                at
> org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3664)
>
>                at
> org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
>
>                at
> org.apache.flume.sink.hdfs.HDFSDataStream.close(HDFSDataStream.java:103)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:256)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
>
>                at
> org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
>
>                at
> org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
>
>                at
> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>
>                at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>
>                at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>
>                at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>
>                at java.lang.Thread.run(Thread.java:679)
>
> 24 Sep 2012 21:32:18,595 WARN  [node-shutdownHook]
> (org.apache.flume.sink.hdfs.HDFSEventSink.stop:470)  - Exception while
> closing hdfs://localhost:54310/user/flumeDump1//events. Exception follows.
>
> java.io.IOException: Filesystem closed
>
>                at
> org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
>
>                at
> org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:873)
>
>                at
> org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:513)
>
>                at
> org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:768)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:375)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:275)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
>
>                at
> org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
>
>                at
> org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
>
>                at
> org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
>
>                at
> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>
>                at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>
>                at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>
>                at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>
>                at java.lang.Thread.run(Thread.java:679)
>
> 24 Sep 2012 21:32:18,600 INFO  [node-shutdownHook]
> (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87)  -
> Component type: SINK, name: hdfs-sink1 stopped
>
> 24 Sep 2012 21:32:18,600 INFO  [node-shutdownHook]
> (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:92)
> - Stopping Channel ch3
>
> 24 Sep 2012 21:32:18,601 INFO  [node-shutdownHook]
> (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156)  -
> Stopping component: org.apache.flume.channel.MemoryChannel{name: ch3}
>
> 24 Sep 2012 21:32:18,601 INFO  [node-shutdownHook]
> (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87)  -
> Component type: CHANNEL, name: ch3 stopped
>
> 24 Sep 2012 21:32:18,601 INFO  [node-shutdownHook]
> (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:92)
> - Stopping Channel ch2
>
> 24 Sep 2012 21:32:18,601 INFO  [node-shutdownHook]
> (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156)  -
> Stopping component: org.apache.flume.channel.MemoryChannel{name: ch2}
>
> 24 Sep 2012 21:32:18,601 INFO  [node-shutdownHook]
> (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87)  -
> Component type: CHANNEL, name: ch2 stopped
>
> 24 Sep 2012 21:32:18,601 INFO  [node-shutdownHook]
> (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:92)
> - Stopping Channel ch1
>
> 24 Sep 2012 21:32:18,601 INFO  [node-shutdownHook]
> (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156)  -
> Stopping component: org.apache.flume.channel.MemoryChannel{name: ch1}
>
> 24 Sep 2012 21:32:18,602 INFO  [node-shutdownHook]
> (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87)  -
> Component type: CHANNEL, name: ch1 stopped
>
> 24 Sep 2012 21:32:18,602 INFO  [node-shutdownHook]
> (org.apache.flume.lifecycle.LifecycleSupervisor.stop:78)  - Stopping
> lifecycle supervisor 8
>
> 24 Sep 2012 21:32:18,604 INFO  [node-shutdownHook]
> (org.apache.flume.conf.file.AbstractFileConfigurationProvider.stop:91)  -
> Configuration provider stopping
>
>
> Thanks,
>
> Harish
>