Posted to user@flume.apache.org by Harish Mandala <mv...@gmail.com> on 2012/09/25 00:01:04 UTC
HDFS Event Sink problems
Hello,
I’m having some trouble with the HDFS Event Sink. I’m using the latest
version of Flume NG, checked out from trunk today.
I am using curloader to hit “MycustomSource”, which essentially takes in
HTTP messages and splits the content into two “kinds” of Flume events
(differentiated by a header key-value pair). A multiplexing selector, as
outlined in the configuration below, sends the first kind to hdfs-sink1
and the second kind to hdfs-sink2. There is also an hdfs-sink3, which can
be ignored for now.
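To make the routing above concrete, here is a minimal, language-agnostic simulation of what a multiplexing selector does with the "Type" header. This is illustrative only, not Flume's actual MultiplexingChannelSelector code; the mapping and default values mirror the selector.* properties in the configuration below.

```python
def select_channels(headers, mapping, default):
    """Return the list of channels an event is routed to.

    Looks up the event's "Type" header in the configured mapping;
    events with no match (or no header at all) go to the default.
    """
    return mapping.get(headers.get("Type"), default)

# Values taken from the selector.* properties in the conf file below.
mapping = {"type1": ["ch1"], "type2": ["ch2"], "type3": ["ch3"]}
default = ["ch1"]

assert select_channels({"Type": "type2"}, mapping, default) == ["ch2"]
assert select_channels({"Type": "other"}, mapping, default) == ["ch1"]
assert select_channels({}, mapping, default) == ["ch1"]
```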
I can’t really understand what’s going on. It seems related to some of the
race condition issues outlined here:
https://issues.apache.org/jira/browse/FLUME-1219
Please let me know if you need more information.
The following is my conf file. It is followed by flume.log.
#### flume.conf ####
agent1.channels = ch1 ch2 ch3
agent1.sources = mycustom-source1
agent1.sinks = hdfs-sink1 hdfs-sink2 hdfs-sink3
# Define memory channels ch1, ch2, and ch3 on agent1
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 200000
agent1.channels.ch1.transactionCapacity = 20000
agent1.channels.ch2.type = memory
agent1.channels.ch2.capacity = 1000000
agent1.channels.ch2.transactionCapacity = 100000
agent1.channels.ch3.type = memory
agent1.channels.ch3.capacity = 10000
agent1.channels.ch3.transactionCapacity = 5000
#agent1.channels.ch2.type = memory
#agent1.channels.ch3.type = memory
# Define a custom source called mycustom-source1 on agent1 and tell it
# to bind to 127.0.0.1:1234. Connect it to channels ch1, ch2, and ch3.
agent1.sources.mycustom-source1.channels = ch1 ch2 ch3
agent1.sources.mycustom-source1.type = org.apache.flume.source.MycustomSource
agent1.sources.mycustom-source1.bind = 127.0.0.1
agent1.sources.mycustom-source1.port = 1234
agent1.sources.mycustom-source1.serialization_method = json
#agent1.sources.mycustom-source1.schema_filepath = /home/ubuntu/Software/flume/trunk/conf/AvroEventSchema.avpr
# Define an HDFS sink
agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://localhost:54310/user/flumeDump1
agent1.sinks.hdfs-sink1.hdfs.filePrefix = events
agent1.sinks.hdfs-sink1.hdfs.batchSize = 20000
agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
agent1.sinks.hdfs-sink1.hdfs.maxOpenFiles = 10000
agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
agent1.sinks.hdfs-sink1.hdfs.rollInterval = 0
agent1.sinks.hdfs-sink1.hdfs.rollCount = 20000
agent1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 20
agent1.sinks.hdfs-sink2.channel = ch2
agent1.sinks.hdfs-sink2.type = hdfs
agent1.sinks.hdfs-sink2.hdfs.path = hdfs://localhost:54310/user/flumeDump2
agent1.sinks.hdfs-sink2.hdfs.filePrefix = events
agent1.sinks.hdfs-sink2.hdfs.batchSize = 100000
agent1.sinks.hdfs-sink2.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink2.hdfs.writeFormat = Text
agent1.sinks.hdfs-sink2.hdfs.maxOpenFiles = 10000
agent1.sinks.hdfs-sink2.hdfs.rollSize = 0
agent1.sinks.hdfs-sink2.hdfs.rollInterval = 0
agent1.sinks.hdfs-sink2.hdfs.rollCount = 100000
agent1.sinks.hdfs-sink2.hdfs.threadsPoolSize = 20
agent1.sinks.hdfs-sink3.channel = ch3
agent1.sinks.hdfs-sink3.type = hdfs
agent1.sinks.hdfs-sink3.hdfs.path = hdfs://localhost:54310/user/flumeDump3
agent1.sinks.hdfs-sink3.hdfs.filePrefix = events
agent1.sinks.hdfs-sink3.hdfs.batchSize = 1000
agent1.sinks.hdfs-sink3.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink3.hdfs.writeFormat = Text
agent1.sinks.hdfs-sink3.hdfs.maxOpenFiles = 10000
agent1.sinks.hdfs-sink3.hdfs.rollSize = 0
agent1.sinks.hdfs-sink3.hdfs.rollInterval = 0
agent1.sinks.hdfs-sink3.hdfs.rollCount = 1000
agent1.sinks.hdfs-sink3.hdfs.threadsPoolSize = 20
agent1.sources.mycustom-source1.selector.type = multiplexing
agent1.sources.mycustom-source1.selector.header = Type
agent1.sources.mycustom-source1.selector.mapping.type1 = ch1
agent1.sources.mycustom-source1.selector.mapping.type2 = ch2
agent1.sources.mycustom-source1.selector.mapping.type3 = ch3
agent1.sources.mycustom-source1.selector.default = ch1
#### end of conf file ####
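One pairing in the configuration above that is worth sanity-checking: each HDFS sink takes up to hdfs.batchSize events from its channel in a single transaction, so batchSize must not exceed that channel's transactionCapacity or channel takes will fail. A small sketch of that check, with the values copied from the conf file (the check itself is mine, not Flume code):

```python
# transactionCapacity per channel, from the conf file above.
channels = {"ch1": 20000, "ch2": 100000, "ch3": 5000}

# (channel, hdfs.batchSize) per sink, from the conf file above.
sinks = {
    "hdfs-sink1": ("ch1", 20000),
    "hdfs-sink2": ("ch2", 100000),
    "hdfs-sink3": ("ch3", 1000),
}

# A sink batch is taken in one channel transaction, so it must fit.
for name, (ch, batch) in sinks.items():
    assert batch <= channels[ch], \
        f"{name}: batchSize {batch} exceeds {ch} transactionCapacity {channels[ch]}"
```

For this configuration the check passes (sink1 and sink2 sit exactly at their channels' transaction capacities), so the errors below are not a simple batch-size misconfiguration.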
Here are the errors from flume.log.
24 Sep 2012 21:32:13,569 WARN [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout:366) - Unexpected Exception null
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
        at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:257)
        at java.util.concurrent.FutureTask.get(FutureTask.java:119)
        at org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:339)
        at org.apache.flume.sink.hdfs.HDFSEventSink.flush(HDFSEventSink.java:732)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:430)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:679)
24 Sep 2012 21:32:13,572 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:450) - process failed
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
        at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:257)
        at java.util.concurrent.FutureTask.get(FutureTask.java:119)
        at org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:339)
        at org.apache.flume.sink.hdfs.HDFSEventSink.flush(HDFSEventSink.java:732)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:430)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:679)
24 Sep 2012 21:32:13,572 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.InterruptedException
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:454)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:679)
Caused by: java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
        at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:257)
        at java.util.concurrent.FutureTask.get(FutureTask.java:119)
        at org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:339)
        at org.apache.flume.sink.hdfs.HDFSEventSink.flush(HDFSEventSink.java:732)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:430)
        ... 3 more
24 Sep 2012 21:32:16,350 WARN [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:446) - HDFS IO error
java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
        at org.apache.hadoop.hdfs.DFSClient.access$1200(DFSClient.java:74)
        at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3664)
        at org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
        at org.apache.flume.sink.hdfs.HDFSDataStream.sync(HDFSDataStream.java:95)
        at org.apache.flume.sink.hdfs.BucketWriter.doFlush(BucketWriter.java:298)
        at org.apache.flume.sink.hdfs.BucketWriter.access$500(BucketWriter.java:50)
        at org.apache.flume.sink.hdfs.BucketWriter$4.run(BucketWriter.java:287)
        at org.apache.flume.sink.hdfs.BucketWriter$4.run(BucketWriter.java:284)
        at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
        at org.apache.flume.sink.hdfs.BucketWriter.flush(BucketWriter.java:284)
        at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:735)
        at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:732)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
        at java.util.concurrent.FutureTask.run(FutureTask.java:166)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:679)
24 Sep 2012 21:32:18,573 INFO [node-shutdownHook] (org.apache.flume.sink.hdfs.HDFSEventSink.stop:465) - Closing hdfs://localhost:54310/user/flumeDump2//events
24 Sep 2012 21:32:18,575 WARN [hdfs-hdfs-sink2-call-runner-5] (org.apache.flume.sink.hdfs.BucketWriter.doClose:259) - failed to close() HDFSWriter for file (hdfs://localhost:54310/user/flumeDump2//events.1348522332892.tmp). Exception follows.
java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
        at org.apache.hadoop.hdfs.DFSClient.access$1200(DFSClient.java:74)
        at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3664)
        at org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
        at org.apache.flume.sink.hdfs.HDFSDataStream.close(HDFSDataStream.java:103)
        at org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:256)
        at org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
        at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
        at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
        at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
        at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
        at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
        at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
        at java.util.concurrent.FutureTask.run(FutureTask.java:166)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:679)
24 Sep 2012 21:32:18,576 WARN [node-shutdownHook] (org.apache.flume.sink.hdfs.HDFSEventSink.stop:470) - Exception while closing hdfs://localhost:54310/user/flumeDump2//events. Exception follows.
java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
        at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:873)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:513)
        at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:768)
        at org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:375)
        at org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:275)
        at org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
        at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
        at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
        at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
        at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
        at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
        at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
        at java.util.concurrent.FutureTask.run(FutureTask.java:166)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:679)
24 Sep 2012 21:32:18,589 INFO [node-shutdownHook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87) - Component type: SINK, name: hdfs-sink2 stopped
24 Sep 2012 21:32:18,590 INFO [node-shutdownHook] (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:82) - Stopping Sink hdfs-sink1
24 Sep 2012 21:32:18,590 INFO [lifecycleSupervisor-1-4] (org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run:215) - Component has already been stopped SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@49dc423f counterGroup:{ name:null counters:{runner.backoffs.consecutive=4, runner.backoffs=4, runner.deliveryErrors=1} } }
24 Sep 2012 21:32:18,591 INFO [node-shutdownHook] (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156) - Stopping component: SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@1b815bfb counterGroup:{ name:null counters:{runner.backoffs.consecutive=5, runner.backoffs=5} } }
24 Sep 2012 21:32:18,592 INFO [node-shutdownHook] (org.apache.flume.sink.hdfs.HDFSEventSink.stop:465) - Closing hdfs://localhost:54310/user/flumeDump1//events
24 Sep 2012 21:32:18,594 WARN [hdfs-hdfs-sink1-call-runner-3] (org.apache.flume.sink.hdfs.BucketWriter.doClose:259) - failed to close() HDFSWriter for file (hdfs://localhost:54310/user/flumeDump1//events.1348522332892.tmp). Exception follows.
java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
        at org.apache.hadoop.hdfs.DFSClient.access$1200(DFSClient.java:74)
        at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3664)
        at org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
        at org.apache.flume.sink.hdfs.HDFSDataStream.close(HDFSDataStream.java:103)
        at org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:256)
        at org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
        at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
        at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
        at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
        at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
        at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
        at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
        at java.util.concurrent.FutureTask.run(FutureTask.java:166)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:679)
24 Sep 2012 21:32:18,595 WARN [node-shutdownHook] (org.apache.flume.sink.hdfs.HDFSEventSink.stop:470) - Exception while closing hdfs://localhost:54310/user/flumeDump1//events. Exception follows.
java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
        at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:873)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:513)
        at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:768)
        at org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:375)
        at org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:275)
        at org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
        at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
        at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
        at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
        at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
        at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
        at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
        at java.util.concurrent.FutureTask.run(FutureTask.java:166)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:679)
24 Sep 2012 21:32:18,600 INFO [node-shutdownHook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87) - Component type: SINK, name: hdfs-sink1 stopped
24 Sep 2012 21:32:18,600 INFO [node-shutdownHook] (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:92) - Stopping Channel ch3
24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156) - Stopping component: org.apache.flume.channel.MemoryChannel{name: ch3}
24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87) - Component type: CHANNEL, name: ch3 stopped
24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:92) - Stopping Channel ch2
24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156) - Stopping component: org.apache.flume.channel.MemoryChannel{name: ch2}
24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87) - Component type: CHANNEL, name: ch2 stopped
24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:92) - Stopping Channel ch1
24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156) - Stopping component: org.apache.flume.channel.MemoryChannel{name: ch1}
24 Sep 2012 21:32:18,602 INFO [node-shutdownHook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87) - Component type: CHANNEL, name: ch1 stopped
24 Sep 2012 21:32:18,602 INFO [node-shutdownHook] (org.apache.flume.lifecycle.LifecycleSupervisor.stop:78) - Stopping lifecycle supervisor 8
24 Sep 2012 21:32:18,604 INFO [node-shutdownHook] (org.apache.flume.conf.file.AbstractFileConfigurationProvider.stop:91) - Configuration provider stopping
Thanks,
Harish
Re: HDFS Event Sink problems
Posted by Harish Mandala <mv...@gmail.com>.
Thanks for the patch. Yes, it could be. I found some config file tweaks
that seemed to stop the issue from manifesting. I didn't look into it too
deeply; I intend to return to it later.
-Harish
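The tweaks themselves are not listed in the thread. As a purely illustrative sketch (not necessarily what was changed here): since the traces above show interruptions inside HDFSEventSink.callWithTimeout, one knob the HDFS sink exposes that is commonly raised in this situation is hdfs.callTimeout, which bounds each HDFS open/write/flush/close call (default 10000 ms in Flume NG of this era).

```
# Hypothetical example only - not confirmed as the tweak referred to above.
# Gives slow HDFS calls more time before Flume interrupts them.
agent1.sinks.hdfs-sink1.hdfs.callTimeout = 60000
agent1.sinks.hdfs-sink2.hdfs.callTimeout = 60000
```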
On Tue, Sep 25, 2012 at 5:00 PM, Mike Percy <mp...@apache.org> wrote:
> Harish,
> What did you find on your side? Could it be related to
> https://issues.apache.org/jira/browse/FLUME-1610 ? I am looking at that
> issue right now.
>
> Regards,
> Mike
>
>
> On Tue, Sep 25, 2012 at 12:17 PM, Harish Mandala <mv...@gmail.com> wrote:
>
>> Thanks, but I understood why this is happening.
>>
>> On Mon, Sep 24, 2012 at 6:01 PM, Harish Mandala <mv...@gmail.com>wrote:
>>
>>> Hello,
>>>
>>>
>>> I’m having some trouble with the HDFS Event Sink. I’m using the latest
>>> version of flume NG, checked out today.
>>>
>>>
>>> I am using curloader to hit “MycustomSource”, which essentially takes in
>>> HTTP messages, and splits the content into 2 “kinds” of flume events
>>> (differentiated by header key-value). The first kind is sent to hdfs-sink1,
>>> and the second kind to hdfs-sink2 by a multiplexing selector as outlined in
>>> the configuration below. There’s also an hdfs-sink3 which can be ignored at
>>> present.
>>>
>>> I can’t really understand what’s going on. It seems related to some of
>>> the race condition issues outlined here:
>>>
>>> https://issues.apache.org/jira/browse/FLUME-1219
>>>
>>>
>>> Please let me know if you need more information.
>>>
>>>
>>> The following is my conf file. It is followed by flume.log.
>>>
>>>
>>> #### flume.conf ####
>>>
>>> agent1.channels = ch1 ch2 ch3
>>>
>>> agent1.sources = mycustom-source1
>>>
>>> agent1.sinks = hdfs-sink1 hdfs-sink2 hdfs-sink3
>>>
>>> # Define a memory channel called ch1 on agent1
>>>
>>> agent1.channels.ch1.type = memory
>>>
>>> agent1.channels.ch1.capacity = 200000
>>>
>>> agent1.channels.ch1.transactionCapacity = 20000
>>>
>>> agent1.channels.ch2.type = memory
>>>
>>> agent1.channels.ch2.capacity = 1000000
>>>
>>> agent1.channels.ch2.transactionCapacity = 100000
>>>
>>> agent1.channels.ch3.type = memory
>>>
>>> agent1.channels.ch3.capacity = 10000
>>>
>>> agent1.channels.ch3.transactionCapacity = 5000
>>>
>>>
>>>
>>> #agent1.channels.ch2.type = memory
>>>
>>> #agent1.channels.ch3.type = memory
>>>
>>>
>>>
>>> # Define an Mycustom custom source called mycustom-source1 on agent1 and
>>> tell it
>>>
>>> # to bind to 0.0.0.0:41414. Connect it to channel ch1.
>>>
>>> agent1.sources.mycustom-source1.channels = ch1 ch2 ch3
>>>
>>> agent1.sources.mycustom-source1.type =
>>> org.apache.flume.source.MycustomSource
>>>
>>> agent1.sources.mycustom-source1.bind = 127.0.0.1
>>>
>>> agent1.sources.mycustom-source1.port = 1234
>>>
>>> agent1.sources.mycustom-source1.serialization_method = json
>>>
>>> #agent1.sources.mycustom-source1.schema_filepath =
>>> /home/ubuntu/Software/flume/trunk/conf/AvroEventSchema.avpr
>>>
>>>
>>>
>>> # Define an HDFS sink
>>>
>>> agent1.sinks.hdfs-sink1.channel = ch1
>>>
>>> agent1.sinks.hdfs-sink1.type = hdfs
>>>
>>> agent1.sinks.hdfs-sink1.hdfs.path =
>>> hdfs://localhost:54310/user/flumeDump1
>>>
>>> agent1.sinks.hdfs-sink1.hdfs.filePrefix = events
>>>
>>> agent1.sinks.hdfs-sink1.hdfs.batchSize = 20000
>>>
>>> agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
>>>
>>> agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
>>>
>>> agent1.sinks.hdfs-sink1.hdfs.maxOpenFiles = 10000
>>>
>>> agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
>>>
>>> agent1.sinks.hdfs-sink1.hdfs.rollInterval = 0
>>>
>>> agent1.sinks.hdfs-sink1.hdfs.rollCount = 20000
>>>
>>> agent1.sinks.hdfs-sink1.hdfs.hdfs.threadsPoolSize = 20
>>>
>>>
>>>
>>> agent1.sinks.hdfs-sink2.channel = ch2
>>>
>>> agent1.sinks.hdfs-sink2.type = hdfs
>>>
>>> agent1.sinks.hdfs-sink2.hdfs.path =
>>> hdfs://localhost:54310/user/flumeDump2
>>>
>>> agent1.sinks.hdfs-sink2.hdfs.filePrefix = events
>>>
>>> agent1.sinks.hdfs-sink2.hdfs.batchSize = 100000
>>>
>>> agent1.sinks.hdfs-sink2.hdfs.fileType = DataStream
>>>
>>> agent1.sinks.hdfs-sink2.hdfs.writeFormat = Text
>>>
>>> agent1.sinks.hdfs-sink2.hdfs.maxOpenFiles = 10000
>>>
>>> agent1.sinks.hdfs-sink2.hdfs.rollSize = 0
>>>
>>> agent1.sinks.hdfs-sink2.hdfs.rollInterval = 0
>>>
>>> agent1.sinks.hdfs-sink2.hdfs.rollCount = 100000
>>>
>>> agent1.sinks.hdfs-sink2.hdfs.hdfs.threadsPoolSize = 20
>>>
>>>
>>>
>>> agent1.sinks.hdfs-sink3.channel = ch3
>>>
>>> agent1.sinks.hdfs-sink3.type = hdfs
>>>
>>> agent1.sinks.hdfs-sink3.hdfs.path =
>>> hdfs://localhost:54310/user/flumeDump3
>>>
>>> agent1.sinks.hdfs-sink3.hdfs.filePrefix = events
>>>
>>> agent1.sinks.hdfs-sink3.hdfs.batchSize = 1000
>>>
>>> agent1.sinks.hdfs-sink3.hdfs.fileType = DataStream
>>>
>>> agent1.sinks.hdfs-sink3.hdfs.writeFormat = Text
>>>
>>> agent1.sinks.hdfs-sink3.hdfs.maxOpenFiles = 10000
>>>
>>> agent1.sinks.hdfs-sink3.hdfs.rollSize = 0
>>>
>>> agent1.sinks.hdfs-sink3.hdfs.rollInterval = 0
>>>
>>> agent1.sinks.hdfs-sink3.hdfs.rollCount = 1000
>>>
>>> agent1.sinks.hdfs-sink3.hdfs.hdfs.threadsPoolSize = 20
>>>
>>>
>>>
>>> agent1.sources.mycustom-source1.selector.type = multiplexing
>>>
>>> agent1.sources.mycustom-source1.selector.header = Type
>>>
>>> agent1.sources.mycustom-source1.selector.mapping.type1 = ch1
>>>
>>> agent1.sources.mycustom-source1.selector.mapping.type2 = ch2
>>>
>>> agent1.sources.mycustom-source1.selector.mapping.type3 = ch3
>>>
>>> agent1.sources.mycustom-source1.selector.default = ch1
>>>
>>>
>>>
>>> #### end of conf file ####
>>>
>>>
>>>
>>> Here are the errors from flume.log.
>>>
>>>
>>> 24 Sep 2012 21:32:13,569 WARN [SinkRunner-PollingRunner-DefaultSinkProcessor]
>>> (org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout:366) -
>>> Unexpected Exception null
>>>
>>> java.lang.InterruptedException
>>>
>>> at
>>> java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
>>>
>>> at
>>> java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:257)
>>>
>>> at
>>> java.util.concurrent.FutureTask.get(FutureTask.java:119)
>>>
>>> at
>>> org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:339)
>>>
>>> at
>>> org.apache.flume.sink.hdfs.HDFSEventSink.flush(HDFSEventSink.java:732)
>>>
>>> at
>>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:430)
>>>
>>> at
>>> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>
>>> at
>>> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>
>>> at java.lang.Thread.run(Thread.java:679)
>>>
>>> 24 Sep 2012 21:32:13,572 ERROR
>>> [SinkRunner-PollingRunner-DefaultSinkProcessor]
>>> (org.apache.flume.sink.hdfs.HDFSEventSink.process:450) - process failed
>>>
>>> java.lang.InterruptedException
>>>
>>> at
>>> java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
>>>
>>> at
>>> java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:257)
>>>
>>> at
>>> java.util.concurrent.FutureTask.get(FutureTask.java:119)
>>>
>>> at
>>> org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:339)
>>>
>>> at
>>> org.apache.flume.sink.hdfs.HDFSEventSink.flush(HDFSEventSink.java:732)
>>>
>>> at
>>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:430)
>>>
>>> at
>>> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>
>>> at
>>> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>
>>> at java.lang.Thread.run(Thread.java:679)
>>>
>>> 24 Sep 2012 21:32:13,572 ERROR
>>> [SinkRunner-PollingRunner-DefaultSinkProcessor]
>>> (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to
>>> deliver event. Exception follows.
>>>
>>> org.apache.flume.EventDeliveryException: java.lang.InterruptedException
>>>
>>> at
>>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:454)
>>>
>>> at
>>> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>
>>> at
>>> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>
>>> at java.lang.Thread.run(Thread.java:679)
>>>
>>> Caused by: java.lang.InterruptedException
>>>
>>> at
>>> java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
>>>
>>> at
>>> java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:257)
>>>
>>> at
>>> java.util.concurrent.FutureTask.get(FutureTask.java:119)
>>>
>>> at
>>> org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:339)
>>>
>>> at
>>> org.apache.flume.sink.hdfs.HDFSEventSink.flush(HDFSEventSink.java:732)
>>>
>>> at
>>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:430)
>>>
>>> ... 3 more
>>>
>>> 24 Sep 2012 21:32:16,350 WARN [SinkRunner-PollingRunner-DefaultSinkProcessor]
>>> (org.apache.flume.sink.hdfs.HDFSEventSink.process:446) - HDFS IO error
>>>
>>> java.io.IOException: Filesystem closed
>>>
>>> at
>>> org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
>>>
>>> at
>>> org.apache.hadoop.hdfs.DFSClient.access$1200(DFSClient.java:74)
>>>
>>> at
>>> org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3664)
>>>
>>> at
>>> org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
>>>
>>> at
>>> org.apache.flume.sink.hdfs.HDFSDataStream.sync(HDFSDataStream.java:95)
>>>
>>> at
>>> org.apache.flume.sink.hdfs.BucketWriter.doFlush(BucketWriter.java:298)
>>>
>>> at
>>> org.apache.flume.sink.hdfs.BucketWriter.access$500(BucketWriter.java:50)
>>>
>>> at
>>> org.apache.flume.sink.hdfs.BucketWriter$4.run(BucketWriter.java:287)
>>>
>>> at
>>> org.apache.flume.sink.hdfs.BucketWriter$4.run(BucketWriter.java:284)
>>>
>>> at
>>> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
>>>
>>> at
>>> org.apache.flume.sink.hdfs.BucketWriter.flush(BucketWriter.java:284)
>>>
>>> at
>>> org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:735)
>>>
>>> at
>>> org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:732)
>>>
>>> at
>>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>>>
>>> at
>>> java.util.concurrent.FutureTask.run(FutureTask.java:166)
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>
>>> at java.lang.Thread.run(Thread.java:679)
>>>
>>> 24 Sep 2012 21:32:18,573 INFO [node-shutdownHook]
>>> (org.apache.flume.sink.hdfs.HDFSEventSink.stop:465) - Closing
>>> hdfs://localhost:54310/user/flumeDump2//events
>>>
>>> 24 Sep 2012 21:32:18,575 WARN [hdfs-hdfs-sink2-call-runner-5]
>>> (org.apache.flume.sink.hdfs.BucketWriter.doClose:259) - failed to
>>> close() HDFSWriter for file
>>> (hdfs://localhost:54310/user/flumeDump2//events.1348522332892.tmp).
>>> Exception follows.
>>>
>>> java.io.IOException: Filesystem closed
>>>
>>> at
>>> org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
>>>
>>> at
>>> org.apache.hadoop.hdfs.DFSClient.access$1200(DFSClient.java:74)
>>>
>>> at
>>> org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3664)
>>>
>>> at
>>> org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
>>>
>>> at
>>> org.apache.flume.sink.hdfs.HDFSDataStream.close(HDFSDataStream.java:103)
>>>
>>> at
>>> org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:256)
>>>
>>> at
>>> org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
>>>
>>> at
>>> org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
>>>
>>> at
>>> org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
>>>
>>> at
>>> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
>>>
>>> at
>>> org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
>>>
>>> at
>>> org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
>>>         at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
>>>         at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>         at java.lang.Thread.run(Thread.java:679)
>>>
>>> 24 Sep 2012 21:32:18,576 WARN [node-shutdownHook] (org.apache.flume.sink.hdfs.HDFSEventSink.stop:470) - Exception while closing hdfs://localhost:54310/user/flumeDump2//events. Exception follows.
>>> java.io.IOException: Filesystem closed
>>>         at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
>>>         at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:873)
>>>         at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:513)
>>>         at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:768)
>>>         at org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:375)
>>>         at org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:275)
>>>         at org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
>>>         at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
>>>         at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
>>>         at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
>>>         at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
>>>         at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
>>>         at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
>>>         at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>         at java.lang.Thread.run(Thread.java:679)
>>>
>>> 24 Sep 2012 21:32:18,589 INFO [node-shutdownHook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87) - Component type: SINK, name: hdfs-sink2 stopped
>>>
>>> 24 Sep 2012 21:32:18,590 INFO [node-shutdownHook] (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:82) - Stopping Sink hdfs-sink1
>>>
>>> 24 Sep 2012 21:32:18,590 INFO [lifecycleSupervisor-1-4] (org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run:215) - Component has already been stopped SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@49dc423f counterGroup:{ name:null counters:{runner.backoffs.consecutive=4, runner.backoffs=4, runner.deliveryErrors=1} } }
>>>
>>> 24 Sep 2012 21:32:18,591 INFO [node-shutdownHook] (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156) - Stopping component: SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@1b815bfb counterGroup:{ name:null counters:{runner.backoffs.consecutive=5, runner.backoffs=5} } }
>>>
>>> 24 Sep 2012 21:32:18,592 INFO [node-shutdownHook] (org.apache.flume.sink.hdfs.HDFSEventSink.stop:465) - Closing hdfs://localhost:54310/user/flumeDump1//events
>>>
>>> 24 Sep 2012 21:32:18,594 WARN [hdfs-hdfs-sink1-call-runner-3] (org.apache.flume.sink.hdfs.BucketWriter.doClose:259) - failed to close() HDFSWriter for file (hdfs://localhost:54310/user/flumeDump1//events.1348522332892.tmp). Exception follows.
>>> java.io.IOException: Filesystem closed
>>>         at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
>>>         at org.apache.hadoop.hdfs.DFSClient.access$1200(DFSClient.java:74)
>>>         at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3664)
>>>         at org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
>>>         at org.apache.flume.sink.hdfs.HDFSDataStream.close(HDFSDataStream.java:103)
>>>         at org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:256)
>>>         at org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
>>>         at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
>>>         at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
>>>         at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
>>>         at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
>>>         at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
>>>         at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
>>>         at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>         at java.lang.Thread.run(Thread.java:679)
>>>
>>> 24 Sep 2012 21:32:18,595 WARN [node-shutdownHook] (org.apache.flume.sink.hdfs.HDFSEventSink.stop:470) - Exception while closing hdfs://localhost:54310/user/flumeDump1//events. Exception follows.
>>> java.io.IOException: Filesystem closed
>>>         at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
>>>         at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:873)
>>>         at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:513)
>>>         at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:768)
>>>         at org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:375)
>>>         at org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:275)
>>>         at org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
>>>         at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
>>>         at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
>>>         at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
>>>         at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
>>>         at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
>>>         at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
>>>         at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>>         at java.lang.Thread.run(Thread.java:679)
>>>
>>> 24 Sep 2012 21:32:18,600 INFO [node-shutdownHook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87) - Component type: SINK, name: hdfs-sink1 stopped
>>>
>>> 24 Sep 2012 21:32:18,600 INFO [node-shutdownHook] (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:92) - Stopping Channel ch3
>>>
>>> 24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156) - Stopping component: org.apache.flume.channel.MemoryChannel{name: ch3}
>>>
>>> 24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87) - Component type: CHANNEL, name: ch3 stopped
>>>
>>> 24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:92) - Stopping Channel ch2
>>>
>>> 24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156) - Stopping component: org.apache.flume.channel.MemoryChannel{name: ch2}
>>>
>>> 24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87) - Component type: CHANNEL, name: ch2 stopped
>>>
>>> 24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:92) - Stopping Channel ch1
>>>
>>> 24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156) - Stopping component: org.apache.flume.channel.MemoryChannel{name: ch1}
>>>
>>> 24 Sep 2012 21:32:18,602 INFO [node-shutdownHook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87) - Component type: CHANNEL, name: ch1 stopped
>>>
>>> 24 Sep 2012 21:32:18,602 INFO [node-shutdownHook] (org.apache.flume.lifecycle.LifecycleSupervisor.stop:78) - Stopping lifecycle supervisor 8
>>>
>>> 24 Sep 2012 21:32:18,604 INFO [node-shutdownHook] (org.apache.flume.conf.file.AbstractFileConfigurationProvider.stop:91) - Configuration provider stopping
>>>
>>> Thanks,
>>>
>>> Harish
>>>
>>
>>
>
Re: HDFS Event Sink problems
Posted by Mike Percy <mp...@apache.org>.
Harish,
What did you find on your side? Could it be related to
https://issues.apache.org/jira/browse/FLUME-1610 ? I am looking at that
issue right now.
Regards,
Mike
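For readers hitting the same repeated `java.io.IOException: Filesystem closed` shown in the log above: Hadoop's `FileSystem.get()` returns a JVM-wide cached client instance keyed on the URI's scheme and authority (plus the calling user), so hdfs-sink1 and hdfs-sink2, both writing to hdfs://localhost:54310, share one DFS client. Whichever sink's shutdown path closes that client first leaves the other sink's flush and close calls failing. The sketch below is an illustrative Python stand-in for that shared-cache failure mode, not the real Hadoop API:

```python
# Toy model of Hadoop's FileSystem cache: FileSystem.get() hands every
# caller the same cached instance per URI, so close() by one user breaks
# all other users of that URI. (Illustration only; the real cache lives
# in org.apache.hadoop.fs.FileSystem and is keyed on scheme/authority/ugi.)

_cache = {}

class FileSystem:
    def __init__(self, uri):
        self.uri = uri
        self._open = True

    @staticmethod
    def get(uri):
        # Same key -> same shared instance, like Hadoop's FileSystem.CACHE.
        if uri not in _cache:
            _cache[uri] = FileSystem(uri)
        return _cache[uri]

    def close(self):
        self._open = False

    def sync(self):
        # Mirrors DFSClient.checkOpen() throwing "Filesystem closed".
        if not self._open:
            raise IOError("Filesystem closed")

# Two HDFS sinks pointing at the same namenode share one client:
sink1_fs = FileSystem.get("hdfs://localhost:54310")
sink2_fs = FileSystem.get("hdfs://localhost:54310")
assert sink1_fs is sink2_fs

# One sink's shutdown hook closes the shared client...
sink2_fs.close()

# ...so the other sink's flush now fails with "Filesystem closed".
try:
    sink1_fs.sync()
except IOError as e:
    print(e)
```

One commonly cited workaround in Hadoop deployments of that era was to disable the client cache (`fs.hdfs.impl.disable.cache=true`) so each component gets its own instance, at the cost of extra connections.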
On Tue, Sep 25, 2012 at 12:17 PM, Harish Mandala <mv...@gmail.com> wrote:
> Thanks, but I understood why this is happening.
>
Re: HDFS Event Sink problems
Posted by Harish Mandala <mv...@gmail.com>.
Thanks, but I understood why this is happening.
On Mon, Sep 24, 2012 at 6:01 PM, Harish Mandala <mv...@gmail.com>wrote:
> Hello,
>
>
> I’m having some trouble with the HDFS Event Sink. I’m using the latest
> version of flume NG, checked out today.
>
>
> I am using curloader to hit “MycustomSource”, which essentially takes in
> HTTP messages, and splits the content into 2 “kinds” of flume events
> (differentiated by header key-value). The first kind is sent to hdfs-sink1,
> and the second kind to hdfs-sink2 by a multiplexing selector as outlined in
> the configuration below. There’s also an hdfs-sink3 which can be ignored at
> present.
>
> I can’t really understand what’s going on. It seems related to some of the
> race condition issues outlined here:
>
> https://issues.apache.org/jira/browse/FLUME-1219
>
>
> Please let me know if you need more information.
>
>
> The following is my conf file. It is followed by flume.log.
>
>
> #### flume.conf ####
>
> agent1.channels = ch1 ch2 ch3
>
> agent1.sources = mycustom-source1
>
> agent1.sinks = hdfs-sink1 hdfs-sink2 hdfs-sink3
>
> # Define a memory channel called ch1 on agent1
>
> agent1.channels.ch1.type = memory
>
> agent1.channels.ch1.capacity = 200000
>
> agent1.channels.ch1.transactionCapacity = 20000
>
> agent1.channels.ch2.type = memory
>
> agent1.channels.ch2.capacity = 1000000
>
> agent1.channels.ch2.transactionCapacity = 100000
>
> agent1.channels.ch3.type = memory
>
> agent1.channels.ch3.capacity = 10000
>
> agent1.channels.ch3.transactionCapacity = 5000
>
>
>
> #agent1.channels.ch2.type = memory
>
> #agent1.channels.ch3.type = memory
>
>
>
> # Define an Mycustom custom source called mycustom-source1 on agent1 and
> tell it
>
> # to bind to 0.0.0.0:41414. Connect it to channel ch1.
>
> agent1.sources.mycustom-source1.channels = ch1 ch2 ch3
>
> agent1.sources.mycustom-source1.type =
> org.apache.flume.source.MycustomSource
>
> agent1.sources.mycustom-source1.bind = 127.0.0.1
>
> agent1.sources.mycustom-source1.port = 1234
>
> agent1.sources.mycustom-source1.serialization_method = json
>
> #agent1.sources.mycustom-source1.schema_filepath =
> /home/ubuntu/Software/flume/trunk/conf/AvroEventSchema.avpr
>
>
>
> # Define an HDFS sink
>
> agent1.sinks.hdfs-sink1.channel = ch1
>
> agent1.sinks.hdfs-sink1.type = hdfs
>
> agent1.sinks.hdfs-sink1.hdfs.path = hdfs://localhost:54310/user/flumeDump1
>
> agent1.sinks.hdfs-sink1.hdfs.filePrefix = events
>
> agent1.sinks.hdfs-sink1.hdfs.batchSize = 20000
>
> agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
>
> agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
>
> agent1.sinks.hdfs-sink1.hdfs.maxOpenFiles = 10000
>
> agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
>
> agent1.sinks.hdfs-sink1.hdfs.rollInterval = 0
>
> agent1.sinks.hdfs-sink1.hdfs.rollCount = 20000
>
> agent1.sinks.hdfs-sink1.hdfs.hdfs.threadsPoolSize = 20
>
>
>
> agent1.sinks.hdfs-sink2.channel = ch2
>
> agent1.sinks.hdfs-sink2.type = hdfs
>
> agent1.sinks.hdfs-sink2.hdfs.path = hdfs://localhost:54310/user/flumeDump2
>
> agent1.sinks.hdfs-sink2.hdfs.filePrefix = events
>
> agent1.sinks.hdfs-sink2.hdfs.batchSize = 100000
>
> agent1.sinks.hdfs-sink2.hdfs.fileType = DataStream
>
> agent1.sinks.hdfs-sink2.hdfs.writeFormat = Text
>
> agent1.sinks.hdfs-sink2.hdfs.maxOpenFiles = 10000
>
> agent1.sinks.hdfs-sink2.hdfs.rollSize = 0
>
> agent1.sinks.hdfs-sink2.hdfs.rollInterval = 0
>
> agent1.sinks.hdfs-sink2.hdfs.rollCount = 100000
>
> agent1.sinks.hdfs-sink2.hdfs.hdfs.threadsPoolSize = 20
>
>
>
> agent1.sinks.hdfs-sink3.channel = ch3
>
> agent1.sinks.hdfs-sink3.type = hdfs
>
> agent1.sinks.hdfs-sink3.hdfs.path = hdfs://localhost:54310/user/flumeDump3
>
> agent1.sinks.hdfs-sink3.hdfs.filePrefix = events
>
> agent1.sinks.hdfs-sink3.hdfs.batchSize = 1000
>
> agent1.sinks.hdfs-sink3.hdfs.fileType = DataStream
>
> agent1.sinks.hdfs-sink3.hdfs.writeFormat = Text
>
> agent1.sinks.hdfs-sink3.hdfs.maxOpenFiles = 10000
>
> agent1.sinks.hdfs-sink3.hdfs.rollSize = 0
>
> agent1.sinks.hdfs-sink3.hdfs.rollInterval = 0
>
> agent1.sinks.hdfs-sink3.hdfs.rollCount = 1000
>
> agent1.sinks.hdfs-sink3.hdfs.hdfs.threadsPoolSize = 20
>
>
>
> agent1.sources.mycustom-source1.selector.type = multiplexing
> agent1.sources.mycustom-source1.selector.header = Type
> agent1.sources.mycustom-source1.selector.mapping.type1 = ch1
> agent1.sources.mycustom-source1.selector.mapping.type2 = ch2
> agent1.sources.mycustom-source1.selector.mapping.type3 = ch3
> agent1.sources.mycustom-source1.selector.default = ch1
>
> #### end of conf file ####
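
The multiplexing selector configured above amounts to a header-to-channel lookup with a default fallback. As a stand-alone sketch (plain Java mimicking the mapping, not Flume's actual MultiplexingChannelSelector class, and assuming events carry a "Type" header as the post describes):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the selector.mapping.* rules above:
// an event's "Type" header value picks the channel; anything absent
// or unmapped falls back to selector.default (ch1).
public class SelectorDemo {
    private static final Map<String, String> MAPPING = new HashMap<>();
    private static final String DEFAULT_CHANNEL = "ch1";
    static {
        MAPPING.put("type1", "ch1");
        MAPPING.put("type2", "ch2");
        MAPPING.put("type3", "ch3");
    }

    // Channel an event with the given "Type" header would be routed to.
    public static String route(String typeHeader) {
        return MAPPING.getOrDefault(typeHeader, DEFAULT_CHANNEL);
    }

    public static void main(String[] args) {
        System.out.println(route("type2"));   // prints "ch2"
        System.out.println(route("unknown")); // prints "ch1" (default)
    }
}
```

So in this agent, type2 events reach hdfs-sink2 via ch2, while events with a missing or unexpected Type header still land in ch1.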
>
> Here are the errors from flume.log.
>
> 24 Sep 2012 21:32:13,569 WARN [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout:366) - Unexpected Exception null
> java.lang.InterruptedException
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
>     at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:257)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:119)
>     at org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:339)
>     at org.apache.flume.sink.hdfs.HDFSEventSink.flush(HDFSEventSink.java:732)
>     at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:430)
>     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>     at java.lang.Thread.run(Thread.java:679)
>
> 24 Sep 2012 21:32:13,572 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:450) - process failed
> java.lang.InterruptedException
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
>     at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:257)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:119)
>     at org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:339)
>     at org.apache.flume.sink.hdfs.HDFSEventSink.flush(HDFSEventSink.java:732)
>     at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:430)
>     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>     at java.lang.Thread.run(Thread.java:679)
>
> 24 Sep 2012 21:32:13,572 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows.
> org.apache.flume.EventDeliveryException: java.lang.InterruptedException
>     at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:454)
>     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>     at java.lang.Thread.run(Thread.java:679)
> Caused by: java.lang.InterruptedException
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
>     at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:257)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:119)
>     at org.apache.flume.sink.hdfs.HDFSEventSink.callWithTimeout(HDFSEventSink.java:339)
>     at org.apache.flume.sink.hdfs.HDFSEventSink.flush(HDFSEventSink.java:732)
>     at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:430)
>     ... 3 more
>
> 24 Sep 2012 21:32:16,350 WARN [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:446) - HDFS IO error
> java.io.IOException: Filesystem closed
>     at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
>     at org.apache.hadoop.hdfs.DFSClient.access$1200(DFSClient.java:74)
>     at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3664)
>     at org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
>     at org.apache.flume.sink.hdfs.HDFSDataStream.sync(HDFSDataStream.java:95)
>     at org.apache.flume.sink.hdfs.BucketWriter.doFlush(BucketWriter.java:298)
>     at org.apache.flume.sink.hdfs.BucketWriter.access$500(BucketWriter.java:50)
>     at org.apache.flume.sink.hdfs.BucketWriter$4.run(BucketWriter.java:287)
>     at org.apache.flume.sink.hdfs.BucketWriter$4.run(BucketWriter.java:284)
>     at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
>     at org.apache.flume.sink.hdfs.BucketWriter.flush(BucketWriter.java:284)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:735)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$2.call(HDFSEventSink.java:732)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>     at java.lang.Thread.run(Thread.java:679)
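
The repeated "Filesystem closed" errors look consistent with Hadoop's FileSystem cache: FileSystem.get() hands every caller with the same scheme/authority/user the same cached client instance, so once any one holder (for example, a shutdown hook or another sink) closes it, every other holder's calls fail. A minimal sketch of that failure mode, using a hypothetical SharedClient class rather than Hadoop's real DFSClient:

```java
import java.io.IOException;

// Hypothetical stand-in for a cached, shared filesystem client.
// Once one holder of the shared instance closes it, every other
// holder's calls fail with "Filesystem closed".
public class SharedClientDemo {
    static class SharedClient {
        private boolean open = true;
        void sync() throws IOException {
            if (!open) throw new IOException("Filesystem closed");
        }
        void close() { open = false; }
    }

    // Two "sinks" get the same cached instance back; closing it through
    // one reference breaks the other mid-flush.
    static String demo() {
        SharedClient cached = new SharedClient();
        SharedClient sink1 = cached;
        SharedClient sink2 = cached;
        sink1.close();                 // e.g. shutdown hook closing sink1
        try {
            sink2.sync();              // sink2 is still flushing its bucket
            return "sync succeeded";
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());    // prints "Filesystem closed"
    }
}
```

If the cache is indeed implicated here, Hadoop exposes fs.hdfs.impl.disable.cache=true to give each caller its own FileSystem instance, at the cost of extra NameNode connections; whether that applies to this trace is an assumption, not something the log proves.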
>
> 24 Sep 2012 21:32:18,573 INFO [node-shutdownHook] (org.apache.flume.sink.hdfs.HDFSEventSink.stop:465) - Closing hdfs://localhost:54310/user/flumeDump2//events
>
> 24 Sep 2012 21:32:18,575 WARN [hdfs-hdfs-sink2-call-runner-5] (org.apache.flume.sink.hdfs.BucketWriter.doClose:259) - failed to close() HDFSWriter for file (hdfs://localhost:54310/user/flumeDump2//events.1348522332892.tmp). Exception follows.
> java.io.IOException: Filesystem closed
>     at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
>     at org.apache.hadoop.hdfs.DFSClient.access$1200(DFSClient.java:74)
>     at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3664)
>     at org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
>     at org.apache.flume.sink.hdfs.HDFSDataStream.close(HDFSDataStream.java:103)
>     at org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:256)
>     at org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
>     at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
>     at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
>     at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
>     at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>     at java.lang.Thread.run(Thread.java:679)
>
> 24 Sep 2012 21:32:18,576 WARN [node-shutdownHook] (org.apache.flume.sink.hdfs.HDFSEventSink.stop:470) - Exception while closing hdfs://localhost:54310/user/flumeDump2//events. Exception follows.
> java.io.IOException: Filesystem closed
>     at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
>     at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:873)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:513)
>     at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:768)
>     at org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:375)
>     at org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:275)
>     at org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
>     at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
>     at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
>     at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
>     at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>     at java.lang.Thread.run(Thread.java:679)
>
> 24 Sep 2012 21:32:18,589 INFO [node-shutdownHook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87) - Component type: SINK, name: hdfs-sink2 stopped
> 24 Sep 2012 21:32:18,590 INFO [node-shutdownHook] (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:82) - Stopping Sink hdfs-sink1
> 24 Sep 2012 21:32:18,590 INFO [lifecycleSupervisor-1-4] (org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run:215) - Component has already been stopped SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@49dc423f counterGroup:{ name:null counters:{runner.backoffs.consecutive=4, runner.backoffs=4, runner.deliveryErrors=1} } }
> 24 Sep 2012 21:32:18,591 INFO [node-shutdownHook] (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156) - Stopping component: SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@1b815bfb counterGroup:{ name:null counters:{runner.backoffs.consecutive=5, runner.backoffs=5} } }
> 24 Sep 2012 21:32:18,592 INFO [node-shutdownHook] (org.apache.flume.sink.hdfs.HDFSEventSink.stop:465) - Closing hdfs://localhost:54310/user/flumeDump1//events
>
> 24 Sep 2012 21:32:18,594 WARN [hdfs-hdfs-sink1-call-runner-3] (org.apache.flume.sink.hdfs.BucketWriter.doClose:259) - failed to close() HDFSWriter for file (hdfs://localhost:54310/user/flumeDump1//events.1348522332892.tmp). Exception follows.
> java.io.IOException: Filesystem closed
>     at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
>     at org.apache.hadoop.hdfs.DFSClient.access$1200(DFSClient.java:74)
>     at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.sync(DFSClient.java:3664)
>     at org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:97)
>     at org.apache.flume.sink.hdfs.HDFSDataStream.close(HDFSDataStream.java:103)
>     at org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:256)
>     at org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
>     at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
>     at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
>     at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
>     at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>     at java.lang.Thread.run(Thread.java:679)
>
> 24 Sep 2012 21:32:18,595 WARN [node-shutdownHook] (org.apache.flume.sink.hdfs.HDFSEventSink.stop:470) - Exception while closing hdfs://localhost:54310/user/flumeDump1//events. Exception follows.
> java.io.IOException: Filesystem closed
>     at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:264)
>     at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:873)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:513)
>     at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:768)
>     at org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:375)
>     at org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:275)
>     at org.apache.flume.sink.hdfs.BucketWriter.access$400(BucketWriter.java:50)
>     at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:242)
>     at org.apache.flume.sink.hdfs.BucketWriter$3.run(BucketWriter.java:239)
>     at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:127)
>     at org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:239)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:750)
>     at org.apache.flume.sink.hdfs.HDFSEventSink$3.call(HDFSEventSink.java:747)
>     at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:166)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>     at java.lang.Thread.run(Thread.java:679)
>
> 24 Sep 2012 21:32:18,600 INFO [node-shutdownHook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87) - Component type: SINK, name: hdfs-sink1 stopped
> 24 Sep 2012 21:32:18,600 INFO [node-shutdownHook] (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:92) - Stopping Channel ch3
> 24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156) - Stopping component: org.apache.flume.channel.MemoryChannel{name: ch3}
> 24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87) - Component type: CHANNEL, name: ch3 stopped
> 24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:92) - Stopping Channel ch2
> 24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156) - Stopping component: org.apache.flume.channel.MemoryChannel{name: ch2}
> 24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87) - Component type: CHANNEL, name: ch2 stopped
> 24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.node.nodemanager.DefaultLogicalNodeManager.stopAllComponents:92) - Stopping Channel ch1
> 24 Sep 2012 21:32:18,601 INFO [node-shutdownHook] (org.apache.flume.lifecycle.LifecycleSupervisor.unsupervise:156) - Stopping component: org.apache.flume.channel.MemoryChannel{name: ch1}
> 24 Sep 2012 21:32:18,602 INFO [node-shutdownHook] (org.apache.flume.instrumentation.MonitoredCounterGroup.stop:87) - Component type: CHANNEL, name: ch1 stopped
> 24 Sep 2012 21:32:18,602 INFO [node-shutdownHook] (org.apache.flume.lifecycle.LifecycleSupervisor.stop:78) - Stopping lifecycle supervisor 8
> 24 Sep 2012 21:32:18,604 INFO [node-shutdownHook] (org.apache.flume.conf.file.AbstractFileConfigurationProvider.stop:91) - Configuration provider stopping
>
> Thanks,
> Harish