Posted to user@flume.apache.org by Sverre Bakke <sv...@gmail.com> on 2014/11/10 13:00:43 UTC

Flume error handling

Hi,

When creating a new EventDrivenSource running as an executor, what is
the correct approach to handling shutdown gracefully?

I am writing a custom source that reads a compressed file line by
line using BufferedReader and pushes these lines to a
ChannelProcessor using processEvent(). This is because the Spooling
Directory source does not support compressed files. It also means that
most of the time, my Flume source will be blocking on
BufferedReader.readLine() or on ChannelProcessor.processEvent().

If I shut down the executor from the stop() method of my source, the
typical response from Flume is that the ChannelProcessor throws a
ChannelException. In what situations can I expect that the
ChannelException is actually the result of a shutdown (e.g. ctrl+c)
rather than some other issue that should be handled as a truly
exceptional situation/error? Or am I approaching graceful shutdown
completely wrong?

Is there any specific order in which the Flume sources, interceptors
and sinks are signaled to shut down?

I feel that when it comes to error handling (and shutdowns), the
developer guide and javadoc are a bit lacking, unfortunately.

Regards,
Sverre
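
The read loop described in this message might look roughly like the
sketch below. It assumes the files are gzip-compressed (the message only
says "compressed"), and GzipLineReader and its lineSink parameter are
hypothetical names, not Flume classes. In a real source the sink would
presumably wrap each line in an event and pass it to
ChannelProcessor.processEvent().

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;
import java.util.zip.GZIPInputStream;

// Hypothetical helper, not part of Flume. Assumes gzip compression and UTF-8 text.
final class GzipLineReader {
    private GzipLineReader() {}

    /**
     * Decompresses the file on the fly and hands each line to lineSink.
     * Returns the number of lines delivered.
     */
    static long readLines(Path gzipFile, Consumer<String> lineSink) throws IOException {
        long count = 0;
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(Files.newInputStream(gzipFile)),
                StandardCharsets.UTF_8))) {
            String line;
            // readLine() returns null at end of stream
            while ((line = reader.readLine()) != null) {
                lineSink.accept(line); // stand-in for the processEvent() call
                count++;
            }
        }
        return count;
    }
}
```

Both readLine() and the sink call can block in this loop, which is why
the shutdown question above matters for this design.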

Re: Flume error handling

Posted by Sverre Bakke <sv...@gmail.com>.
Hi,

Actually, I considered creating a deserializer instead of a source,
but this seems to have even less documentation and samples. Also, the
files I am reading are quite large, so this might have an impact on
the system performance. I will try to have another look at this as
this would simplify my job significantly.

Regarding the source shutdown issue, the current version is listed
here (loosely based on other Flume sources):
http://pastebin.com/3GTxkmYD

Apart from the HDFS sink having some issues from time to time, this
works great until I press ctrl+c on Flume. The problem is that it
never exits when I press ctrl+c, but throws quite a lot of
exceptions, including "Channel full". It seems as if the sink is
shut down before the source.

My theory is as follows:
1) shutdown on a scheduled executor will only cancel the next
scheduled task, not the current one
2) since there are a few thousand files in the input directory, the
source will have a long queue of files to process, even after shutdown
3) also, as the source might be in the middle of processing a file
with a few thousand lines, it might even continue doing this even
after shutdown
4) the sink is shut down independently while the source is still
feeding events, thus complaining about a full buffer

The problem is that I just don't quite know how to work around this
given that I am unable to signal the Runnable objects to gracefully
shut down in the middle of processing a list of files.

Any pointers would be appreciated.

Regards,
Sverre
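
One way the signalling problem in this message might be approached is a
stop flag that the worker checks before every event, combined with
shutdownNow(), which (unlike shutdown()) also attempts to interrupt the
task that is currently running. The sketch below uses only
java.util.concurrent. StoppablePoller, its methods, and the Iterable
"files" are hypothetical stand-ins for the custom source, and the
Consumer stands in for ChannelProcessor.processEvent().

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical stand-in for a custom source; no Flume classes are used.
final class StoppablePoller {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final AtomicLong delivered = new AtomicLong();
    private volatile boolean stopping = false;

    /** Delivers every line of every "file" until finished or told to stop. */
    void start(List<? extends Iterable<String>> files, Consumer<String> sink) {
        executor.execute(() -> {
            for (Iterable<String> file : files) {
                for (String line : file) {
                    // Checked before each event, so a stop takes effect
                    // mid-file and the remaining file queue is abandoned.
                    if (stopping || Thread.currentThread().isInterrupted()) {
                        return;
                    }
                    sink.accept(line);
                    delivered.incrementAndGet();
                }
            }
        });
    }

    /** Signals and interrupts the worker; returns true if it ended in time. */
    boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
        stopping = true;
        executor.shutdownNow();
        return executor.awaitTermination(timeout, unit);
    }

    long delivered() {
        return delivered.get();
    }
}
```

A worker that is blocked inside a call that ignores interruption only
sees the flag once that call returns, so this bounds the extra work to
roughly one event rather than the rest of the file list.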



On Mon, Nov 10, 2014 at 8:27 PM, Hari Shreedharan
<hs...@cloudera.com> wrote:
> You could implement a deserializer that reads compressed files. You’d have
> to decompress the file and return the data event by event though. Take a
> look at something like the BlobDeserializer as an example (it is a pretty
> simple one that demonstrates the behavior).
>
> If you started the executor in the start() method and shut it down in the
> stop() method, you will probably get an exception from the Executor
> framework complaining that the executor was shut down, not a channel
> exception. If you get a channel exception, it means that the channel
> is full or can’t accept events right now for some other reason, not that
> the source can’t handle it. If that is the case, the bug is in the source
> implementation. Do you see that behavior in a source bundled with Flume?
>
> Thanks,
> Hari

Re: Flume error handling

Posted by Sverre Bakke <sv...@gmail.com>.
Hi,

One thing I find odd about Flume deserializers is that
ResettableInputStream is incompatible with the Java standard library input
streams. Otherwise, this could probably be solved in 10 lines of code.

Regards,
Sverre
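
The incompatibility mentioned above could in principle be bridged with a
small adapter. The sketch below is written against a hypothetical
ByteReader interface that mirrors only the two byte-level read methods
assumed to exist on ResettableInputStream; it does not use the Flume
class itself, and mark/reset handling is left out entirely.

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical interface: an assumption about the byte-level read methods
// of Flume's ResettableInputStream, not the real class.
interface ByteReader {
    int read() throws IOException;

    int read(byte[] b, int off, int len) throws IOException;
}

// Adapter that exposes a ByteReader as a standard java.io.InputStream,
// so it can be wrapped by e.g. java.util.zip.GZIPInputStream.
final class ByteReaderInputStream extends InputStream {
    private final ByteReader source;

    ByteReaderInputStream(ByteReader source) {
        this.source = source;
    }

    @Override
    public int read() throws IOException {
        return source.read();
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        return source.read(b, off, len);
    }
}
```

Wrapping the adapter in a GZIPInputStream and a BufferedReader would
give line-by-line access, but a position marked on the compressed stream
would presumably not correspond to a position in the decompressed data,
so the reset behaviour a deserializer relies on would still need
separate handling.
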

Re: starting flume from another Java process

Posted by Ed Judge <ej...@gmail.com>.
FYI, this issue seems to have been caused by not handling Flume's IO.  My Java process was inheriting the IO streams and not reading them, causing Flume to suspend.

-Ed
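
If the cause is child output that the parent never consumes, as this
follow-up suggests, one option might be to send the child's output to a
file so that there is no pipe to drain. By default, ProcessBuilder
connects the child's stdout and stderr to pipes owned by the parent, and
a child can block once such a pipe fills up. ChildLauncher below is a
hypothetical helper; the command and log file are supplied by the caller.

```java
import java.io.File;
import java.io.IOException;
import java.util.List;

// Hypothetical launcher, not part of Flume.
final class ChildLauncher {
    private ChildLauncher() {}

    /** Builds a ProcessBuilder whose stdout and stderr both append to logFile. */
    static ProcessBuilder builder(List<String> command, File logFile) {
        ProcessBuilder pb = new ProcessBuilder(command);
        // Merge stderr into stdout so only one stream needs a destination.
        pb.redirectErrorStream(true);
        // Write to a file instead of a pipe the parent would have to read.
        pb.redirectOutput(ProcessBuilder.Redirect.appendTo(logFile));
        return pb;
    }

    /** Starts the child with the redirects above applied. */
    static Process launch(List<String> command, File logFile) throws IOException {
        return builder(command, logFile).start();
    }
}
```

An alternative with the same effect would be a dedicated thread in the
parent that keeps reading Process.getInputStream() until end of stream.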


starting flume from another Java process

Posted by Ed Judge <ej...@gmail.com>.
Has anyone had experience with starting/stopping flume from another Java process via ProcessBuilder?
Seems like I am able to start it the first time (have it do a transfer and log messages) then destroy it.
However, the next time I start it, it doesn’t do any transfer or writing to its log file.  I do see the following process listed:

/bin/bash /vagrant/flume/bin/apache-flume-1.5.0.1-bin/bin/flume-ng agent -n a1 -c ./conf --conf-file ./conf/FlumeAgent.1.conf --classpath /vagrant/flume/bin/apache-flume-1.5.0.1-bin/lib/*:/home/hadoop/hadoop/share/hadoop/common/*:/home/hadoop/hadoop/share/hadoop/common/lib/*:/home/hadoop/hadoop/share/hadoop/hdfs/* -Dflume.monitoring.type=http -Dflume.monitoring.port=14001 -Dflume.log.file=FlumeAgent.1.log

14001 is unresponsive too.

Any suggestions for how I should go about debugging this?

Thanks,
-Ed


Re: Flume error handling

Posted by Hari Shreedharan <hs...@cloudera.com>.
You could implement a deserializer that reads compressed files. You’d have to decompress the file and return the data event by event though. Take a look at something like the BlobDeserializer as an example (it is a pretty simple one that demonstrates the behavior).

If you started the executor in the start() method and shut it down in the stop() method, you will probably get an exception from the Executor framework complaining that the executor was shut down, not a channel exception. If you get a channel exception, it means that the channel is full or can’t accept events right now for some other reason, not that the source can’t handle it. If that is the case, the bug is in the source implementation. Do you see that behavior in a source bundled with Flume?


Thanks,
Hari
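
The executor-side behaviour described above can be checked in isolation.
The sketch below uses only java.util.concurrent, and
ShutdownRejectionDemo is a hypothetical name. It shows that submitting
work to an executor that has already been shut down is rejected with
RejectedExecutionException, an exception type that comes from the
executor framework rather than from a channel.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical demo class; no Flume classes are involved.
final class ShutdownRejectionDemo {
    private ShutdownRejectionDemo() {}

    /** Returns true if a task submitted after shutdown() was rejected. */
    static boolean rejectedAfterShutdown() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown();
        try {
            executor.execute(() -> { });
            return false; // the task was accepted, which is not expected here
        } catch (RejectedExecutionException e) {
            return true;  // the default rejection policy throws on submit
        }
    }
}
```

Seen this way, a ChannelException during shutdown would point at the
channel being full or unavailable, which matches the diagnosis above.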
