You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flume.apache.org by Anat Rozenzon <an...@viber.com> on 2013/08/01 09:59:54 UTC

Re: Problem Events

Hi,

I'm having the same problem with HDFS sink.

A 'poison' message which doesn't have timestamp header in it as the sink
expects.
This causes a NPE which ends in returning the message to the channel , over
and over again.

Is my only option to re-write the HDFS sink?
Isn't there any way to intercept in the sink work?

Thanks
Anat


On Fri, Jul 26, 2013 at 3:35 AM, Arvind Prabhakar <ar...@apache.org> wrote:

> Sounds like a bug in ElasticSearch sink to me. Do you mind filing a Jira
> to track this? Sample data to cause this would be even better.
>
> Regards,
> Arvind Prabhakar
>
>
> On Thu, Jul 25, 2013 at 9:50 AM, Jeremy Karlson <je...@gmail.com>wrote:
>
>> This was using the provided ElasticSearch sink.  The logs were not
>> helpful.  I ran it through with the debugger and found the source of the
>> problem.
>>
>> ContentBuilderUtil uses a very "aggressive" method to determine if the
>> content is JSON; if it contains a "{" anywhere in it, it's considered JSON.
>>  My body contained that but wasn't JSON, causing the JSON parser to throw a
>> CharConversionException from addComplexField(...) (but not the expected
>> JSONException).  We've changed addComplexField(...) to catch different
>> types of exceptions and fall back to treating it as a simple field.  We'll
>> probably submit a patch for this soon.
>>
>> I'm reasonably happy with this, but I still think that in the bigger
>> picture there should be some sort of mechanism to automatically detect and
>> toss / skip / flag problematic events without them plugging up the flow.
>>
>> -- Jeremy
>>
>>
>> On Wed, Jul 24, 2013 at 7:51 PM, Arvind Prabhakar <ar...@apache.org>wrote:
>>
>>> Jeremy, would it be possible for you to show us logs for the part where
>>> the sink fails to remove an event from the channel? I am assuming this is a
>>> standard sink that Flume provides and not a custom one.
>>>
>>> The reason I ask is because sinks do not introspect the event, and hence
>>> there is no reason why it will fail during the event's removal. It is more
>>> likely that there is a problem within the channel in that it cannot
>>> dereference the event correctly. Looking at the logs will help us identify
>>> the root cause for what you are experiencing.
>>>
>>> Regards,
>>> Arvind Prabhakar
>>>
>>>
>>> On Wed, Jul 24, 2013 at 3:56 PM, Jeremy Karlson <jeremykarlson@gmail.com
>>> > wrote:
>>>
>>>> Both reasonable suggestions.  What would a custom sink look like in
>>>> this case, and how would I only eliminate the problem events since I don't
>>>> know what they are until they are attempted by the "real" sink?
>>>>
>>>> My philosophical concern (in general) is that we're taking the approach
>>>> of exhaustively finding and eliminating possible failure cases.  It's not
>>>> possible to eliminate every single failure case, so shouldn't there be a
>>>> method of last resort to eliminate problem events from the channel?
>>>>
>>>> -- Jeremy
>>>>
>>>>
>>>>
>>>> On Wed, Jul 24, 2013 at 3:45 PM, Hari Shreedharan <
>>>> hshreedharan@cloudera.com> wrote:
>>>>
>>>>> Or you could write a custom sink that removes this event (more work of
>>>>> course)
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Hari
>>>>>
>>>>> On Wednesday, July 24, 2013 at 3:36 PM, Roshan Naik wrote:
>>>>>
>>>>> if you have a way to identify such events.. you may be able to use the
>>>>> Regex interceptor to toss them out before they get into the channel.
>>>>>
>>>>>
>>>>>  On Wed, Jul 24, 2013 at 2:52 PM, Jeremy Karlson <
>>>>> jeremykarlson@gmail.com> wrote:
>>>>>
>>>>> Hi everyone.  My Flume adventures continue.
>>>>>
>>>>> I'm in a situation now where I have a channel that's filling because a
>>>>> stubborn message is stuck.  The sink won't accept it (for whatever reason;
>>>>> I can go into detail but that's not my point here).  This just blocks up
>>>>> the channel entirely, because it goes back into the channel when the sink
>>>>> refuses.  Obviously, this isn't ideal.
>>>>>
>>>>> I'm wondering what mechanisms, if any, Flume has to deal with these
>>>>> situations.  Things that come to mind might be:
>>>>>
>>>>> 1. Ditch the event after n attempts.
>>>>> 2. After n attempts, send the event to a "problem area" (maybe a
>>>>> different source / sink / channel?)  that someone can look at later.
>>>>> 3. Some sort of mechanism that allows operators to manually kill these
>>>>> messages.
>>>>>
>>>>> I'm open to suggestions on alternatives as well.
>>>>>
>>>>> Thanks.
>>>>>
>>>>> -- Jeremy
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Problem Events

Posted by Hari Shreedharan <hs...@cloudera.com>.
Can you try setting this config param for your HDFS Sink:  
hdfs.useLocalTimeStamp = true 

This should insert the timestamp at the sink into the event (this may not be what you want - but this will get rid of the event from the channel). 


Thanks,
Hari


On Wednesday, August 7, 2013 at 7:14 AM, Jonathan Cooper-Ellis wrote:

> You can use a Static Interceptor before the RegexExtractor to add a timestamp of zero to the header, which can then be overwritten by the proper timestamp (if it exists). It also should sink misses into an obvious 'miss' directory.
> 
> 
> On Tue, Aug 6, 2013 at 10:40 PM, Anat Rozenzon <anat@viber.com (mailto:anat@viber.com)> wrote:
> > After some reading in the docs I think the existing fail-over behavior can't be used to solve the 'poison' message problem as it put the 'failed' sink in  a 'cooldown' period.
> > As the problem is in the message and not the sink, it means that after a poison message had arrived, the HDFS sink will 'fail' and thus next X messages will go to the failover sink.
> > My only solution for now is to avoid my current problem and hope that I won't have any other problematic messages, I'll be glad to have a less fragile solution.
> > 
> > Many thanks!
> > Other than that, Flume looks like a great tool :-)
> > 
> > Anat
> > 
> > 
> > On Sun, Aug 4, 2013 at 8:45 AM, Anat Rozenzon <anat@viber.com (mailto:anat@viber.com)> wrote:
> > > I think using a fail-over processor is a very good idea, I think I'll use it as an immediate solution.
> > > For the long run, I would like to see a general solution (not specific to file channel, in my case it is an HDFS channel), so the suggestion to add 'Poison Message' sink to the sink processor sound good. 
> > > 
> > > Just FYI, my problem is that a log file going through my source did not have (in all rows) the structure I expected.
> > > 
> > > Since I used regexp extractor to put timestamp, the 'bad' row didn't match the regexp and the timestamp was not set, then the HDFS sink throws NPE on that:
> > > 01 Aug 2013 09:36:24,259 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.HDFSEventSink.process:422)  - process failed
> > > java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
> > >         at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
> > >         at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
> > >         at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
> > >         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
> > >         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
> > >         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> > >         at java.lang.Thread.run(Thread.java:722)
> > > 01 Aug 2013 09:36:24,262 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver event. Exception follows.
> > > org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
> > >         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:426)
> > >         at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
> > >         at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> > >         at java.lang.Thread.run(Thread.java:722)
> > > Caused by: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
> > >         at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
> > >         at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
> > >         at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
> > >         at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
> > >         ... 3 more
> > > 
> > > 
> > > I fixed my regexp now, still, I can never be sure all the log files the system creates will be perfect without any bad lines.
> > > 
> > > 
> > > On Sat, Aug 3, 2013 at 9:56 AM, Connor Woodson <cwoodson.dev@gmail.com (mailto:cwoodson.dev@gmail.com)> wrote:
> > > > Just some more thoughts. It could be even easier:
> > > > 
> > > > The whole set up might be even easier; for the failover sink processor, you have those settings "max attempts" and "time between attempts" and it will just try one event X times before it gives up and sends it to the next sink. The time between events could even backoff if needed. 
> > > > 
> > > > The advantage of this is that it preserves the ordering of events, something which gets completely broken in the previous scenario.
> > > > 
> > > > - Connor
> > > > 
> > > > 
> > > > On Fri, Aug 2, 2013 at 6:27 PM, Connor Woodson <cwoodson.dev@gmail.com (mailto:cwoodson.dev@gmail.com)> wrote:
> > > > > As another option to solve the problem of having a bad event in a channel: using a fail-over sink processor, log all bad events to a local file. And to be extra cautious, add a third failover of a null sink. This will mean that events will always flow through your channel. The file sink should almost never fail, so you shouldn't be losing events in the process. And then you can re-process everything in the file if you still want those events for something. 
> > > > > 
> > > > > For the system of having Flume detect bad events, I think implementing something like above is better than discarding events that fail X times. For instance, if you have an Avro sink -> Avro source, and you're restarting your source, Flume would end up discarding events unnecessarily. Instead, how about implementing the above system and then go a step further: Flume will attempt to re-send the bad events itself. And then if a bad event isn't able to be sent after X attempts, it is can be discarded. 
> > > > > 
> > > > > I envision this system as an extension to the current File Channel; when an event fails, it is written to a secondary File Channel from which events can be pulled when the main channel isn't in use. It would add headers like "lastAttempt" and "numberOfAttempts" to events. Then it can be configurable for a "min time between attempts" and "maximum attempts." When an event fails the second time, those headers are updated and it goes back into the fail-channel. If it comes out of the fail-channel but the lastAttempt is too recent, it goes back in. If it fails more times than the maximum, it is written to a final location (perhaps its just sent to another sink; maybe this would have to be in a sink processor). Assuming all of those steps are error-free, then all messages are preserved, and the badly-formatted eventually get stored somewhere else. (This system could be hacked together with current code - fail over sink processor -> avro sink -> avro source on same instance, but t
hat's a little too hacky). 
> > > > > 
> > > > > Just some thoughts.
> > > > > 
> > > > > - Connor
> > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > > On Thu, Aug 1, 2013 at 3:25 PM, Arvind Prabhakar <arvind@apache.org (mailto:arvind@apache.org)> wrote:
> > > > > > This sounds like a critical problem that can cause pipelines to block permanently. If you find yourself in this situation, a possible work around would be to decommission the channel, remove its data and route the flow with a new empty channel. If you have the ability to identify which component is causing the problem and see if you can remove it temporarily to let the problem events pass through another peer component.
> > > > > > 
> > > > > > I have also created FLUME-2140 [1] which will eventually allow the pipelines to identify and divert such bad events. If you have any logs, data, configurations that can be shared and will help provide more details for this problem, it will be great if you could attach them to this jira and provide your comments. 
> > > > > > 
> > > > > > [1] https://issues.apache.org/jira/browse/FLUME-2140 
> > > > > > 
> > > > > > Regards,
> > > > > > Arvind Prabhakar
> > > > > > 
> > > > > > On Thu, Aug 1, 2013 at 10:33 AM, Paul Chavez <pchavez@verticalsearchworks.com (mailto:pchavez@verticalsearchworks.com)> wrote:
> > > > > > > There's no way to deal with a bad event once it's in the channel, but you can mitigate future issues by having a timestamp interceptor bound to the source feeding the channel. There is a parameter 'preserve existing' that will only add the header if it doesn't exist. If you don't want to have 'bad' time data in there you could try a static interceptor with a specific past date so that corrupt events fall into a deterministic path in HDFS. 
> > > > > > >  
> > > > > > > I use this technique to prevent stuck events for both timestamp headers as well as some of our own custom headers we use for tokenized paths. The static interceptor will insert an arbitrary header if it doesn't exist so I have a couple that put in the value 'Unknown' so that I can still send the events through the HDFS sink but I can also find them later if need be.
> > > > > > >  
> > > > > > > hope that helps,
> > > > > > > Paul Chavez
> > > > > > > 
> > > > > > > From: Roshan Naik [mailto:roshan@hortonworks.com] 
> > > > > > > Sent: Thursday, August 01, 2013 10:27 AM
> > > > > > > To: user@flume.apache.org (mailto:user@flume.apache.org)
> > > > > > > Subject: Re: Problem Events
> > > > > > > 
> > > > > > > some questions: 
> > > > > > > - why is the sink unable to consume the event ?
> > > > > > > - how would you like to identify such an event ? by examining its content ? or by the fact that its ping-pong-ing between channel and sink ?
> > > > > > > - what would you prefer to do with such an event ? merely drop it ? 
> > > > > > > 
> > > > > > > 
> > > > > > > 
> > > > > > > On Thu, Aug 1, 2013 at 9:26 AM, Jeremy Karlson <jeremykarlson@gmail.com (mailto:jeremykarlson@gmail.com)> wrote:
> > > > > > > > To my knowledge (which is admittedly limited), there is no way to deal with these in a way that will make your day.  I'm happy if someone can say otherwise. 
> > > > > > > > 
> > > > > > > > This is very similar to a problem I had a week or two ago.  I fixed it by restarting Flume with debugging on, connecting to it with the debugger, and finding the message in the sink.  Discover a bug in the sink.  Downloaded Flume, fixed bug, recompiled, installed custom version, etc. 
> > > > > > > > 
> > > > > > > > I agree that this is not a practical solution, and I still believe that Flume needs some sort of "sink of last resort" option or something, like JMS implementations. 
> > > > > > > > 
> > > > > > > > -- Jeremy 
> > > > > > > > 
> > > > > > > > 
> > > > > > > > 
> > > > > > > > On Thu, Aug 1, 2013 at 2:42 AM, Anat Rozenzon <anat@viber.com (mailto:anat@viber.com)> wrote:
> > > > > > > > > The message is already in the channel.
> > > > > > > > > Is there a way to write an interceptor to work after the channel? or before the sink?
> > > > > > > > > 
> > > > > > > > > The only thing I found is to stop everything and delete the channel files, but I won't be able to use this approach in production :-(
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > On Thu, Aug 1, 2013 at 11:13 AM, Ashish <paliwalashish@gmail.com (mailto:paliwalashish@gmail.com)> wrote:
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > On Thu, Aug 1, 2013 at 1:29 PM, Anat Rozenzon <anat@viber.com (mailto:anat@viber.com)> wrote:
> > > > > > > > > > > Hi,
> > > > > > > > > > > 
> > > > > > > > > > > I'm having the same problem with HDFS sink.
> > > > > > > > > > > 
> > > > > > > > > > > A 'poison' message which doesn't have timestamp header in it as the sink expects.
> > > > > > > > > > > This causes a NPE which ends in returning the message to the channel , over and over again.
> > > > > > > > > > > 
> > > > > > > > > > > Is my only option to re-write the HDFS sink?
> > > > > > > > > > > Isn't there any way to intercept in the sink work?
> > > > > > > > > > 
> > > > > > > > > > You can write a custom interceptor and remove/modify the poison message. 
> > > > > > > > > > 
> > > > > > > > > > Interceptors are called before message makes it way into the channel. 
> > > > > > > > > > 
> > > > > > > > > > http://flume.apache.org/FlumeUserGuide.html#flume-interceptors
> > > > > > > > > > 
> > > > > > > > > > I wrote a blog about it a while back http://www.ashishpaliwal.com/blog/2013/06/flume-cookbook-implementing-custom-interceptors/ 
> > > > > > > > > > 
> > > > > > > > > >  
> > > > > > > > > > > 
> > > > > > > > > > > Thanks
> > > > > > > > > > > Anat
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > On Fri, Jul 26, 2013 at 3:35 AM, Arvind Prabhakar <arvind@apache.org (mailto:arvind@apache.org)> wrote:
> > > > > > > > > > > > Sounds like a bug in ElasticSearch sink to me. Do you mind filing a Jira to track this? Sample data to cause this would be even better. 
> > > > > > > > > > > > 
> > > > > > > > > > > > Regards,
> > > > > > > > > > > > Arvind Prabhakar 
> > > > > > > > > > > > 
> > > > > > > > > > > > 
> > > > > > > > > > > > On Thu, Jul 25, 2013 at 9:50 AM, Jeremy Karlson <jeremykarlson@gmail.com (mailto:jeremykarlson@gmail.com)> wrote:
> > > > > > > > > > > > > This was using the provided ElasticSearch sink.  The logs were not helpful.  I ran it through with the debugger and found the source of the problem. 
> > > > > > > > > > > > > 
> > > > > > > > > > > > > ContentBuilderUtil uses a very "aggressive" method to determine if the content is JSON; if it contains a "{" anywhere in it, it's considered JSON.  My body contained that but wasn't JSON, causing the JSON parser to throw a CharConversionException from addComplexField(...) (but not the expected JSONException).  We've changed addComplexField(...) to catch different types of exceptions and fall back to treating it as a simple field.  We'll probably submit a patch for this soon. 
> > > > > > > > > > > > > 
> > > > > > > > > > > > > I'm reasonably happy with this, but I still think that in the bigger picture there should be some sort of mechanism to automatically detect and toss / skip / flag problematic events without them plugging up the flow. 
> > > > > > > > > > > > > 
> > > > > > > > > > > > > -- Jeremy 
> > > > > > > > > > > > > 
> > > > > > > > > > > > > 
> > > > > > > > > > > > > On Wed, Jul 24, 2013 at 7:51 PM, Arvind Prabhakar <arvind@apache.org (mailto:arvind@apache.org)> wrote:
> > > > > > > > > > > > > > Jeremy, would it be possible for you to show us logs for the part where the sink fails to remove an event from the channel? I am assuming this is a standard sink that Flume provides and not a custom one. 
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > The reason I ask is because sinks do not introspect the event, and hence there is no reason why it will fail during the event's removal. It is more likely that there is a problem within the channel in that it cannot dereference the event correctly. Looking at the logs will help us identify the root cause for what you are experiencing. 
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Regards, 
> > > > > > > > > > > > > > Arvind Prabhakar 
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > On Wed, Jul 24, 2013 at 3:56 PM, Jeremy Karlson <jeremykarlson@gmail.com (mailto:jeremykarlson@gmail.com)> wrote:
> > > > > > > > > > > > > > > Both reasonable suggestions.  What would a custom sink look like in this case, and how would I only eliminate the problem events since I don't know what they are until they are attempted by the "real" sink? 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > My philosophical concern (in general) is that we're taking the approach of exhaustively finding and eliminating possible failure cases.  It's not possible to eliminate every single failure case, so shouldn't there be a method of last resort to eliminate problem events from the channel? 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > -- Jeremy 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > On Wed, Jul 24, 2013 at 3:45 PM, Hari Shreedharan <hshreedharan@cloudera.com (mailto:hshreedharan@cloudera.com)> wrote:
> > > > > > > > > > > > > > > > Or you could write a custom sink that removes this event (more work of course) 
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > Thanks, 
> > > > > > > > > > > > > > > > Hari
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > On Wednesday, July 24, 2013 at 3:36 PM, Roshan Naik wrote:
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > if you have a way to identify such events.. you may be able to use the Regex interceptor to toss them out before they get into the channel.
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > On Wed, Jul 24, 2013 at 2:52 PM, Jeremy Karlson <jeremykarlson@gmail.com (mailto:jeremykarlson@gmail.com)> wrote:
> > > > > > > > > > > > > > > > > > Hi everyone.  My Flume adventures continue. 
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > I'm in a situation now where I have a channel that's filling because a stubborn message is stuck.  The sink won't accept it (for whatever reason; I can go into detail but that's not my point here).  This just blocks up the channel entirely, because it goes back into the channel when the sink refuses.  Obviously, this isn't ideal. 
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > I'm wondering what mechanisms, if any, Flume has to deal with these situations.  Things that come to mind might be: 
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > 1. Ditch the event after n attempts. 
> > > > > > > > > > > > > > > > > > 2. After n attempts, send the event to a "problem area" (maybe a different source / sink / channel?)  that someone can look at later.
> > > > > > > > > > > > > > > > > > 3. Some sort of mechanism that allows operators to manually kill these messages.
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > I'm open to suggestions on alternatives as well. 
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > Thanks. 
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > -- Jeremy
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > 
> > > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > -- 
> > > > > > > > > > thanks
> > > > > > > > > > ashish
> > > > > > > > > > 
> > > > > > > > > > Blog: http://www.ashishpaliwal.com/blog
> > > > > > > > > > My Photo Galleries: http://www.pbase.com/ashishpaliwal 
> > > > > > > > 
> > > > > > > 
> > > > > > 
> > > > > 
> > > > 
> > > 
> > 
> 


Re: Problem Events

Posted by Connor Woodson <cw...@gmail.com>.
To avoid the cooldown period, set the maxBackoff to 0; the failover sink
should have some better logic regarding this, but what will happen will
take a little more processing time:

an event fails -> that sink is put on the fail over list -> the event goes
to the next sink, succeeds.
next event -> failover processor checks fail over list, removes the first
sink from that list -> event goes to first sink

I am fairly confident that's how it works.

- Connor


On Wed, Aug 7, 2013 at 10:26 PM, Anat Rozenzon <an...@viber.com> wrote:

> That's what I'll do, add a static timestamp of 1 and let all the 'bad'
> messages flow into one directoty.
> Thanks
>
>
> On Wed, Aug 7, 2013 at 5:14 PM, Jonathan Cooper-Ellis <jc...@cloudera.com>wrote:
>
>> You can use a Static Interceptor before the RegexExtractor to add a
>> timestamp of zero to the header, which can then be overwritten by the
>> proper timestamp (if it exists). It also should sink misses into an obvious
>> 'miss' directory.
>>
>>
>> On Tue, Aug 6, 2013 at 10:40 PM, Anat Rozenzon <an...@viber.com> wrote:
>>
>>> After some reading in the docs I think the existing fail-over behavior
>>> can't be used to solve the 'poison' message problem as it put the 'failed'
>>> sink in  a 'cooldown' period.
>>> As the problem is in the message and not the sink, it means that after a
>>> poison message had arrived, the HDFS sink will 'fail' and thus next X
>>> messages will go to the failover sink.
>>> My only solution for now is to avoid my current problem and hope that I
>>> won't have any other problematic messages, I'll be glad to have a less
>>> fragile solution.
>>>
>>> Many thanks!
>>> Other than that, Flume looks like a great tool :-)
>>>
>>> Anat
>>>
>>>
>>> On Sun, Aug 4, 2013 at 8:45 AM, Anat Rozenzon <an...@viber.com> wrote:
>>>
>>>> I think using a fail-over processor is a very good idea, I think I'll
>>>> use it as an immediate solution.
>>>> For the long run, I would like to see a general solution (not specific
>>>> to file channel, in my case it is an HDFS channel), so the suggestion to
>>>> add 'Poison Message' sink to the sink processor sound good.
>>>>
>>>> Just FYI, my problem is that a log file going through my source did not
>>>> have (in all rows) the structure I expected.
>>>>
>>>> Since I used regexp extractor to put timestamp, the 'bad' row didn't
>>>> match the regexp and the timestamp was not set, then the HDFS sink throws
>>>> NPE on that:
>>>> 01 Aug 2013 09:36:24,259 ERROR
>>>> [SinkRunner-PollingRunner-DefaultSinkProcessor]
>>>> (org.apache.flume.sink.hdfs.HDFSEventSink.process:422)  - process failed
>>>> java.lang.NullPointerException: Expected timestamp in the Flume event
>>>> headers, but it was null
>>>>         at
>>>> com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>>>>         at
>>>> org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
>>>>         at
>>>> org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
>>>>         at
>>>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
>>>>         at
>>>> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>>         at
>>>> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>>         at java.lang.Thread.run(Thread.java:722)
>>>> 01 Aug 2013 09:36:24,262 ERROR
>>>> [SinkRunner-PollingRunner-DefaultSinkProcessor]
>>>> (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver
>>>> event. Exception follows.
>>>> org.apache.flume.EventDeliveryException:
>>>> java.lang.NullPointerException: Expected timestamp in the Flume event
>>>> headers, but it was null
>>>>         at
>>>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:426)
>>>>         at
>>>> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>>         at
>>>> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>>         at java.lang.Thread.run(Thread.java:722)
>>>> Caused by: java.lang.NullPointerException: Expected timestamp in the
>>>> Flume event headers, but it was null
>>>>         at
>>>> com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>>>>         at
>>>> org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
>>>>         at
>>>> org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
>>>>         at
>>>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
>>>>         ... 3 more
>>>>
>>>>
>>>> I fixed my regexp now, still, I can never be sure all the log files the
>>>> system creates will be perfect without any bad lines.
>>>>
>>>>
>>>> On Sat, Aug 3, 2013 at 9:56 AM, Connor Woodson <cw...@gmail.com>wrote:
>>>>
>>>>> Just some more thoughts. It could be even easier:
>>>>>
>>>>> The whole set up might be even easier; for the failover sink
>>>>> processor, you have those settings "max attempts" and "time between
>>>>> attempts" and it will just try one event X times before it gives up and
>>>>> sends it to the next sink. The time between events could even backoff if
>>>>> needed.
>>>>>
>>>>> The advantage of this is that it preserves the ordering of events,
>>>>> something which gets completely broken in the previous scenario.
>>>>>
>>>>> - Connor
>>>>>
>>>>>
>>>>> On Fri, Aug 2, 2013 at 6:27 PM, Connor Woodson <cwoodson.dev@gmail.com
>>>>> > wrote:
>>>>>
>>>>>> As another option to solve the problem of having a bad event in a
>>>>>> channel: using a fail-over sink processor, log all bad events to a local
>>>>>> file. And to be extra cautious, add a third failover of a null sink. This
>>>>>> will mean that events will always flow through your channel. The file sink
>>>>>> should almost never fail, so you shouldn't be losing events in the process.
>>>>>> And then you can re-process everything in the file if you still want those
>>>>>> events for something.
>>>>>>
>>>>>> For the system of having Flume detect bad events, I think
>>>>>> implementing something like above is better than discarding events that
>>>>>> fail X times. For instance, if you have an Avro sink -> Avro source, and
>>>>>> you're restarting your source, Flume would end up discarding events
>>>>>> unnecessarily. Instead, how about implementing the above system and then go
>>>>>> a step further: Flume will attempt to re-send the bad events itself. And
>>>>>> then if a bad event isn't able to be sent after X attempts, it is can be
>>>>>> discarded.
>>>>>>
>>>>>> I envision this system as an extension to the current File Channel;
>>>>>> when an event fails, it is written to a secondary File Channel from which
>>>>>> events can be pulled when the main channel isn't in use. It would add
>>>>>> headers like "lastAttempt" and "numberOfAttempts" to events. Then it can be
>>>>>> configurable for a "min time between attempts" and "maximum attempts." When
>>>>>> an event fails the second time, those headers are updated and it goes back
>>>>>> into the fail-channel. If it comes out of the fail-channel but the
>>>>>> lastAttempt is too recent, it goes back in. If it fails more times than the
>>>>>> maximum, it is written to a final location (perhaps its just sent to
>>>>>> another sink; maybe this would have to be in a sink processor). Assuming
>>>>>> all of those steps are error-free, then all messages are preserved, and the
>>>>>> badly-formatted eventually get stored somewhere else. (This system could be
>>>>>> hacked together with current code - fail over sink processor -> avro sink
>>>>>> -> avro source on same instance, but that's a little too hacky).
>>>>>>
>>>>>> Just some thoughts.
>>>>>>
>>>>>> - Connor
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 1, 2013 at 3:25 PM, Arvind Prabhakar <ar...@apache.org>wrote:
>>>>>>
>>>>>>> This sounds like a critical problem that can cause pipelines to
>>>>>>> block permanently. If you find yourself in this situation, a possible work
>>>>>>> around would be to decommission the channel, remove its data and route the
>>>>>>> flow with a new empty channel. If you have the ability to identify which
>>>>>>> component is causing the problem and see if you can remove it temporarily
>>>>>>> to let the problem events pass through another peer component.
>>>>>>>
>>>>>>> I have also created FLUME-2140 [1] which will eventually allow the
>>>>>>> pipelines to identify and divert such bad events. If you have any logs,
>>>>>>> data, configurations that can be shared and will help provide more details
>>>>>>> for this problem, it will be great if you could attach them to this jira
>>>>>>> and provide your comments.
>>>>>>>
>>>>>>> [1] https://issues.apache.org/jira/browse/FLUME-2140
>>>>>>>
>>>>>>> Regards,
>>>>>>> Arvind Prabhakar
>>>>>>>
>>>>>>> On Thu, Aug 1, 2013 at 10:33 AM, Paul Chavez <
>>>>>>> pchavez@verticalsearchworks.com> wrote:
>>>>>>>
>>>>>>>> **
>>>>>>>> There's no way to deal with a bad event once it's in the channel,
>>>>>>>> but you can mitigate future issues by having a timestamp interceptor bound
>>>>>>>> to the source feeding the channel. There is a parameter 'preserve existing'
>>>>>>>> that will only add the header if it doesn't exist. If you don't want to
>>>>>>>> have 'bad' time data in there you could try a static interceptor with a
>>>>>>>> specific past date so that corrupt events fall into a deterministic path in
>>>>>>>> HDFS.
>>>>>>>>
>>>>>>>> I use this technique to prevent stuck events for both timestamp
>>>>>>>> headers as well as some of our own custom headers we use for tokenized
>>>>>>>> paths. The static interceptor will insert an arbitrary header if it doesn't
>>>>>>>> exist so I have a couple that put in the value 'Unknown' so that I can
>>>>>>>> still send the events through the HDFS sink but I can also find them later
>>>>>>>> if need be.
>>>>>>>>
>>>>>>>> hope that helps,
>>>>>>>> Paul Chavez
>>>>>>>>
>>>>>>>>  ------------------------------
>>>>>>>> *From:* Roshan Naik [mailto:roshan@hortonworks.com]
>>>>>>>> *Sent:* Thursday, August 01, 2013 10:27 AM
>>>>>>>> *To:* user@flume.apache.org
>>>>>>>> *Subject:* Re: Problem Events
>>>>>>>>
>>>>>>>>  some questions:
>>>>>>>> - why is the sink unable to consume the event ?
>>>>>>>> - how would you like to identify such an event ? by examining its
>>>>>>>> content ? or by the fact that its ping-pong-ing between channel and sink ?
>>>>>>>> - what would you prefer to do with such an event ? merely drop it ?
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Aug 1, 2013 at 9:26 AM, Jeremy Karlson <
>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>
>>>>>>>>>  To my knowledge (which is admittedly limited), there is no way
>>>>>>>>> to deal with these in a way that will make your day.  I'm happy if someone
>>>>>>>>> can say otherwise.
>>>>>>>>>
>>>>>>>>> This is very similar to a problem I had a week or two ago.  I
>>>>>>>>> fixed it by restarting Flume with debugging on, connecting to it with the
>>>>>>>>> debugger, and finding the message in the sink.  Discover a bug in the sink.
>>>>>>>>>  Downloaded Flume, fixed bug, recompiled, installed custom version, etc.
>>>>>>>>>
>>>>>>>>> I agree that this is not a practical solution, and I still believe
>>>>>>>>> that Flume needs some sort of "sink of last resort" option or something,
>>>>>>>>> like JMS implementations.
>>>>>>>>>
>>>>>>>>> -- Jeremy
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Aug 1, 2013 at 2:42 AM, Anat Rozenzon <an...@viber.com>wrote:
>>>>>>>>>
>>>>>>>>>>  The message is already in the channel.
>>>>>>>>>> Is there a way to write an interceptor to work after the channel?
>>>>>>>>>> or before the sink?
>>>>>>>>>>
>>>>>>>>>> The only thing I found is to stop everything and delete the
>>>>>>>>>> channel files, but I won't be able to use this approach in production :-(
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Aug 1, 2013 at 11:13 AM, Ashish <pa...@gmail.com>wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>  On Thu, Aug 1, 2013 at 1:29 PM, Anat Rozenzon <an...@viber.com>wrote:
>>>>>>>>>>>
>>>>>>>>>>>>   Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I'm having the same problem with HDFS sink.
>>>>>>>>>>>>
>>>>>>>>>>>> A 'poison' message which doesn't have timestamp header in it as
>>>>>>>>>>>> the sink expects.
>>>>>>>>>>>> This causes a NPE which ends in returning the message to the
>>>>>>>>>>>> channel , over and over again.
>>>>>>>>>>>>
>>>>>>>>>>>> Is my only option to re-write the HDFS sink?
>>>>>>>>>>>> Isn't there any way to intercept in the sink work?
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> You can write a custom interceptor and remove/modify the poison
>>>>>>>>>>> message.
>>>>>>>>>>>
>>>>>>>>>>> Interceptors are called before message makes it way into the
>>>>>>>>>>> channel.
>>>>>>>>>>>
>>>>>>>>>>> http://flume.apache.org/FlumeUserGuide.html#flume-interceptors
>>>>>>>>>>>
>>>>>>>>>>> I wrote a blog about it a while back
>>>>>>>>>>> http://www.ashishpaliwal.com/blog/2013/06/flume-cookbook-implementing-custom-interceptors/
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Anat
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jul 26, 2013 at 3:35 AM, Arvind Prabhakar <
>>>>>>>>>>>> arvind@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Sounds like a bug in ElasticSearch sink to me. Do you mind
>>>>>>>>>>>>> filing a Jira to track this? Sample data to cause this would be even
>>>>>>>>>>>>> better.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Arvind Prabhakar
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jul 25, 2013 at 9:50 AM, Jeremy Karlson <
>>>>>>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> This was using the provided ElasticSearch sink.  The logs
>>>>>>>>>>>>>> were not helpful.  I ran it through with the debugger and found the source
>>>>>>>>>>>>>> of the problem.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ContentBuilderUtil uses a very "aggressive" method to
>>>>>>>>>>>>>> determine if the content is JSON; if it contains a "{" anywhere in it, it's
>>>>>>>>>>>>>> considered JSON.  My body contained that but wasn't JSON, causing the JSON
>>>>>>>>>>>>>> parser to throw a CharConversionException from addComplexField(...) (but
>>>>>>>>>>>>>> not the expected JSONException).  We've changed addComplexField(...) to
>>>>>>>>>>>>>> catch different types of exceptions and fall back to treating it as a
>>>>>>>>>>>>>> simple field.  We'll probably submit a patch for this soon.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm reasonably happy with this, but I still think that in the
>>>>>>>>>>>>>> bigger picture there should be some sort of mechanism to automatically
>>>>>>>>>>>>>> detect and toss / skip / flag problematic events without them plugging up
>>>>>>>>>>>>>> the flow.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 7:51 PM, Arvind Prabhakar <
>>>>>>>>>>>>>> arvind@apache.org> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Jeremy, would it be possible for you to show us logs for the
>>>>>>>>>>>>>>> part where the sink fails to remove an event from the channel? I am
>>>>>>>>>>>>>>> assuming this is a standard sink that Flume provides and not a custom one.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The reason I ask is because sinks do not introspect the
>>>>>>>>>>>>>>> event, and hence there is no reason why it will fail during the event's
>>>>>>>>>>>>>>> removal. It is more likely that there is a problem within the channel in
>>>>>>>>>>>>>>> that it cannot dereference the event correctly. Looking at the logs will
>>>>>>>>>>>>>>> help us identify the root cause for what you are experiencing.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>> Arvind Prabhakar
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 3:56 PM, Jeremy Karlson <
>>>>>>>>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>  Both reasonable suggestions.  What would a custom sink
>>>>>>>>>>>>>>>> look like in this case, and how would I only eliminate the problem events
>>>>>>>>>>>>>>>> since I don't know what they are until they are attempted by the "real"
>>>>>>>>>>>>>>>> sink?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> My philosophical concern (in general) is that we're taking
>>>>>>>>>>>>>>>> the approach of exhaustively finding and eliminating possible failure
>>>>>>>>>>>>>>>> cases.  It's not possible to eliminate every single failure case, so
>>>>>>>>>>>>>>>> shouldn't there be a method of last resort to eliminate problem events from
>>>>>>>>>>>>>>>> the channel?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 3:45 PM, Hari Shreedharan <
>>>>>>>>>>>>>>>> hshreedharan@cloudera.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Or you could write a custom sink that removes this event
>>>>>>>>>>>>>>>>> (more work of course)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Hari
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>  On Wednesday, July 24, 2013 at 3:36 PM, Roshan Naik
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>   if you have a way to identify such events.. you may be
>>>>>>>>>>>>>>>>> able to use the Regex interceptor to toss them out before they get into the
>>>>>>>>>>>>>>>>> channel.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 2:52 PM, Jeremy Karlson <
>>>>>>>>>>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>  Hi everyone.  My Flume adventures continue.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm in a situation now where I have a channel that's
>>>>>>>>>>>>>>>>> filling because a stubborn message is stuck.  The sink won't accept it (for
>>>>>>>>>>>>>>>>> whatever reason; I can go into detail but that's not my point here).  This
>>>>>>>>>>>>>>>>> just blocks up the channel entirely, because it goes back into the channel
>>>>>>>>>>>>>>>>> when the sink refuses.  Obviously, this isn't ideal.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm wondering what mechanisms, if any, Flume has to deal
>>>>>>>>>>>>>>>>> with these situations.  Things that come to mind might be:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 1. Ditch the event after n attempts.
>>>>>>>>>>>>>>>>> 2. After n attempts, send the event to a "problem area"
>>>>>>>>>>>>>>>>> (maybe a different source / sink / channel?)  that someone can look at
>>>>>>>>>>>>>>>>> later.
>>>>>>>>>>>>>>>>> 3. Some sort of mechanism that allows operators to
>>>>>>>>>>>>>>>>> manually kill these messages.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm open to suggestions on alternatives as well.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> thanks
>>>>>>>>>>> ashish
>>>>>>>>>>>
>>>>>>>>>>> Blog: http://www.ashishpaliwal.com/blog
>>>>>>>>>>> My Photo Galleries: http://www.pbase.com/ashishpaliwal
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Problem Events

Posted by Anat Rozenzon <an...@viber.com>.
That's what I'll do, add a static timestamp of 1 and let all the 'bad'
messages flow into one directoty.
Thanks


On Wed, Aug 7, 2013 at 5:14 PM, Jonathan Cooper-Ellis <jc...@cloudera.com>wrote:

> You can use a Static Interceptor before the RegexExtractor to add a
> timestamp of zero to the header, which can then be overwritten by the
> proper timestamp (if it exists). It also should sink misses into an obvious
> 'miss' directory.
>
>
> On Tue, Aug 6, 2013 at 10:40 PM, Anat Rozenzon <an...@viber.com> wrote:
>
>> After some reading in the docs I think the existing fail-over behavior
>> can't be used to solve the 'poison' message problem as it put the 'failed'
>> sink in  a 'cooldown' period.
>> As the problem is in the message and not the sink, it means that after a
>> poison message had arrived, the HDFS sink will 'fail' and thus next X
>> messages will go to the failover sink.
>> My only solution for now is to avoid my current problem and hope that I
>> won't have any other problematic messages, I'll be glad to have a less
>> fragile solution.
>>
>> Many thanks!
>> Other than that, Flume looks like a great tool :-)
>>
>> Anat
>>
>>
>> On Sun, Aug 4, 2013 at 8:45 AM, Anat Rozenzon <an...@viber.com> wrote:
>>
>>> I think using a fail-over processor is a very good idea, I think I'll
>>> use it as an immediate solution.
>>> For the long run, I would like to see a general solution (not specific
>>> to file channel, in my case it is an HDFS channel), so the suggestion to
>>> add 'Poison Message' sink to the sink processor sound good.
>>>
>>> Just FYI, my problem is that a log file going through my source did not
>>> have (in all rows) the structure I expected.
>>>
>>> Since I used regexp extractor to put timestamp, the 'bad' row didn't
>>> match the regexp and the timestamp was not set, then the HDFS sink throws
>>> NPE on that:
>>> 01 Aug 2013 09:36:24,259 ERROR
>>> [SinkRunner-PollingRunner-DefaultSinkProcessor]
>>> (org.apache.flume.sink.hdfs.HDFSEventSink.process:422)  - process failed
>>> java.lang.NullPointerException: Expected timestamp in the Flume event
>>> headers, but it was null
>>>         at
>>> com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>>>         at
>>> org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
>>>         at
>>> org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
>>>         at
>>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
>>>         at
>>> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>         at
>>> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>         at java.lang.Thread.run(Thread.java:722)
>>> 01 Aug 2013 09:36:24,262 ERROR
>>> [SinkRunner-PollingRunner-DefaultSinkProcessor]
>>> (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver
>>> event. Exception follows.
>>> org.apache.flume.EventDeliveryException: java.lang.NullPointerException:
>>> Expected timestamp in the Flume event headers, but it was null
>>>         at
>>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:426)
>>>         at
>>> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>>         at
>>> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>>         at java.lang.Thread.run(Thread.java:722)
>>> Caused by: java.lang.NullPointerException: Expected timestamp in the
>>> Flume event headers, but it was null
>>>         at
>>> com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>>>         at
>>> org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
>>>         at
>>> org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
>>>         at
>>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
>>>         ... 3 more
>>>
>>>
>>> I fixed my regexp now, still, I can never be sure all the log files the
>>> system creates will be perfect without any bad lines.
>>>
>>>
>>> On Sat, Aug 3, 2013 at 9:56 AM, Connor Woodson <cw...@gmail.com>wrote:
>>>
>>>> Just some more thoughts. It could be even easier:
>>>>
>>>> The whole set up might be even easier; for the failover sink processor,
>>>> you have those settings "max attempts" and "time between attempts" and it
>>>> will just try one event X times before it gives up and sends it to the next
>>>> sink. The time between events could even backoff if needed.
>>>>
>>>> The advantage of this is that it preserves the ordering of events,
>>>> something which gets completely broken in the previous scenario.
>>>>
>>>> - Connor
>>>>
>>>>
>>>> On Fri, Aug 2, 2013 at 6:27 PM, Connor Woodson <cw...@gmail.com>wrote:
>>>>
>>>>> As another option to solve the problem of having a bad event in a
>>>>> channel: using a fail-over sink processor, log all bad events to a local
>>>>> file. And to be extra cautious, add a third failover of a null sink. This
>>>>> will mean that events will always flow through your channel. The file sink
>>>>> should almost never fail, so you shouldn't be losing events in the process.
>>>>> And then you can re-process everything in the file if you still want those
>>>>> events for something.
>>>>>
>>>>> For the system of having Flume detect bad events, I think implementing
>>>>> something like above is better than discarding events that fail X times.
>>>>> For instance, if you have an Avro sink -> Avro source, and you're
>>>>> restarting your source, Flume would end up discarding events unnecessarily.
>>>>> Instead, how about implementing the above system and then go a step
>>>>> further: Flume will attempt to re-send the bad events itself. And then if a
>>>>> bad event isn't able to be sent after X attempts, it is can be discarded.
>>>>>
>>>>> I envision this system as an extension to the current File Channel;
>>>>> when an event fails, it is written to a secondary File Channel from which
>>>>> events can be pulled when the main channel isn't in use. It would add
>>>>> headers like "lastAttempt" and "numberOfAttempts" to events. Then it can be
>>>>> configurable for a "min time between attempts" and "maximum attempts." When
>>>>> an event fails the second time, those headers are updated and it goes back
>>>>> into the fail-channel. If it comes out of the fail-channel but the
>>>>> lastAttempt is too recent, it goes back in. If it fails more times than the
>>>>> maximum, it is written to a final location (perhaps its just sent to
>>>>> another sink; maybe this would have to be in a sink processor). Assuming
>>>>> all of those steps are error-free, then all messages are preserved, and the
>>>>> badly-formatted eventually get stored somewhere else. (This system could be
>>>>> hacked together with current code - fail over sink processor -> avro sink
>>>>> -> avro source on same instance, but that's a little too hacky).
>>>>>
>>>>> Just some thoughts.
>>>>>
>>>>> - Connor
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Aug 1, 2013 at 3:25 PM, Arvind Prabhakar <ar...@apache.org>wrote:
>>>>>
>>>>>> This sounds like a critical problem that can cause pipelines to block
>>>>>> permanently. If you find yourself in this situation, a possible work around
>>>>>> would be to decommission the channel, remove its data and route the flow
>>>>>> with a new empty channel. If you have the ability to identify which
>>>>>> component is causing the problem and see if you can remove it temporarily
>>>>>> to let the problem events pass through another peer component.
>>>>>>
>>>>>> I have also created FLUME-2140 [1] which will eventually allow the
>>>>>> pipelines to identify and divert such bad events. If you have any logs,
>>>>>> data, configurations that can be shared and will help provide more details
>>>>>> for this problem, it will be great if you could attach them to this jira
>>>>>> and provide your comments.
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLUME-2140
>>>>>>
>>>>>> Regards,
>>>>>> Arvind Prabhakar
>>>>>>
>>>>>> On Thu, Aug 1, 2013 at 10:33 AM, Paul Chavez <
>>>>>> pchavez@verticalsearchworks.com> wrote:
>>>>>>
>>>>>>> **
>>>>>>> There's no way to deal with a bad event once it's in the channel,
>>>>>>> but you can mitigate future issues by having a timestamp interceptor bound
>>>>>>> to the source feeding the channel. There is a parameter 'preserve existing'
>>>>>>> that will only add the header if it doesn't exist. If you don't want to
>>>>>>> have 'bad' time data in there you could try a static interceptor with a
>>>>>>> specific past date so that corrupt events fall into a deterministic path in
>>>>>>> HDFS.
>>>>>>>
>>>>>>> I use this technique to prevent stuck events for both timestamp
>>>>>>> headers as well as some of our own custom headers we use for tokenized
>>>>>>> paths. The static interceptor will insert an arbitrary header if it doesn't
>>>>>>> exist so I have a couple that put in the value 'Unknown' so that I can
>>>>>>> still send the events through the HDFS sink but I can also find them later
>>>>>>> if need be.
>>>>>>>
>>>>>>> hope that helps,
>>>>>>> Paul Chavez
>>>>>>>
>>>>>>>  ------------------------------
>>>>>>> *From:* Roshan Naik [mailto:roshan@hortonworks.com]
>>>>>>> *Sent:* Thursday, August 01, 2013 10:27 AM
>>>>>>> *To:* user@flume.apache.org
>>>>>>> *Subject:* Re: Problem Events
>>>>>>>
>>>>>>>  some questions:
>>>>>>> - why is the sink unable to consume the event ?
>>>>>>> - how would you like to identify such an event ? by examining its
>>>>>>> content ? or by the fact that its ping-pong-ing between channel and sink ?
>>>>>>> - what would you prefer to do with such an event ? merely drop it ?
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Aug 1, 2013 at 9:26 AM, Jeremy Karlson <
>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>
>>>>>>>>  To my knowledge (which is admittedly limited), there is no way to
>>>>>>>> deal with these in a way that will make your day.  I'm happy if someone can
>>>>>>>> say otherwise.
>>>>>>>>
>>>>>>>> This is very similar to a problem I had a week or two ago.  I fixed
>>>>>>>> it by restarting Flume with debugging on, connecting to it with the
>>>>>>>> debugger, and finding the message in the sink.  Discover a bug in the sink.
>>>>>>>>  Downloaded Flume, fixed bug, recompiled, installed custom version, etc.
>>>>>>>>
>>>>>>>> I agree that this is not a practical solution, and I still believe
>>>>>>>> that Flume needs some sort of "sink of last resort" option or something,
>>>>>>>> like JMS implementations.
>>>>>>>>
>>>>>>>> -- Jeremy
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Aug 1, 2013 at 2:42 AM, Anat Rozenzon <an...@viber.com>wrote:
>>>>>>>>
>>>>>>>>>  The message is already in the channel.
>>>>>>>>> Is there a way to write an interceptor to work after the channel?
>>>>>>>>> or before the sink?
>>>>>>>>>
>>>>>>>>> The only thing I found is to stop everything and delete the
>>>>>>>>> channel files, but I won't be able to use this approach in production :-(
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Aug 1, 2013 at 11:13 AM, Ashish <pa...@gmail.com>wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>  On Thu, Aug 1, 2013 at 1:29 PM, Anat Rozenzon <an...@viber.com>wrote:
>>>>>>>>>>
>>>>>>>>>>>   Hi,
>>>>>>>>>>>
>>>>>>>>>>> I'm having the same problem with HDFS sink.
>>>>>>>>>>>
>>>>>>>>>>> A 'poison' message which doesn't have timestamp header in it as
>>>>>>>>>>> the sink expects.
>>>>>>>>>>> This causes a NPE which ends in returning the message to the
>>>>>>>>>>> channel , over and over again.
>>>>>>>>>>>
>>>>>>>>>>> Is my only option to re-write the HDFS sink?
>>>>>>>>>>> Isn't there any way to intercept in the sink work?
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> You can write a custom interceptor and remove/modify the poison
>>>>>>>>>> message.
>>>>>>>>>>
>>>>>>>>>> Interceptors are called before message makes it way into the
>>>>>>>>>> channel.
>>>>>>>>>>
>>>>>>>>>> http://flume.apache.org/FlumeUserGuide.html#flume-interceptors
>>>>>>>>>>
>>>>>>>>>> I wrote a blog about it a while back
>>>>>>>>>> http://www.ashishpaliwal.com/blog/2013/06/flume-cookbook-implementing-custom-interceptors/
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Anat
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jul 26, 2013 at 3:35 AM, Arvind Prabhakar <
>>>>>>>>>>> arvind@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Sounds like a bug in ElasticSearch sink to me. Do you mind
>>>>>>>>>>>> filing a Jira to track this? Sample data to cause this would be even
>>>>>>>>>>>> better.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Arvind Prabhakar
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jul 25, 2013 at 9:50 AM, Jeremy Karlson <
>>>>>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> This was using the provided ElasticSearch sink.  The logs were
>>>>>>>>>>>>> not helpful.  I ran it through with the debugger and found the source of
>>>>>>>>>>>>> the problem.
>>>>>>>>>>>>>
>>>>>>>>>>>>> ContentBuilderUtil uses a very "aggressive" method to
>>>>>>>>>>>>> determine if the content is JSON; if it contains a "{" anywhere in it, it's
>>>>>>>>>>>>> considered JSON.  My body contained that but wasn't JSON, causing the JSON
>>>>>>>>>>>>> parser to throw a CharConversionException from addComplexField(...) (but
>>>>>>>>>>>>> not the expected JSONException).  We've changed addComplexField(...) to
>>>>>>>>>>>>> catch different types of exceptions and fall back to treating it as a
>>>>>>>>>>>>> simple field.  We'll probably submit a patch for this soon.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm reasonably happy with this, but I still think that in the
>>>>>>>>>>>>> bigger picture there should be some sort of mechanism to automatically
>>>>>>>>>>>>> detect and toss / skip / flag problematic events without them plugging up
>>>>>>>>>>>>> the flow.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 7:51 PM, Arvind Prabhakar <
>>>>>>>>>>>>> arvind@apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Jeremy, would it be possible for you to show us logs for the
>>>>>>>>>>>>>> part where the sink fails to remove an event from the channel? I am
>>>>>>>>>>>>>> assuming this is a standard sink that Flume provides and not a custom one.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The reason I ask is because sinks do not introspect the
>>>>>>>>>>>>>> event, and hence there is no reason why it will fail during the event's
>>>>>>>>>>>>>> removal. It is more likely that there is a problem within the channel in
>>>>>>>>>>>>>> that it cannot dereference the event correctly. Looking at the logs will
>>>>>>>>>>>>>> help us identify the root cause for what you are experiencing.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>> Arvind Prabhakar
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 3:56 PM, Jeremy Karlson <
>>>>>>>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>  Both reasonable suggestions.  What would a custom sink
>>>>>>>>>>>>>>> look like in this case, and how would I only eliminate the problem events
>>>>>>>>>>>>>>> since I don't know what they are until they are attempted by the "real"
>>>>>>>>>>>>>>> sink?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> My philosophical concern (in general) is that we're taking
>>>>>>>>>>>>>>> the approach of exhaustively finding and eliminating possible failure
>>>>>>>>>>>>>>> cases.  It's not possible to eliminate every single failure case, so
>>>>>>>>>>>>>>> shouldn't there be a method of last resort to eliminate problem events from
>>>>>>>>>>>>>>> the channel?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 3:45 PM, Hari Shreedharan <
>>>>>>>>>>>>>>> hshreedharan@cloudera.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Or you could write a custom sink that removes this event
>>>>>>>>>>>>>>>> (more work of course)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Hari
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>  On Wednesday, July 24, 2013 at 3:36 PM, Roshan Naik wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>   if you have a way to identify such events.. you may be
>>>>>>>>>>>>>>>> able to use the Regex interceptor to toss them out before they get into the
>>>>>>>>>>>>>>>> channel.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 2:52 PM, Jeremy Karlson <
>>>>>>>>>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>  Hi everyone.  My Flume adventures continue.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm in a situation now where I have a channel that's
>>>>>>>>>>>>>>>> filling because a stubborn message is stuck.  The sink won't accept it (for
>>>>>>>>>>>>>>>> whatever reason; I can go into detail but that's not my point here).  This
>>>>>>>>>>>>>>>> just blocks up the channel entirely, because it goes back into the channel
>>>>>>>>>>>>>>>> when the sink refuses.  Obviously, this isn't ideal.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm wondering what mechanisms, if any, Flume has to deal
>>>>>>>>>>>>>>>> with these situations.  Things that come to mind might be:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1. Ditch the event after n attempts.
>>>>>>>>>>>>>>>> 2. After n attempts, send the event to a "problem area"
>>>>>>>>>>>>>>>> (maybe a different source / sink / channel?)  that someone can look at
>>>>>>>>>>>>>>>> later.
>>>>>>>>>>>>>>>> 3. Some sort of mechanism that allows operators to manually
>>>>>>>>>>>>>>>> kill these messages.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm open to suggestions on alternatives as well.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> thanks
>>>>>>>>>> ashish
>>>>>>>>>>
>>>>>>>>>> Blog: http://www.ashishpaliwal.com/blog
>>>>>>>>>> My Photo Galleries: http://www.pbase.com/ashishpaliwal
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Problem Events

Posted by Jonathan Cooper-Ellis <jc...@cloudera.com>.
You can use a Static Interceptor before the RegexExtractor to add a
timestamp of zero to the header, which can then be overwritten by the
proper timestamp (if it exists). It also should sink misses into an obvious
'miss' directory.


On Tue, Aug 6, 2013 at 10:40 PM, Anat Rozenzon <an...@viber.com> wrote:

> After some reading in the docs I think the existing fail-over behavior
> can't be used to solve the 'poison' message problem as it put the 'failed'
> sink in  a 'cooldown' period.
> As the problem is in the message and not the sink, it means that after a
> poison message had arrived, the HDFS sink will 'fail' and thus next X
> messages will go to the failover sink.
> My only solution for now is to avoid my current problem and hope that I
> won't have any other problematic messages, I'll be glad to have a less
> fragile solution.
>
> Many thanks!
> Other than that, Flume looks like a great tool :-)
>
> Anat
>
>
> On Sun, Aug 4, 2013 at 8:45 AM, Anat Rozenzon <an...@viber.com> wrote:
>
>> I think using a fail-over processor is a very good idea, I think I'll use
>> it as an immediate solution.
>> For the long run, I would like to see a general solution (not specific to
>> file channel, in my case it is an HDFS channel), so the suggestion to add
>> 'Poison Message' sink to the sink processor sound good.
>>
>> Just FYI, my problem is that a log file going through my source did not
>> have (in all rows) the structure I expected.
>>
>> Since I used regexp extractor to put timestamp, the 'bad' row didn't
>> match the regexp and the timestamp was not set, then the HDFS sink throws
>> NPE on that:
>> 01 Aug 2013 09:36:24,259 ERROR
>> [SinkRunner-PollingRunner-DefaultSinkProcessor]
>> (org.apache.flume.sink.hdfs.HDFSEventSink.process:422)  - process failed
>> java.lang.NullPointerException: Expected timestamp in the Flume event
>> headers, but it was null
>>         at
>> com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>>         at
>> org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
>>         at
>> org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
>>         at
>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
>>         at
>> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>         at
>> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>         at java.lang.Thread.run(Thread.java:722)
>> 01 Aug 2013 09:36:24,262 ERROR
>> [SinkRunner-PollingRunner-DefaultSinkProcessor]
>> (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver
>> event. Exception follows.
>> org.apache.flume.EventDeliveryException: java.lang.NullPointerException:
>> Expected timestamp in the Flume event headers, but it was null
>>         at
>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:426)
>>         at
>> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>>         at
>> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>>         at java.lang.Thread.run(Thread.java:722)
>> Caused by: java.lang.NullPointerException: Expected timestamp in the
>> Flume event headers, but it was null
>>         at
>> com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>>         at
>> org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
>>         at
>> org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
>>         at
>> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
>>         ... 3 more
>>
>>
>> I fixed my regexp now, still, I can never be sure all the log files the
>> system creates will be perfect without any bad lines.
>>
>>
>> On Sat, Aug 3, 2013 at 9:56 AM, Connor Woodson <cw...@gmail.com>wrote:
>>
>>> Just some more thoughts. It could be even easier:
>>>
>>> The whole set up might be even easier; for the failover sink processor,
>>> you have those settings "max attempts" and "time between attempts" and it
>>> will just try one event X times before it gives up and sends it to the next
>>> sink. The time between events could even backoff if needed.
>>>
>>> The advantage of this is that it preserves the ordering of events,
>>> something which gets completely broken in the previous scenario.
>>>
>>> - Connor
>>>
>>>
>>> On Fri, Aug 2, 2013 at 6:27 PM, Connor Woodson <cw...@gmail.com>wrote:
>>>
>>>> As another option to solve the problem of having a bad event in a
>>>> channel: using a fail-over sink processor, log all bad events to a local
>>>> file. And to be extra cautious, add a third failover of a null sink. This
>>>> will mean that events will always flow through your channel. The file sink
>>>> should almost never fail, so you shouldn't be losing events in the process.
>>>> And then you can re-process everything in the file if you still want those
>>>> events for something.
>>>>
>>>> For the system of having Flume detect bad events, I think implementing
>>>> something like above is better than discarding events that fail X times.
>>>> For instance, if you have an Avro sink -> Avro source, and you're
>>>> restarting your source, Flume would end up discarding events unnecessarily.
>>>> Instead, how about implementing the above system and then go a step
>>>> further: Flume will attempt to re-send the bad events itself. And then if a
>>>> bad event isn't able to be sent after X attempts, it is can be discarded.
>>>>
>>>> I envision this system as an extension to the current File Channel;
>>>> when an event fails, it is written to a secondary File Channel from which
>>>> events can be pulled when the main channel isn't in use. It would add
>>>> headers like "lastAttempt" and "numberOfAttempts" to events. Then it can be
>>>> configurable for a "min time between attempts" and "maximum attempts." When
>>>> an event fails the second time, those headers are updated and it goes back
>>>> into the fail-channel. If it comes out of the fail-channel but the
>>>> lastAttempt is too recent, it goes back in. If it fails more times than the
>>>> maximum, it is written to a final location (perhaps its just sent to
>>>> another sink; maybe this would have to be in a sink processor). Assuming
>>>> all of those steps are error-free, then all messages are preserved, and the
>>>> badly-formatted eventually get stored somewhere else. (This system could be
>>>> hacked together with current code - fail over sink processor -> avro sink
>>>> -> avro source on same instance, but that's a little too hacky).
>>>>
>>>> Just some thoughts.
>>>>
>>>> - Connor
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Aug 1, 2013 at 3:25 PM, Arvind Prabhakar <ar...@apache.org>wrote:
>>>>
>>>>> This sounds like a critical problem that can cause pipelines to block
>>>>> permanently. If you find yourself in this situation, a possible work around
>>>>> would be to decommission the channel, remove its data and route the flow
>>>>> with a new empty channel. If you have the ability to identify which
>>>>> component is causing the problem and see if you can remove it temporarily
>>>>> to let the problem events pass through another peer component.
>>>>>
>>>>> I have also created FLUME-2140 [1] which will eventually allow the
>>>>> pipelines to identify and divert such bad events. If you have any logs,
>>>>> data, configurations that can be shared and will help provide more details
>>>>> for this problem, it will be great if you could attach them to this jira
>>>>> and provide your comments.
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLUME-2140
>>>>>
>>>>> Regards,
>>>>> Arvind Prabhakar
>>>>>
>>>>> On Thu, Aug 1, 2013 at 10:33 AM, Paul Chavez <
>>>>> pchavez@verticalsearchworks.com> wrote:
>>>>>
>>>>>> **
>>>>>> There's no way to deal with a bad event once it's in the channel, but
>>>>>> you can mitigate future issues by having a timestamp interceptor bound to
>>>>>> the source feeding the channel. There is a parameter 'preserve existing'
>>>>>> that will only add the header if it doesn't exist. If you don't want to
>>>>>> have 'bad' time data in there you could try a static interceptor with a
>>>>>> specific past date so that corrupt events fall into a deterministic path in
>>>>>> HDFS.
>>>>>>
>>>>>> I use this technique to prevent stuck events for both timestamp
>>>>>> headers as well as some of our own custom headers we use for tokenized
>>>>>> paths. The static interceptor will insert an arbitrary header if it doesn't
>>>>>> exist so I have a couple that put in the value 'Unknown' so that I can
>>>>>> still send the events through the HDFS sink but I can also find them later
>>>>>> if need be.
>>>>>>
>>>>>> hope that helps,
>>>>>> Paul Chavez
>>>>>>
>>>>>>  ------------------------------
>>>>>> *From:* Roshan Naik [mailto:roshan@hortonworks.com]
>>>>>> *Sent:* Thursday, August 01, 2013 10:27 AM
>>>>>> *To:* user@flume.apache.org
>>>>>> *Subject:* Re: Problem Events
>>>>>>
>>>>>>  some questions:
>>>>>> - why is the sink unable to consume the event ?
>>>>>> - how would you like to identify such an event ? by examining its
>>>>>> content ? or by the fact that its ping-pong-ing between channel and sink ?
>>>>>> - what would you prefer to do with such an event ? merely drop it ?
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 1, 2013 at 9:26 AM, Jeremy Karlson <
>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>
>>>>>>>  To my knowledge (which is admittedly limited), there is no way to
>>>>>>> deal with these in a way that will make your day.  I'm happy if someone can
>>>>>>> say otherwise.
>>>>>>>
>>>>>>> This is very similar to a problem I had a week or two ago.  I fixed
>>>>>>> it by restarting Flume with debugging on, connecting to it with the
>>>>>>> debugger, and finding the message in the sink.  Discover a bug in the sink.
>>>>>>>  Downloaded Flume, fixed bug, recompiled, installed custom version, etc.
>>>>>>>
>>>>>>> I agree that this is not a practical solution, and I still believe
>>>>>>> that Flume needs some sort of "sink of last resort" option or something,
>>>>>>> like JMS implementations.
>>>>>>>
>>>>>>> -- Jeremy
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Aug 1, 2013 at 2:42 AM, Anat Rozenzon <an...@viber.com>wrote:
>>>>>>>
>>>>>>>>  The message is already in the channel.
>>>>>>>> Is there a way to write an interceptor to work after the channel?
>>>>>>>> or before the sink?
>>>>>>>>
>>>>>>>> The only thing I found is to stop everything and delete the channel
>>>>>>>> files, but I won't be able to use this approach in production :-(
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Aug 1, 2013 at 11:13 AM, Ashish <pa...@gmail.com>wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>  On Thu, Aug 1, 2013 at 1:29 PM, Anat Rozenzon <an...@viber.com>wrote:
>>>>>>>>>
>>>>>>>>>>   Hi,
>>>>>>>>>>
>>>>>>>>>> I'm having the same problem with HDFS sink.
>>>>>>>>>>
>>>>>>>>>> A 'poison' message which doesn't have timestamp header in it as
>>>>>>>>>> the sink expects.
>>>>>>>>>> This causes a NPE which ends in returning the message to the
>>>>>>>>>> channel , over and over again.
>>>>>>>>>>
>>>>>>>>>> Is my only option to re-write the HDFS sink?
>>>>>>>>>> Isn't there any way to intercept in the sink work?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> You can write a custom interceptor and remove/modify the poison
>>>>>>>>> message.
>>>>>>>>>
>>>>>>>>> Interceptors are called before message makes it way into the
>>>>>>>>> channel.
>>>>>>>>>
>>>>>>>>> http://flume.apache.org/FlumeUserGuide.html#flume-interceptors
>>>>>>>>>
>>>>>>>>> I wrote a blog about it a while back
>>>>>>>>> http://www.ashishpaliwal.com/blog/2013/06/flume-cookbook-implementing-custom-interceptors/
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Anat
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Jul 26, 2013 at 3:35 AM, Arvind Prabhakar <
>>>>>>>>>> arvind@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Sounds like a bug in ElasticSearch sink to me. Do you mind
>>>>>>>>>>> filing a Jira to track this? Sample data to cause this would be even
>>>>>>>>>>> better.
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Arvind Prabhakar
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jul 25, 2013 at 9:50 AM, Jeremy Karlson <
>>>>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> This was using the provided ElasticSearch sink.  The logs were
>>>>>>>>>>>> not helpful.  I ran it through with the debugger and found the source of
>>>>>>>>>>>> the problem.
>>>>>>>>>>>>
>>>>>>>>>>>> ContentBuilderUtil uses a very "aggressive" method to determine
>>>>>>>>>>>> if the content is JSON; if it contains a "{" anywhere in it, it's
>>>>>>>>>>>> considered JSON.  My body contained that but wasn't JSON, causing the JSON
>>>>>>>>>>>> parser to throw a CharConversionException from addComplexField(...) (but
>>>>>>>>>>>> not the expected JSONException).  We've changed addComplexField(...) to
>>>>>>>>>>>> catch different types of exceptions and fall back to treating it as a
>>>>>>>>>>>> simple field.  We'll probably submit a patch for this soon.
>>>>>>>>>>>>
>>>>>>>>>>>> I'm reasonably happy with this, but I still think that in the
>>>>>>>>>>>> bigger picture there should be some sort of mechanism to automatically
>>>>>>>>>>>> detect and toss / skip / flag problematic events without them plugging up
>>>>>>>>>>>> the flow.
>>>>>>>>>>>>
>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jul 24, 2013 at 7:51 PM, Arvind Prabhakar <
>>>>>>>>>>>> arvind@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Jeremy, would it be possible for you to show us logs for the
>>>>>>>>>>>>> part where the sink fails to remove an event from the channel? I am
>>>>>>>>>>>>> assuming this is a standard sink that Flume provides and not a custom one.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The reason I ask is because sinks do not introspect the event,
>>>>>>>>>>>>> and hence there is no reason why it will fail during the event's removal.
>>>>>>>>>>>>> It is more likely that there is a problem within the channel in that it
>>>>>>>>>>>>> cannot dereference the event correctly. Looking at the logs will help us
>>>>>>>>>>>>> identify the root cause for what you are experiencing.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Arvind Prabhakar
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 3:56 PM, Jeremy Karlson <
>>>>>>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>>  Both reasonable suggestions.  What would a custom sink look
>>>>>>>>>>>>>> like in this case, and how would I only eliminate the problem events since
>>>>>>>>>>>>>> I don't know what they are until they are attempted by the "real" sink?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> My philosophical concern (in general) is that we're taking
>>>>>>>>>>>>>> the approach of exhaustively finding and eliminating possible failure
>>>>>>>>>>>>>> cases.  It's not possible to eliminate every single failure case, so
>>>>>>>>>>>>>> shouldn't there be a method of last resort to eliminate problem events from
>>>>>>>>>>>>>> the channel?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 3:45 PM, Hari Shreedharan <
>>>>>>>>>>>>>> hshreedharan@cloudera.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Or you could write a custom sink that removes this event
>>>>>>>>>>>>>>> (more work of course)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Hari
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>  On Wednesday, July 24, 2013 at 3:36 PM, Roshan Naik wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>   if you have a way to identify such events.. you may be
>>>>>>>>>>>>>>> able to use the Regex interceptor to toss them out before they get into the
>>>>>>>>>>>>>>> channel.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 2:52 PM, Jeremy Karlson <
>>>>>>>>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>  Hi everyone.  My Flume adventures continue.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm in a situation now where I have a channel that's filling
>>>>>>>>>>>>>>> because a stubborn message is stuck.  The sink won't accept it (for
>>>>>>>>>>>>>>> whatever reason; I can go into detail but that's not my point here).  This
>>>>>>>>>>>>>>> just blocks up the channel entirely, because it goes back into the channel
>>>>>>>>>>>>>>> when the sink refuses.  Obviously, this isn't ideal.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm wondering what mechanisms, if any, Flume has to deal
>>>>>>>>>>>>>>> with these situations.  Things that come to mind might be:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1. Ditch the event after n attempts.
>>>>>>>>>>>>>>> 2. After n attempts, send the event to a "problem area"
>>>>>>>>>>>>>>> (maybe a different source / sink / channel?)  that someone can look at
>>>>>>>>>>>>>>> later.
>>>>>>>>>>>>>>> 3. Some sort of mechanism that allows operators to manually
>>>>>>>>>>>>>>> kill these messages.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm open to suggestions on alternatives as well.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> thanks
>>>>>>>>> ashish
>>>>>>>>>
>>>>>>>>> Blog: http://www.ashishpaliwal.com/blog
>>>>>>>>> My Photo Galleries: http://www.pbase.com/ashishpaliwal
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Problem Events

Posted by Anat Rozenzon <an...@viber.com>.
After some reading in the docs I think the existing fail-over behavior
can't be used to solve the 'poison' message problem as it put the 'failed'
sink in  a 'cooldown' period.
As the problem is in the message and not the sink, it means that after a
poison message had arrived, the HDFS sink will 'fail' and thus next X
messages will go to the failover sink.
My only solution for now is to avoid my current problem and hope that I
won't have any other problematic messages, I'll be glad to have a less
fragile solution.

Many thanks!
Other than that, Flume looks like a great tool :-)

Anat


On Sun, Aug 4, 2013 at 8:45 AM, Anat Rozenzon <an...@viber.com> wrote:

> I think using a fail-over processor is a very good idea, I think I'll use
> it as an immediate solution.
> For the long run, I would like to see a general solution (not specific to
> file channel, in my case it is an HDFS channel), so the suggestion to add
> 'Poison Message' sink to the sink processor sound good.
>
> Just FYI, my problem is that a log file going through my source did not
> have (in all rows) the structure I expected.
>
> Since I used regexp extractor to put timestamp, the 'bad' row didn't match
> the regexp and the timestamp was not set, then the HDFS sink throws NPE on
> that:
> 01 Aug 2013 09:36:24,259 ERROR
> [SinkRunner-PollingRunner-DefaultSinkProcessor]
> (org.apache.flume.sink.hdfs.HDFSEventSink.process:422)  - process failed
> java.lang.NullPointerException: Expected timestamp in the Flume event
> headers, but it was null
>         at
> com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>         at
> org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
>         at
> org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
>         at
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
>         at
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:722)
> 01 Aug 2013 09:36:24,262 ERROR
> [SinkRunner-PollingRunner-DefaultSinkProcessor]
> (org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver
> event. Exception follows.
> org.apache.flume.EventDeliveryException: java.lang.NullPointerException:
> Expected timestamp in the Flume event headers, but it was null
>         at
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:426)
>         at
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>         at
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>         at java.lang.Thread.run(Thread.java:722)
> Caused by: java.lang.NullPointerException: Expected timestamp in the Flume
> event headers, but it was null
>         at
> com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
>         at
> org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
>         at
> org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
>         at
> org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
>         ... 3 more
>
>
> I fixed my regexp now, still, I can never be sure all the log files the
> system creates will be perfect without any bad lines.
>
>
> On Sat, Aug 3, 2013 at 9:56 AM, Connor Woodson <cw...@gmail.com>wrote:
>
>> Just some more thoughts. It could be even easier:
>>
>> The whole set up might be even easier; for the failover sink processor,
>> you have those settings "max attempts" and "time between attempts" and it
>> will just try one event X times before it gives up and sends it to the next
>> sink. The time between events could even backoff if needed.
>>
>> The advantage of this is that it preserves the ordering of events,
>> something which gets completely broken in the previous scenario.
>>
>> - Connor
>>
>>
>> On Fri, Aug 2, 2013 at 6:27 PM, Connor Woodson <cw...@gmail.com>wrote:
>>
>>> As another option to solve the problem of having a bad event in a
>>> channel: using a fail-over sink processor, log all bad events to a local
>>> file. And to be extra cautious, add a third failover of a null sink. This
>>> will mean that events will always flow through your channel. The file sink
>>> should almost never fail, so you shouldn't be losing events in the process.
>>> And then you can re-process everything in the file if you still want those
>>> events for something.
>>>
>>> For the system of having Flume detect bad events, I think implementing
>>> something like above is better than discarding events that fail X times.
>>> For instance, if you have an Avro sink -> Avro source, and you're
>>> restarting your source, Flume would end up discarding events unnecessarily.
>>> Instead, how about implementing the above system and then go a step
>>> further: Flume will attempt to re-send the bad events itself. And then if a
>>> bad event isn't able to be sent after X attempts, it is can be discarded.
>>>
>>> I envision this system as an extension to the current File Channel; when
>>> an event fails, it is written to a secondary File Channel from which events
>>> can be pulled when the main channel isn't in use. It would add headers like
>>> "lastAttempt" and "numberOfAttempts" to events. Then it can be configurable
>>> for a "min time between attempts" and "maximum attempts." When an event
>>> fails the second time, those headers are updated and it goes back into the
>>> fail-channel. If it comes out of the fail-channel but the lastAttempt is
>>> too recent, it goes back in. If it fails more times than the maximum, it is
>>> written to a final location (perhaps its just sent to another sink; maybe
>>> this would have to be in a sink processor). Assuming all of those steps are
>>> error-free, then all messages are preserved, and the badly-formatted
>>> eventually get stored somewhere else. (This system could be hacked together
>>> with current code - fail over sink processor -> avro sink -> avro source on
>>> same instance, but that's a little too hacky).
>>>
>>> Just some thoughts.
>>>
>>> - Connor
>>>
>>>
>>>
>>>
>>> On Thu, Aug 1, 2013 at 3:25 PM, Arvind Prabhakar <ar...@apache.org>wrote:
>>>
>>>> This sounds like a critical problem that can cause pipelines to block
>>>> permanently. If you find yourself in this situation, a possible work around
>>>> would be to decommission the channel, remove its data and route the flow
>>>> with a new empty channel. If you have the ability to identify which
>>>> component is causing the problem and see if you can remove it temporarily
>>>> to let the problem events pass through another peer component.
>>>>
>>>> I have also created FLUME-2140 [1] which will eventually allow the
>>>> pipelines to identify and divert such bad events. If you have any logs,
>>>> data, configurations that can be shared and will help provide more details
>>>> for this problem, it will be great if you could attach them to this jira
>>>> and provide your comments.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLUME-2140
>>>>
>>>> Regards,
>>>> Arvind Prabhakar
>>>>
>>>> On Thu, Aug 1, 2013 at 10:33 AM, Paul Chavez <
>>>> pchavez@verticalsearchworks.com> wrote:
>>>>
>>>>> **
>>>>> There's no way to deal with a bad event once it's in the channel, but
>>>>> you can mitigate future issues by having a timestamp interceptor bound to
>>>>> the source feeding the channel. There is a parameter 'preserve existing'
>>>>> that will only add the header if it doesn't exist. If you don't want to
>>>>> have 'bad' time data in there you could try a static interceptor with a
>>>>> specific past date so that corrupt events fall into a deterministic path in
>>>>> HDFS.
>>>>>
>>>>> I use this technique to prevent stuck events for both timestamp
>>>>> headers as well as some of our own custom headers we use for tokenized
>>>>> paths. The static interceptor will insert an arbitrary header if it doesn't
>>>>> exist so I have a couple that put in the value 'Unknown' so that I can
>>>>> still send the events through the HDFS sink but I can also find them later
>>>>> if need be.
>>>>>
>>>>> hope that helps,
>>>>> Paul Chavez
>>>>>
>>>>>  ------------------------------
>>>>> *From:* Roshan Naik [mailto:roshan@hortonworks.com]
>>>>> *Sent:* Thursday, August 01, 2013 10:27 AM
>>>>> *To:* user@flume.apache.org
>>>>> *Subject:* Re: Problem Events
>>>>>
>>>>>  some questions:
>>>>> - why is the sink unable to consume the event ?
>>>>> - how would you like to identify such an event ? by examining its
>>>>> content ? or by the fact that its ping-pong-ing between channel and sink ?
>>>>> - what would you prefer to do with such an event ? merely drop it ?
>>>>>
>>>>>
>>>>> On Thu, Aug 1, 2013 at 9:26 AM, Jeremy Karlson <
>>>>> jeremykarlson@gmail.com> wrote:
>>>>>
>>>>>>  To my knowledge (which is admittedly limited), there is no way to
>>>>>> deal with these in a way that will make your day.  I'm happy if someone can
>>>>>> say otherwise.
>>>>>>
>>>>>> This is very similar to a problem I had a week or two ago.  I fixed
>>>>>> it by restarting Flume with debugging on, connecting to it with the
>>>>>> debugger, and finding the message in the sink.  Discover a bug in the sink.
>>>>>>  Downloaded Flume, fixed bug, recompiled, installed custom version, etc.
>>>>>>
>>>>>> I agree that this is not a practical solution, and I still believe
>>>>>> that Flume needs some sort of "sink of last resort" option or something,
>>>>>> like JMS implementations.
>>>>>>
>>>>>> -- Jeremy
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 1, 2013 at 2:42 AM, Anat Rozenzon <an...@viber.com> wrote:
>>>>>>
>>>>>>>  The message is already in the channel.
>>>>>>> Is there a way to write an interceptor to work after the channel? or
>>>>>>> before the sink?
>>>>>>>
>>>>>>> The only thing I found is to stop everything and delete the channel
>>>>>>> files, but I won't be able to use this approach in production :-(
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Aug 1, 2013 at 11:13 AM, Ashish <pa...@gmail.com>wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>  On Thu, Aug 1, 2013 at 1:29 PM, Anat Rozenzon <an...@viber.com>wrote:
>>>>>>>>
>>>>>>>>>   Hi,
>>>>>>>>>
>>>>>>>>> I'm having the same problem with HDFS sink.
>>>>>>>>>
>>>>>>>>> A 'poison' message which doesn't have timestamp header in it as
>>>>>>>>> the sink expects.
>>>>>>>>> This causes a NPE which ends in returning the message to the
>>>>>>>>> channel , over and over again.
>>>>>>>>>
>>>>>>>>> Is my only option to re-write the HDFS sink?
>>>>>>>>> Isn't there any way to intercept in the sink work?
>>>>>>>>>
>>>>>>>>
>>>>>>>> You can write a custom interceptor and remove/modify the poison
>>>>>>>> message.
>>>>>>>>
>>>>>>>> Interceptors are called before message makes it way into the
>>>>>>>> channel.
>>>>>>>>
>>>>>>>> http://flume.apache.org/FlumeUserGuide.html#flume-interceptors
>>>>>>>>
>>>>>>>> I wrote a blog about it a while back
>>>>>>>> http://www.ashishpaliwal.com/blog/2013/06/flume-cookbook-implementing-custom-interceptors/
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Anat
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Jul 26, 2013 at 3:35 AM, Arvind Prabhakar <
>>>>>>>>> arvind@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Sounds like a bug in ElasticSearch sink to me. Do you mind filing
>>>>>>>>>> a Jira to track this? Sample data to cause this would be even better.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Arvind Prabhakar
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 25, 2013 at 9:50 AM, Jeremy Karlson <
>>>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> This was using the provided ElasticSearch sink.  The logs were
>>>>>>>>>>> not helpful.  I ran it through with the debugger and found the source of
>>>>>>>>>>> the problem.
>>>>>>>>>>>
>>>>>>>>>>> ContentBuilderUtil uses a very "aggressive" method to determine
>>>>>>>>>>> if the content is JSON; if it contains a "{" anywhere in it, it's
>>>>>>>>>>> considered JSON.  My body contained that but wasn't JSON, causing the JSON
>>>>>>>>>>> parser to throw a CharConversionException from addComplexField(...) (but
>>>>>>>>>>> not the expected JSONException).  We've changed addComplexField(...) to
>>>>>>>>>>> catch different types of exceptions and fall back to treating it as a
>>>>>>>>>>> simple field.  We'll probably submit a patch for this soon.
>>>>>>>>>>>
>>>>>>>>>>> I'm reasonably happy with this, but I still think that in the
>>>>>>>>>>> bigger picture there should be some sort of mechanism to automatically
>>>>>>>>>>> detect and toss / skip / flag problematic events without them plugging up
>>>>>>>>>>> the flow.
>>>>>>>>>>>
>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jul 24, 2013 at 7:51 PM, Arvind Prabhakar <
>>>>>>>>>>> arvind@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Jeremy, would it be possible for you to show us logs for the
>>>>>>>>>>>> part where the sink fails to remove an event from the channel? I am
>>>>>>>>>>>> assuming this is a standard sink that Flume provides and not a custom one.
>>>>>>>>>>>>
>>>>>>>>>>>> The reason I ask is because sinks do not introspect the event,
>>>>>>>>>>>> and hence there is no reason why it will fail during the event's removal.
>>>>>>>>>>>> It is more likely that there is a problem within the channel in that it
>>>>>>>>>>>> cannot dereference the event correctly. Looking at the logs will help us
>>>>>>>>>>>> identify the root cause for what you are experiencing.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Arvind Prabhakar
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jul 24, 2013 at 3:56 PM, Jeremy Karlson <
>>>>>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>>  Both reasonable suggestions.  What would a custom sink look
>>>>>>>>>>>>> like in this case, and how would I only eliminate the problem events since
>>>>>>>>>>>>> I don't know what they are until they are attempted by the "real" sink?
>>>>>>>>>>>>>
>>>>>>>>>>>>> My philosophical concern (in general) is that we're taking the
>>>>>>>>>>>>> approach of exhaustively finding and eliminating possible failure cases.
>>>>>>>>>>>>>  It's not possible to eliminate every single failure case, so shouldn't
>>>>>>>>>>>>> there be a method of last resort to eliminate problem events from the
>>>>>>>>>>>>> channel?
>>>>>>>>>>>>>
>>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 3:45 PM, Hari Shreedharan <
>>>>>>>>>>>>> hshreedharan@cloudera.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Or you could write a custom sink that removes this event
>>>>>>>>>>>>>> (more work of course)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Hari
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  On Wednesday, July 24, 2013 at 3:36 PM, Roshan Naik wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>   if you have a way to identify such events.. you may be
>>>>>>>>>>>>>> able to use the Regex interceptor to toss them out before they get into the
>>>>>>>>>>>>>> channel.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 2:52 PM, Jeremy Karlson <
>>>>>>>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  Hi everyone.  My Flume adventures continue.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm in a situation now where I have a channel that's filling
>>>>>>>>>>>>>> because a stubborn message is stuck.  The sink won't accept it (for
>>>>>>>>>>>>>> whatever reason; I can go into detail but that's not my point here).  This
>>>>>>>>>>>>>> just blocks up the channel entirely, because it goes back into the channel
>>>>>>>>>>>>>> when the sink refuses.  Obviously, this isn't ideal.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm wondering what mechanisms, if any, Flume has to deal with
>>>>>>>>>>>>>> these situations.  Things that come to mind might be:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1. Ditch the event after n attempts.
>>>>>>>>>>>>>> 2. After n attempts, send the event to a "problem area"
>>>>>>>>>>>>>> (maybe a different source / sink / channel?)  that someone can look at
>>>>>>>>>>>>>> later.
>>>>>>>>>>>>>> 3. Some sort of mechanism that allows operators to manually
>>>>>>>>>>>>>> kill these messages.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm open to suggestions on alternatives as well.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> thanks
>>>>>>>> ashish
>>>>>>>>
>>>>>>>> Blog: http://www.ashishpaliwal.com/blog
>>>>>>>> My Photo Galleries: http://www.pbase.com/ashishpaliwal
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Problem Events

Posted by Anat Rozenzon <an...@viber.com>.
I think using a fail-over processor is a very good idea, I think I'll use
it as an immediate solution.
For the long run, I would like to see a general solution (not specific to
file channel, in my case it is an HDFS channel), so the suggestion to add
'Poison Message' sink to the sink processor sound good.

Just FYI, my problem is that a log file going through my source did not
have (in all rows) the structure I expected.

Since I used regexp extractor to put timestamp, the 'bad' row didn't match
the regexp and the timestamp was not set, then the HDFS sink throws NPE on
that:
01 Aug 2013 09:36:24,259 ERROR
[SinkRunner-PollingRunner-DefaultSinkProcessor]
(org.apache.flume.sink.hdfs.HDFSEventSink.process:422)  - process failed
java.lang.NullPointerException: Expected timestamp in the Flume event
headers, but it was null
        at
com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
        at
org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
        at
org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
        at
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
        at
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at
org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:722)
01 Aug 2013 09:36:24,262 ERROR
[SinkRunner-PollingRunner-DefaultSinkProcessor]
(org.apache.flume.SinkRunner$PollingRunner.run:160)  - Unable to deliver
event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.NullPointerException:
Expected timestamp in the Flume event headers, but it was null
        at
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:426)
        at
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at
org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:722)
Caused by: java.lang.NullPointerException: Expected timestamp in the Flume
event headers, but it was null
        at
com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
        at
org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
        at
org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
        at
org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:356)
        ... 3 more


I fixed my regexp now, still, I can never be sure all the log files the
system creates will be perfect without any bad lines.


On Sat, Aug 3, 2013 at 9:56 AM, Connor Woodson <cw...@gmail.com>wrote:

> Just some more thoughts. It could be even easier:
>
> The whole set up might be even easier; for the failover sink processor,
> you have those settings "max attempts" and "time between attempts" and it
> will just try one event X times before it gives up and sends it to the next
> sink. The time between events could even backoff if needed.
>
> The advantage of this is that it preserves the ordering of events,
> something which gets completely broken in the previous scenario.
>
> - Connor
>
>
> On Fri, Aug 2, 2013 at 6:27 PM, Connor Woodson <cw...@gmail.com>wrote:
>
>> As another option to solve the problem of having a bad event in a
>> channel: using a fail-over sink processor, log all bad events to a local
>> file. And to be extra cautious, add a third failover of a null sink. This
>> will mean that events will always flow through your channel. The file sink
>> should almost never fail, so you shouldn't be losing events in the process.
>> And then you can re-process everything in the file if you still want those
>> events for something.
>>
>> For the system of having Flume detect bad events, I think implementing
>> something like above is better than discarding events that fail X times.
>> For instance, if you have an Avro sink -> Avro source, and you're
>> restarting your source, Flume would end up discarding events unnecessarily.
>> Instead, how about implementing the above system and then go a step
>> further: Flume will attempt to re-send the bad events itself. And then if a
>> bad event isn't able to be sent after X attempts, it is can be discarded.
>>
>> I envision this system as an extension to the current File Channel; when
>> an event fails, it is written to a secondary File Channel from which events
>> can be pulled when the main channel isn't in use. It would add headers like
>> "lastAttempt" and "numberOfAttempts" to events. Then it can be configurable
>> for a "min time between attempts" and "maximum attempts." When an event
>> fails the second time, those headers are updated and it goes back into the
>> fail-channel. If it comes out of the fail-channel but the lastAttempt is
>> too recent, it goes back in. If it fails more times than the maximum, it is
>> written to a final location (perhaps its just sent to another sink; maybe
>> this would have to be in a sink processor). Assuming all of those steps are
>> error-free, then all messages are preserved, and the badly-formatted
>> eventually get stored somewhere else. (This system could be hacked together
>> with current code - fail over sink processor -> avro sink -> avro source on
>> same instance, but that's a little too hacky).
>>
>> Just some thoughts.
>>
>> - Connor
>>
>>
>>
>>
>> On Thu, Aug 1, 2013 at 3:25 PM, Arvind Prabhakar <ar...@apache.org>wrote:
>>
>>> This sounds like a critical problem that can cause pipelines to block
>>> permanently. If you find yourself in this situation, a possible work around
>>> would be to decommission the channel, remove its data and route the flow
>>> with a new empty channel. If you have the ability to identify which
>>> component is causing the problem and see if you can remove it temporarily
>>> to let the problem events pass through another peer component.
>>>
>>> I have also created FLUME-2140 [1] which will eventually allow the
>>> pipelines to identify and divert such bad events. If you have any logs,
>>> data, configurations that can be shared and will help provide more details
>>> for this problem, it will be great if you could attach them to this jira
>>> and provide your comments.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLUME-2140
>>>
>>> Regards,
>>> Arvind Prabhakar
>>>
>>> On Thu, Aug 1, 2013 at 10:33 AM, Paul Chavez <
>>> pchavez@verticalsearchworks.com> wrote:
>>>
>>>> **
>>>> There's no way to deal with a bad event once it's in the channel, but
>>>> you can mitigate future issues by having a timestamp interceptor bound to
>>>> the source feeding the channel. There is a parameter 'preserve existing'
>>>> that will only add the header if it doesn't exist. If you don't want to
>>>> have 'bad' time data in there you could try a static interceptor with a
>>>> specific past date so that corrupt events fall into a deterministic path in
>>>> HDFS.
>>>>
>>>> I use this technique to prevent stuck events for both timestamp headers
>>>> as well as some of our own custom headers we use for tokenized paths. The
>>>> static interceptor will insert an arbitrary header if it doesn't exist so I
>>>> have a couple that put in the value 'Unknown' so that I can still send the
>>>> events through the HDFS sink but I can also find them later if need be.
>>>>
>>>> hope that helps,
>>>> Paul Chavez
>>>>
>>>>  ------------------------------
>>>> *From:* Roshan Naik [mailto:roshan@hortonworks.com]
>>>> *Sent:* Thursday, August 01, 2013 10:27 AM
>>>> *To:* user@flume.apache.org
>>>> *Subject:* Re: Problem Events
>>>>
>>>>  some questions:
>>>> - why is the sink unable to consume the event ?
>>>> - how would you like to identify such an event ? by examining its
>>>> content ? or by the fact that its ping-pong-ing between channel and sink ?
>>>> - what would you prefer to do with such an event ? merely drop it ?
>>>>
>>>>
>>>> On Thu, Aug 1, 2013 at 9:26 AM, Jeremy Karlson <jeremykarlson@gmail.com
>>>> > wrote:
>>>>
>>>>>  To my knowledge (which is admittedly limited), there is no way to
>>>>> deal with these in a way that will make your day.  I'm happy if someone can
>>>>> say otherwise.
>>>>>
>>>>> This is very similar to a problem I had a week or two ago.  I fixed it
>>>>> by restarting Flume with debugging on, connecting to it with the debugger,
>>>>> and finding the message in the sink.  Discover a bug in the sink.
>>>>>  Downloaded Flume, fixed bug, recompiled, installed custom version, etc.
>>>>>
>>>>> I agree that this is not a practical solution, and I still believe
>>>>> that Flume needs some sort of "sink of last resort" option or something,
>>>>> like JMS implementations.
>>>>>
>>>>> -- Jeremy
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Aug 1, 2013 at 2:42 AM, Anat Rozenzon <an...@viber.com> wrote:
>>>>>
>>>>>>  The message is already in the channel.
>>>>>> Is there a way to write an interceptor to work after the channel? or
>>>>>> before the sink?
>>>>>>
>>>>>> The only thing I found is to stop everything and delete the channel
>>>>>> files, but I won't be able to use this approach in production :-(
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 1, 2013 at 11:13 AM, Ashish <pa...@gmail.com>wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>  On Thu, Aug 1, 2013 at 1:29 PM, Anat Rozenzon <an...@viber.com>wrote:
>>>>>>>
>>>>>>>>   Hi,
>>>>>>>>
>>>>>>>> I'm having the same problem with HDFS sink.
>>>>>>>>
>>>>>>>> A 'poison' message which doesn't have timestamp header in it as the
>>>>>>>> sink expects.
>>>>>>>> This causes a NPE which ends in returning the message to the
>>>>>>>> channel , over and over again.
>>>>>>>>
>>>>>>>> Is my only option to re-write the HDFS sink?
>>>>>>>> Isn't there any way to intercept in the sink work?
>>>>>>>>
>>>>>>>
>>>>>>> You can write a custom interceptor and remove/modify the poison
>>>>>>> message.
>>>>>>>
>>>>>>> Interceptors are called before message makes it way into the channel.
>>>>>>>
>>>>>>> http://flume.apache.org/FlumeUserGuide.html#flume-interceptors
>>>>>>>
>>>>>>> I wrote a blog about it a while back
>>>>>>> http://www.ashishpaliwal.com/blog/2013/06/flume-cookbook-implementing-custom-interceptors/
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Anat
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Jul 26, 2013 at 3:35 AM, Arvind Prabhakar <
>>>>>>>> arvind@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Sounds like a bug in ElasticSearch sink to me. Do you mind filing
>>>>>>>>> a Jira to track this? Sample data to cause this would be even better.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Arvind Prabhakar
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jul 25, 2013 at 9:50 AM, Jeremy Karlson <
>>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> This was using the provided ElasticSearch sink.  The logs were
>>>>>>>>>> not helpful.  I ran it through with the debugger and found the source of
>>>>>>>>>> the problem.
>>>>>>>>>>
>>>>>>>>>> ContentBuilderUtil uses a very "aggressive" method to determine
>>>>>>>>>> if the content is JSON; if it contains a "{" anywhere in it, it's
>>>>>>>>>> considered JSON.  My body contained that but wasn't JSON, causing the JSON
>>>>>>>>>> parser to throw a CharConversionException from addComplexField(...) (but
>>>>>>>>>> not the expected JSONException).  We've changed addComplexField(...) to
>>>>>>>>>> catch different types of exceptions and fall back to treating it as a
>>>>>>>>>> simple field.  We'll probably submit a patch for this soon.
>>>>>>>>>>
>>>>>>>>>> I'm reasonably happy with this, but I still think that in the
>>>>>>>>>> bigger picture there should be some sort of mechanism to automatically
>>>>>>>>>> detect and toss / skip / flag problematic events without them plugging up
>>>>>>>>>> the flow.
>>>>>>>>>>
>>>>>>>>>> -- Jeremy
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Jul 24, 2013 at 7:51 PM, Arvind Prabhakar <
>>>>>>>>>> arvind@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Jeremy, would it be possible for you to show us logs for the
>>>>>>>>>>> part where the sink fails to remove an event from the channel? I am
>>>>>>>>>>> assuming this is a standard sink that Flume provides and not a custom one.
>>>>>>>>>>>
>>>>>>>>>>> The reason I ask is because sinks do not introspect the event,
>>>>>>>>>>> and hence there is no reason why it will fail during the event's removal.
>>>>>>>>>>> It is more likely that there is a problem within the channel in that it
>>>>>>>>>>> cannot dereference the event correctly. Looking at the logs will help us
>>>>>>>>>>> identify the root cause for what you are experiencing.
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Arvind Prabhakar
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jul 24, 2013 at 3:56 PM, Jeremy Karlson <
>>>>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>>  Both reasonable suggestions.  What would a custom sink look
>>>>>>>>>>>> like in this case, and how would I only eliminate the problem events since
>>>>>>>>>>>> I don't know what they are until they are attempted by the "real" sink?
>>>>>>>>>>>>
>>>>>>>>>>>> My philosophical concern (in general) is that we're taking the
>>>>>>>>>>>> approach of exhaustively finding and eliminating possible failure cases.
>>>>>>>>>>>>  It's not possible to eliminate every single failure case, so shouldn't
>>>>>>>>>>>> there be a method of last resort to eliminate problem events from the
>>>>>>>>>>>> channel?
>>>>>>>>>>>>
>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jul 24, 2013 at 3:45 PM, Hari Shreedharan <
>>>>>>>>>>>> hshreedharan@cloudera.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Or you could write a custom sink that removes this event (more
>>>>>>>>>>>>> work of course)
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Hari
>>>>>>>>>>>>>
>>>>>>>>>>>>>  On Wednesday, July 24, 2013 at 3:36 PM, Roshan Naik wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>   if you have a way to identify such events.. you may be able
>>>>>>>>>>>>> to use the Regex interceptor to toss them out before they get into the
>>>>>>>>>>>>> channel.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jul 24, 2013 at 2:52 PM, Jeremy Karlson <
>>>>>>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>  Hi everyone.  My Flume adventures continue.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm in a situation now where I have a channel that's filling
>>>>>>>>>>>>> because a stubborn message is stuck.  The sink won't accept it (for
>>>>>>>>>>>>> whatever reason; I can go into detail but that's not my point here).  This
>>>>>>>>>>>>> just blocks up the channel entirely, because it goes back into the channel
>>>>>>>>>>>>> when the sink refuses.  Obviously, this isn't ideal.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm wondering what mechanisms, if any, Flume has to deal with
>>>>>>>>>>>>> these situations.  Things that come to mind might be:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. Ditch the event after n attempts.
>>>>>>>>>>>>> 2. After n attempts, send the event to a "problem area" (maybe
>>>>>>>>>>>>> a different source / sink / channel?)  that someone can look at later.
>>>>>>>>>>>>> 3. Some sort of mechanism that allows operators to manually
>>>>>>>>>>>>> kill these messages.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm open to suggestions on alternatives as well.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> thanks
>>>>>>> ashish
>>>>>>>
>>>>>>> Blog: http://www.ashishpaliwal.com/blog
>>>>>>> My Photo Galleries: http://www.pbase.com/ashishpaliwal
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Problem Events

Posted by Connor Woodson <cw...@gmail.com>.
Just some more thoughts. It could be even easier:

The whole set up might be even easier; for the failover sink processor, you
have those settings "max attempts" and "time between attempts" and it will
just try one event X times before it gives up and sends it to the next
sink. The time between events could even backoff if needed.

The advantage of this is that it preserves the ordering of events,
something which gets completely broken in the previous scenario.

- Connor


On Fri, Aug 2, 2013 at 6:27 PM, Connor Woodson <cw...@gmail.com>wrote:

> As another option to solve the problem of having a bad event in a channel:
> using a fail-over sink processor, log all bad events to a local file. And
> to be extra cautious, add a third failover of a null sink. This will mean
> that events will always flow through your channel. The file sink should
> almost never fail, so you shouldn't be losing events in the process. And
> then you can re-process everything in the file if you still want those
> events for something.
>
> For the system of having Flume detect bad events, I think implementing
> something like above is better than discarding events that fail X times.
> For instance, if you have an Avro sink -> Avro source, and you're
> restarting your source, Flume would end up discarding events unnecessarily.
> Instead, how about implementing the above system and then go a step
> further: Flume will attempt to re-send the bad events itself. And then if a
> bad event isn't able to be sent after X attempts, it is can be discarded.
>
> I envision this system as an extension to the current File Channel; when
> an event fails, it is written to a secondary File Channel from which events
> can be pulled when the main channel isn't in use. It would add headers like
> "lastAttempt" and "numberOfAttempts" to events. Then it can be configurable
> for a "min time between attempts" and "maximum attempts." When an event
> fails the second time, those headers are updated and it goes back into the
> fail-channel. If it comes out of the fail-channel but the lastAttempt is
> too recent, it goes back in. If it fails more times than the maximum, it is
> written to a final location (perhaps its just sent to another sink; maybe
> this would have to be in a sink processor). Assuming all of those steps are
> error-free, then all messages are preserved, and the badly-formatted
> eventually get stored somewhere else. (This system could be hacked together
> with current code - fail over sink processor -> avro sink -> avro source on
> same instance, but that's a little too hacky).
>
> Just some thoughts.
>
> - Connor
>
>
>
>
> On Thu, Aug 1, 2013 at 3:25 PM, Arvind Prabhakar <ar...@apache.org>wrote:
>
>> This sounds like a critical problem that can cause pipelines to block
>> permanently. If you find yourself in this situation, a possible work around
>> would be to decommission the channel, remove its data and route the flow
>> with a new empty channel. If you have the ability to identify which
>> component is causing the problem and see if you can remove it temporarily
>> to let the problem events pass through another peer component.
>>
>> I have also created FLUME-2140 [1] which will eventually allow the
>> pipelines to identify and divert such bad events. If you have any logs,
>> data, configurations that can be shared and will help provide more details
>> for this problem, it will be great if you could attach them to this jira
>> and provide your comments.
>>
>> [1] https://issues.apache.org/jira/browse/FLUME-2140
>>
>> Regards,
>> Arvind Prabhakar
>>
>> On Thu, Aug 1, 2013 at 10:33 AM, Paul Chavez <
>> pchavez@verticalsearchworks.com> wrote:
>>
>>> **
>>> There's no way to deal with a bad event once it's in the channel, but
>>> you can mitigate future issues by having a timestamp interceptor bound to
>>> the source feeding the channel. There is a parameter 'preserve existing'
>>> that will only add the header if it doesn't exist. If you don't want to
>>> have 'bad' time data in there you could try a static interceptor with a
>>> specific past date so that corrupt events fall into a deterministic path in
>>> HDFS.
>>>
>>> I use this technique to prevent stuck events for both timestamp headers
>>> as well as some of our own custom headers we use for tokenized paths. The
>>> static interceptor will insert an arbitrary header if it doesn't exist so I
>>> have a couple that put in the value 'Unknown' so that I can still send the
>>> events through the HDFS sink but I can also find them later if need be.
>>>
>>> hope that helps,
>>> Paul Chavez
>>>
>>>  ------------------------------
>>> *From:* Roshan Naik [mailto:roshan@hortonworks.com]
>>> *Sent:* Thursday, August 01, 2013 10:27 AM
>>> *To:* user@flume.apache.org
>>> *Subject:* Re: Problem Events
>>>
>>>  some questions:
>>> - why is the sink unable to consume the event ?
>>> - how would you like to identify such an event ? by examining its
>>> content ? or by the fact that its ping-pong-ing between channel and sink ?
>>> - what would you prefer to do with such an event ? merely drop it ?
>>>
>>>
>>> On Thu, Aug 1, 2013 at 9:26 AM, Jeremy Karlson <je...@gmail.com>wrote:
>>>
>>>>  To my knowledge (which is admittedly limited), there is no way to
>>>> deal with these in a way that will make your day.  I'm happy if someone can
>>>> say otherwise.
>>>>
>>>> This is very similar to a problem I had a week or two ago.  I fixed it
>>>> by restarting Flume with debugging on, connecting to it with the debugger,
>>>> and finding the message in the sink.  Discover a bug in the sink.
>>>>  Downloaded Flume, fixed bug, recompiled, installed custom version, etc.
>>>>
>>>> I agree that this is not a practical solution, and I still believe that
>>>> Flume needs some sort of "sink of last resort" option or something, like
>>>> JMS implementations.
>>>>
>>>> -- Jeremy
>>>>
>>>>
>>>>
>>>> On Thu, Aug 1, 2013 at 2:42 AM, Anat Rozenzon <an...@viber.com> wrote:
>>>>
>>>>>  The message is already in the channel.
>>>>> Is there a way to write an interceptor to work after the channel? or
>>>>> before the sink?
>>>>>
>>>>> The only thing I found is to stop everything and delete the channel
>>>>> files, but I won't be able to use this approach in production :-(
>>>>>
>>>>>
>>>>> On Thu, Aug 1, 2013 at 11:13 AM, Ashish <pa...@gmail.com>wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>  On Thu, Aug 1, 2013 at 1:29 PM, Anat Rozenzon <an...@viber.com>wrote:
>>>>>>
>>>>>>>   Hi,
>>>>>>>
>>>>>>> I'm having the same problem with HDFS sink.
>>>>>>>
>>>>>>> A 'poison' message which doesn't have timestamp header in it as the
>>>>>>> sink expects.
>>>>>>> This causes a NPE which ends in returning the message to the channel
>>>>>>> , over and over again.
>>>>>>>
>>>>>>> Is my only option to re-write the HDFS sink?
>>>>>>> Isn't there any way to intercept in the sink work?
>>>>>>>
>>>>>>
>>>>>> You can write a custom interceptor and remove/modify the poison
>>>>>> message.
>>>>>>
>>>>>> Interceptors are called before message makes it way into the channel.
>>>>>>
>>>>>> http://flume.apache.org/FlumeUserGuide.html#flume-interceptors
>>>>>>
>>>>>> I wrote a blog about it a while back
>>>>>> http://www.ashishpaliwal.com/blog/2013/06/flume-cookbook-implementing-custom-interceptors/
>>>>>>
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>> Anat
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Jul 26, 2013 at 3:35 AM, Arvind Prabhakar <arvind@apache.org
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> Sounds like a bug in ElasticSearch sink to me. Do you mind filing a
>>>>>>>> Jira to track this? Sample data to cause this would be even better.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Arvind Prabhakar
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jul 25, 2013 at 9:50 AM, Jeremy Karlson <
>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> This was using the provided ElasticSearch sink.  The logs were not
>>>>>>>>> helpful.  I ran it through with the debugger and found the source of the
>>>>>>>>> problem.
>>>>>>>>>
>>>>>>>>> ContentBuilderUtil uses a very "aggressive" method to determine if
>>>>>>>>> the content is JSON; if it contains a "{" anywhere in it, it's considered
>>>>>>>>> JSON.  My body contained that but wasn't JSON, causing the JSON parser to
>>>>>>>>> throw a CharConversionException from addComplexField(...) (but not the
>>>>>>>>> expected JSONException).  We've changed addComplexField(...) to catch
>>>>>>>>> different types of exceptions and fall back to treating it as a simple
>>>>>>>>> field.  We'll probably submit a patch for this soon.
>>>>>>>>>
>>>>>>>>> I'm reasonably happy with this, but I still think that in the
>>>>>>>>> bigger picture there should be some sort of mechanism to automatically
>>>>>>>>> detect and toss / skip / flag problematic events without them plugging up
>>>>>>>>> the flow.
>>>>>>>>>
>>>>>>>>> -- Jeremy
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Jul 24, 2013 at 7:51 PM, Arvind Prabhakar <
>>>>>>>>> arvind@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Jeremy, would it be possible for you to show us logs for the part
>>>>>>>>>> where the sink fails to remove an event from the channel? I am assuming
>>>>>>>>>> this is a standard sink that Flume provides and not a custom one.
>>>>>>>>>>
>>>>>>>>>> The reason I ask is because sinks do not introspect the event,
>>>>>>>>>> and hence there is no reason why it will fail during the event's removal.
>>>>>>>>>> It is more likely that there is a problem within the channel in that it
>>>>>>>>>> cannot dereference the event correctly. Looking at the logs will help us
>>>>>>>>>> identify the root cause for what you are experiencing.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Arvind Prabhakar
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Jul 24, 2013 at 3:56 PM, Jeremy Karlson <
>>>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>>  Both reasonable suggestions.  What would a custom sink look
>>>>>>>>>>> like in this case, and how would I only eliminate the problem events since
>>>>>>>>>>> I don't know what they are until they are attempted by the "real" sink?
>>>>>>>>>>>
>>>>>>>>>>> My philosophical concern (in general) is that we're taking the
>>>>>>>>>>> approach of exhaustively finding and eliminating possible failure cases.
>>>>>>>>>>>  It's not possible to eliminate every single failure case, so shouldn't
>>>>>>>>>>> there be a method of last resort to eliminate problem events from the
>>>>>>>>>>> channel?
>>>>>>>>>>>
>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jul 24, 2013 at 3:45 PM, Hari Shreedharan <
>>>>>>>>>>> hshreedharan@cloudera.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Or you could write a custom sink that removes this event (more
>>>>>>>>>>>> work of course)
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Hari
>>>>>>>>>>>>
>>>>>>>>>>>>  On Wednesday, July 24, 2013 at 3:36 PM, Roshan Naik wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>   if you have a way to identify such events.. you may be able
>>>>>>>>>>>> to use the Regex interceptor to toss them out before they get into the
>>>>>>>>>>>> channel.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jul 24, 2013 at 2:52 PM, Jeremy Karlson <
>>>>>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>  Hi everyone.  My Flume adventures continue.
>>>>>>>>>>>>
>>>>>>>>>>>> I'm in a situation now where I have a channel that's filling
>>>>>>>>>>>> because a stubborn message is stuck.  The sink won't accept it (for
>>>>>>>>>>>> whatever reason; I can go into detail but that's not my point here).  This
>>>>>>>>>>>> just blocks up the channel entirely, because it goes back into the channel
>>>>>>>>>>>> when the sink refuses.  Obviously, this isn't ideal.
>>>>>>>>>>>>
>>>>>>>>>>>> I'm wondering what mechanisms, if any, Flume has to deal with
>>>>>>>>>>>> these situations.  Things that come to mind might be:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. Ditch the event after n attempts.
>>>>>>>>>>>> 2. After n attempts, send the event to a "problem area" (maybe
>>>>>>>>>>>> a different source / sink / channel?)  that someone can look at later.
>>>>>>>>>>>> 3. Some sort of mechanism that allows operators to manually
>>>>>>>>>>>> kill these messages.
>>>>>>>>>>>>
>>>>>>>>>>>> I'm open to suggestions on alternatives as well.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>
>>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> thanks
>>>>>> ashish
>>>>>>
>>>>>> Blog: http://www.ashishpaliwal.com/blog
>>>>>> My Photo Galleries: http://www.pbase.com/ashishpaliwal
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Problem Events

Posted by Connor Woodson <cw...@gmail.com>.
As another option to solve the problem of having a bad event in a channel:
using a fail-over sink processor, log all bad events to a local file. And
to be extra cautious, add a third failover of a null sink. This will mean
that events will always flow through your channel. The file sink should
almost never fail, so you shouldn't be losing events in the process. And
then you can re-process everything in the file if you still want those
events for something.

For the system of having Flume detect bad events, I think implementing
something like above is better than discarding events that fail X times.
For instance, if you have an Avro sink -> Avro source, and you're
restarting your source, Flume would end up discarding events unnecessarily.
Instead, how about implementing the above system and then go a step
further: Flume will attempt to re-send the bad events itself. And then if a
bad event isn't able to be sent after X attempts, it is can be discarded.

I envision this system as an extension to the current File Channel; when an
event fails, it is written to a secondary File Channel from which events
can be pulled when the main channel isn't in use. It would add headers like
"lastAttempt" and "numberOfAttempts" to events. Then it can be configurable
for a "min time between attempts" and "maximum attempts." When an event
fails the second time, those headers are updated and it goes back into the
fail-channel. If it comes out of the fail-channel but the lastAttempt is
too recent, it goes back in. If it fails more times than the maximum, it is
written to a final location (perhaps its just sent to another sink; maybe
this would have to be in a sink processor). Assuming all of those steps are
error-free, then all messages are preserved, and the badly-formatted
eventually get stored somewhere else. (This system could be hacked together
with current code - fail over sink processor -> avro sink -> avro source on
same instance, but that's a little too hacky).

Just some thoughts.

- Connor




On Thu, Aug 1, 2013 at 3:25 PM, Arvind Prabhakar <ar...@apache.org> wrote:

> This sounds like a critical problem that can cause pipelines to block
> permanently. If you find yourself in this situation, a possible work around
> would be to decommission the channel, remove its data and route the flow
> with a new empty channel. If you have the ability to identify which
> component is causing the problem and see if you can remove it temporarily
> to let the problem events pass through another peer component.
>
> I have also created FLUME-2140 [1] which will eventually allow the
> pipelines to identify and divert such bad events. If you have any logs,
> data, configurations that can be shared and will help provide more details
> for this problem, it will be great if you could attach them to this jira
> and provide your comments.
>
> [1] https://issues.apache.org/jira/browse/FLUME-2140
>
> Regards,
> Arvind Prabhakar
>
> On Thu, Aug 1, 2013 at 10:33 AM, Paul Chavez <
> pchavez@verticalsearchworks.com> wrote:
>
>> **
>> There's no way to deal with a bad event once it's in the channel, but you
>> can mitigate future issues by having a timestamp interceptor bound to the
>> source feeding the channel. There is a parameter 'preserve existing' that
>> will only add the header if it doesn't exist. If you don't want to have
>> 'bad' time data in there you could try a static interceptor with a specific
>> past date so that corrupt events fall into a deterministic path in HDFS.
>>
>> I use this technique to prevent stuck events for both timestamp headers
>> as well as some of our own custom headers we use for tokenized paths. The
>> static interceptor will insert an arbitrary header if it doesn't exist so I
>> have a couple that put in the value 'Unknown' so that I can still send the
>> events through the HDFS sink but I can also find them later if need be.
>>
>> hope that helps,
>> Paul Chavez
>>
>>  ------------------------------
>> *From:* Roshan Naik [mailto:roshan@hortonworks.com]
>> *Sent:* Thursday, August 01, 2013 10:27 AM
>> *To:* user@flume.apache.org
>> *Subject:* Re: Problem Events
>>
>>  some questions:
>> - why is the sink unable to consume the event ?
>> - how would you like to identify such an event ? by examining its content
>> ? or by the fact that its ping-pong-ing between channel and sink ?
>> - what would you prefer to do with such an event ? merely drop it ?
>>
>>
>> On Thu, Aug 1, 2013 at 9:26 AM, Jeremy Karlson <je...@gmail.com>wrote:
>>
>>>  To my knowledge (which is admittedly limited), there is no way to deal
>>> with these in a way that will make your day.  I'm happy if someone can say
>>> otherwise.
>>>
>>> This is very similar to a problem I had a week or two ago.  I fixed it
>>> by restarting Flume with debugging on, connecting to it with the debugger,
>>> and finding the message in the sink.  Discover a bug in the sink.
>>>  Downloaded Flume, fixed bug, recompiled, installed custom version, etc.
>>>
>>> I agree that this is not a practical solution, and I still believe that
>>> Flume needs some sort of "sink of last resort" option or something, like
>>> JMS implementations.
>>>
>>> -- Jeremy
>>>
>>>
>>>
>>> On Thu, Aug 1, 2013 at 2:42 AM, Anat Rozenzon <an...@viber.com> wrote:
>>>
>>>>  The message is already in the channel.
>>>> Is there a way to write an interceptor to work after the channel? or
>>>> before the sink?
>>>>
>>>> The only thing I found is to stop everything and delete the channel
>>>> files, but I won't be able to use this approach in production :-(
>>>>
>>>>
>>>> On Thu, Aug 1, 2013 at 11:13 AM, Ashish <pa...@gmail.com>wrote:
>>>>
>>>>>
>>>>>
>>>>>
>>>>>  On Thu, Aug 1, 2013 at 1:29 PM, Anat Rozenzon <an...@viber.com> wrote:
>>>>>
>>>>>>   Hi,
>>>>>>
>>>>>> I'm having the same problem with HDFS sink.
>>>>>>
>>>>>> A 'poison' message which doesn't have timestamp header in it as the
>>>>>> sink expects.
>>>>>> This causes a NPE which ends in returning the message to the channel
>>>>>> , over and over again.
>>>>>>
>>>>>> Is my only option to re-write the HDFS sink?
>>>>>> Isn't there any way to intercept in the sink work?
>>>>>>
>>>>>
>>>>> You can write a custom interceptor and remove/modify the poison
>>>>> message.
>>>>>
>>>>> Interceptors are called before message makes it way into the channel.
>>>>>
>>>>> http://flume.apache.org/FlumeUserGuide.html#flume-interceptors
>>>>>
>>>>> I wrote a blog about it a while back
>>>>> http://www.ashishpaliwal.com/blog/2013/06/flume-cookbook-implementing-custom-interceptors/
>>>>>
>>>>>
>>>>>
>>>>>>
>>>>>> Thanks
>>>>>> Anat
>>>>>>
>>>>>>
>>>>>> On Fri, Jul 26, 2013 at 3:35 AM, Arvind Prabhakar <ar...@apache.org>wrote:
>>>>>>
>>>>>>> Sounds like a bug in ElasticSearch sink to me. Do you mind filing a
>>>>>>> Jira to track this? Sample data to cause this would be even better.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Arvind Prabhakar
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jul 25, 2013 at 9:50 AM, Jeremy Karlson <
>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>
>>>>>>>> This was using the provided ElasticSearch sink.  The logs were not
>>>>>>>> helpful.  I ran it through with the debugger and found the source of the
>>>>>>>> problem.
>>>>>>>>
>>>>>>>> ContentBuilderUtil uses a very "aggressive" method to determine if
>>>>>>>> the content is JSON; if it contains a "{" anywhere in it, it's considered
>>>>>>>> JSON.  My body contained that but wasn't JSON, causing the JSON parser to
>>>>>>>> throw a CharConversionException from addComplexField(...) (but not the
>>>>>>>> expected JSONException).  We've changed addComplexField(...) to catch
>>>>>>>> different types of exceptions and fall back to treating it as a simple
>>>>>>>> field.  We'll probably submit a patch for this soon.
>>>>>>>>
>>>>>>>> I'm reasonably happy with this, but I still think that in the
>>>>>>>> bigger picture there should be some sort of mechanism to automatically
>>>>>>>> detect and toss / skip / flag problematic events without them plugging up
>>>>>>>> the flow.
>>>>>>>>
>>>>>>>> -- Jeremy
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Jul 24, 2013 at 7:51 PM, Arvind Prabhakar <
>>>>>>>> arvind@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Jeremy, would it be possible for you to show us logs for the part
>>>>>>>>> where the sink fails to remove an event from the channel? I am assuming
>>>>>>>>> this is a standard sink that Flume provides and not a custom one.
>>>>>>>>>
>>>>>>>>> The reason I ask is because sinks do not introspect the event, and
>>>>>>>>> hence there is no reason why it will fail during the event's removal. It is
>>>>>>>>> more likely that there is a problem within the channel in that it cannot
>>>>>>>>> dereference the event correctly. Looking at the logs will help us identify
>>>>>>>>> the root cause for what you are experiencing.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Arvind Prabhakar
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Jul 24, 2013 at 3:56 PM, Jeremy Karlson <
>>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>>  Both reasonable suggestions.  What would a custom sink look
>>>>>>>>>> like in this case, and how would I only eliminate the problem events since
>>>>>>>>>> I don't know what they are until they are attempted by the "real" sink?
>>>>>>>>>>
>>>>>>>>>> My philosophical concern (in general) is that we're taking the
>>>>>>>>>> approach of exhaustively finding and eliminating possible failure cases.
>>>>>>>>>>  It's not possible to eliminate every single failure case, so shouldn't
>>>>>>>>>> there be a method of last resort to eliminate problem events from the
>>>>>>>>>> channel?
>>>>>>>>>>
>>>>>>>>>> -- Jeremy
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Jul 24, 2013 at 3:45 PM, Hari Shreedharan <
>>>>>>>>>> hshreedharan@cloudera.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Or you could write a custom sink that removes this event (more
>>>>>>>>>>> work of course)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Hari
>>>>>>>>>>>
>>>>>>>>>>>  On Wednesday, July 24, 2013 at 3:36 PM, Roshan Naik wrote:
>>>>>>>>>>>
>>>>>>>>>>>   if you have a way to identify such events.. you may be able
>>>>>>>>>>> to use the Regex interceptor to toss them out before they get into the
>>>>>>>>>>> channel.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jul 24, 2013 at 2:52 PM, Jeremy Karlson <
>>>>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>  Hi everyone.  My Flume adventures continue.
>>>>>>>>>>>
>>>>>>>>>>> I'm in a situation now where I have a channel that's filling
>>>>>>>>>>> because a stubborn message is stuck.  The sink won't accept it (for
>>>>>>>>>>> whatever reason; I can go into detail but that's not my point here).  This
>>>>>>>>>>> just blocks up the channel entirely, because it goes back into the channel
>>>>>>>>>>> when the sink refuses.  Obviously, this isn't ideal.
>>>>>>>>>>>
>>>>>>>>>>> I'm wondering what mechanisms, if any, Flume has to deal with
>>>>>>>>>>> these situations.  Things that come to mind might be:
>>>>>>>>>>>
>>>>>>>>>>> 1. Ditch the event after n attempts.
>>>>>>>>>>> 2. After n attempts, send the event to a "problem area" (maybe a
>>>>>>>>>>> different source / sink / channel?)  that someone can look at later.
>>>>>>>>>>> 3. Some sort of mechanism that allows operators to manually kill
>>>>>>>>>>> these messages.
>>>>>>>>>>>
>>>>>>>>>>> I'm open to suggestions on alternatives as well.
>>>>>>>>>>>
>>>>>>>>>>> Thanks.
>>>>>>>>>>>
>>>>>>>>>>> -- Jeremy
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> thanks
>>>>> ashish
>>>>>
>>>>> Blog: http://www.ashishpaliwal.com/blog
>>>>> My Photo Galleries: http://www.pbase.com/ashishpaliwal
>>>>>
>>>>
>>>>
>>>
>>
>

Re: Problem Events

Posted by Arvind Prabhakar <ar...@apache.org>.
This sounds like a critical problem that can cause pipelines to block
permanently. If you find yourself in this situation, a possible work around
would be to decommission the channel, remove its data and route the flow
with a new empty channel. If you have the ability to identify which
component is causing the problem and see if you can remove it temporarily
to let the problem events pass through another peer component.

I have also created FLUME-2140 [1] which will eventually allow the
pipelines to identify and divert such bad events. If you have any logs,
data, configurations that can be shared and will help provide more details
for this problem, it will be great if you could attach them to this jira
and provide your comments.

[1] https://issues.apache.org/jira/browse/FLUME-2140

Regards,
Arvind Prabhakar

On Thu, Aug 1, 2013 at 10:33 AM, Paul Chavez <
pchavez@verticalsearchworks.com> wrote:

> **
> There's no way to deal with a bad event once it's in the channel, but you
> can mitigate future issues by having a timestamp interceptor bound to the
> source feeding the channel. There is a parameter 'preserve existing' that
> will only add the header if it doesn't exist. If you don't want to have
> 'bad' time data in there you could try a static interceptor with a specific
> past date so that corrupt events fall into a deterministic path in HDFS.
>
> I use this technique to prevent stuck events for both timestamp headers as
> well as some of our own custom headers we use for tokenized paths. The
> static interceptor will insert an arbitrary header if it doesn't exist so I
> have a couple that put in the value 'Unknown' so that I can still send the
> events through the HDFS sink but I can also find them later if need be.
>
> hope that helps,
> Paul Chavez
>
>  ------------------------------
> *From:* Roshan Naik [mailto:roshan@hortonworks.com]
> *Sent:* Thursday, August 01, 2013 10:27 AM
> *To:* user@flume.apache.org
> *Subject:* Re: Problem Events
>
>  some questions:
> - why is the sink unable to consume the event ?
> - how would you like to identify such an event ? by examining its content
> ? or by the fact that its ping-pong-ing between channel and sink ?
> - what would you prefer to do with such an event ? merely drop it ?
>
>
> On Thu, Aug 1, 2013 at 9:26 AM, Jeremy Karlson <je...@gmail.com>wrote:
>
>>  To my knowledge (which is admittedly limited), there is no way to deal
>> with these in a way that will make your day.  I'm happy if someone can say
>> otherwise.
>>
>> This is very similar to a problem I had a week or two ago.  I fixed it by
>> restarting Flume with debugging on, connecting to it with the debugger, and
>> finding the message in the sink.  Discover a bug in the sink.  Downloaded
>> Flume, fixed bug, recompiled, installed custom version, etc.
>>
>> I agree that this is not a practical solution, and I still believe that
>> Flume needs some sort of "sink of last resort" option or something, like
>> JMS implementations.
>>
>> -- Jeremy
>>
>>
>>
>> On Thu, Aug 1, 2013 at 2:42 AM, Anat Rozenzon <an...@viber.com> wrote:
>>
>>>  The message is already in the channel.
>>> Is there a way to write an interceptor to work after the channel? or
>>> before the sink?
>>>
>>> The only thing I found is to stop everything and delete the channel
>>> files, but I won't be able to use this approach in production :-(
>>>
>>>
>>> On Thu, Aug 1, 2013 at 11:13 AM, Ashish <pa...@gmail.com> wrote:
>>>
>>>>
>>>>
>>>>
>>>>  On Thu, Aug 1, 2013 at 1:29 PM, Anat Rozenzon <an...@viber.com> wrote:
>>>>
>>>>>   Hi,
>>>>>
>>>>> I'm having the same problem with HDFS sink.
>>>>>
>>>>> A 'poison' message which doesn't have timestamp header in it as the
>>>>> sink expects.
>>>>> This causes a NPE which ends in returning the message to the channel ,
>>>>> over and over again.
>>>>>
>>>>> Is my only option to re-write the HDFS sink?
>>>>> Isn't there any way to intercept in the sink work?
>>>>>
>>>>
>>>> You can write a custom interceptor and remove/modify the poison message.
>>>>
>>>> Interceptors are called before message makes it way into the channel.
>>>>
>>>> http://flume.apache.org/FlumeUserGuide.html#flume-interceptors
>>>>
>>>> I wrote a blog about it a while back
>>>> http://www.ashishpaliwal.com/blog/2013/06/flume-cookbook-implementing-custom-interceptors/
>>>>
>>>>
>>>>
>>>>>
>>>>> Thanks
>>>>> Anat
>>>>>
>>>>>
>>>>> On Fri, Jul 26, 2013 at 3:35 AM, Arvind Prabhakar <ar...@apache.org>wrote:
>>>>>
>>>>>> Sounds like a bug in ElasticSearch sink to me. Do you mind filing a
>>>>>> Jira to track this? Sample data to cause this would be even better.
>>>>>>
>>>>>> Regards,
>>>>>> Arvind Prabhakar
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 25, 2013 at 9:50 AM, Jeremy Karlson <
>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>
>>>>>>> This was using the provided ElasticSearch sink.  The logs were not
>>>>>>> helpful.  I ran it through with the debugger and found the source of the
>>>>>>> problem.
>>>>>>>
>>>>>>> ContentBuilderUtil uses a very "aggressive" method to determine if
>>>>>>> the content is JSON; if it contains a "{" anywhere in it, it's considered
>>>>>>> JSON.  My body contained that but wasn't JSON, causing the JSON parser to
>>>>>>> throw a CharConversionException from addComplexField(...) (but not the
>>>>>>> expected JSONException).  We've changed addComplexField(...) to catch
>>>>>>> different types of exceptions and fall back to treating it as a simple
>>>>>>> field.  We'll probably submit a patch for this soon.
>>>>>>>
>>>>>>> I'm reasonably happy with this, but I still think that in the bigger
>>>>>>> picture there should be some sort of mechanism to automatically detect and
>>>>>>> toss / skip / flag problematic events without them plugging up the flow.
>>>>>>>
>>>>>>> -- Jeremy
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jul 24, 2013 at 7:51 PM, Arvind Prabhakar <arvind@apache.org
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> Jeremy, would it be possible for you to show us logs for the part
>>>>>>>> where the sink fails to remove an event from the channel? I am assuming
>>>>>>>> this is a standard sink that Flume provides and not a custom one.
>>>>>>>>
>>>>>>>> The reason I ask is because sinks do not introspect the event, and
>>>>>>>> hence there is no reason why it will fail during the event's removal. It is
>>>>>>>> more likely that there is a problem within the channel in that it cannot
>>>>>>>> dereference the event correctly. Looking at the logs will help us identify
>>>>>>>> the root cause for what you are experiencing.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Arvind Prabhakar
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Jul 24, 2013 at 3:56 PM, Jeremy Karlson <
>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>
>>>>>>>>>  Both reasonable suggestions.  What would a custom sink look like
>>>>>>>>> in this case, and how would I only eliminate the problem events since I
>>>>>>>>> don't know what they are until they are attempted by the "real" sink?
>>>>>>>>>
>>>>>>>>> My philosophical concern (in general) is that we're taking the
>>>>>>>>> approach of exhaustively finding and eliminating possible failure cases.
>>>>>>>>>  It's not possible to eliminate every single failure case, so shouldn't
>>>>>>>>> there be a method of last resort to eliminate problem events from the
>>>>>>>>> channel?
>>>>>>>>>
>>>>>>>>> -- Jeremy
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Jul 24, 2013 at 3:45 PM, Hari Shreedharan <
>>>>>>>>> hshreedharan@cloudera.com> wrote:
>>>>>>>>>
>>>>>>>>>> Or you could write a custom sink that removes this event (more
>>>>>>>>>> work of course)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Hari
>>>>>>>>>>
>>>>>>>>>>  On Wednesday, July 24, 2013 at 3:36 PM, Roshan Naik wrote:
>>>>>>>>>>
>>>>>>>>>>   if you have a way to identify such events.. you may be able to
>>>>>>>>>> use the Regex interceptor to toss them out before they get into the channel.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Jul 24, 2013 at 2:52 PM, Jeremy Karlson <
>>>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>  Hi everyone.  My Flume adventures continue.
>>>>>>>>>>
>>>>>>>>>> I'm in a situation now where I have a channel that's filling
>>>>>>>>>> because a stubborn message is stuck.  The sink won't accept it (for
>>>>>>>>>> whatever reason; I can go into detail but that's not my point here).  This
>>>>>>>>>> just blocks up the channel entirely, because it goes back into the channel
>>>>>>>>>> when the sink refuses.  Obviously, this isn't ideal.
>>>>>>>>>>
>>>>>>>>>> I'm wondering what mechanisms, if any, Flume has to deal with
>>>>>>>>>> these situations.  Things that come to mind might be:
>>>>>>>>>>
>>>>>>>>>> 1. Ditch the event after n attempts.
>>>>>>>>>> 2. After n attempts, send the event to a "problem area" (maybe a
>>>>>>>>>> different source / sink / channel?)  that someone can look at later.
>>>>>>>>>> 3. Some sort of mechanism that allows operators to manually kill
>>>>>>>>>> these messages.
>>>>>>>>>>
>>>>>>>>>> I'm open to suggestions on alternatives as well.
>>>>>>>>>>
>>>>>>>>>> Thanks.
>>>>>>>>>>
>>>>>>>>>> -- Jeremy
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> thanks
>>>> ashish
>>>>
>>>> Blog: http://www.ashishpaliwal.com/blog
>>>> My Photo Galleries: http://www.pbase.com/ashishpaliwal
>>>>
>>>
>>>
>>
>

RE: Problem Events

Posted by Paul Chavez <pc...@verticalsearchworks.com>.
There's no way to deal with a bad event once it's in the channel, but you can mitigate future issues by having a timestamp interceptor bound to the source feeding the channel. There is a parameter 'preserve existing' that will only add the header if it doesn't exist. If you don't want to have 'bad' time data in there you could try a static interceptor with a specific past date so that corrupt events fall into a deterministic path in HDFS.

I use this technique to prevent stuck events for both timestamp headers as well as some of our own custom headers we use for tokenized paths. The static interceptor will insert an arbitrary header if it doesn't exist so I have a couple that put in the value 'Unknown' so that I can still send the events through the HDFS sink but I can also find them later if need be.

hope that helps,
Paul Chavez

________________________________
From: Roshan Naik [mailto:roshan@hortonworks.com]
Sent: Thursday, August 01, 2013 10:27 AM
To: user@flume.apache.org
Subject: Re: Problem Events

some questions:
- why is the sink unable to consume the event ?
- how would you like to identify such an event ? by examining its content ? or by the fact that its ping-pong-ing between channel and sink ?
- what would you prefer to do with such an event ? merely drop it ?


On Thu, Aug 1, 2013 at 9:26 AM, Jeremy Karlson <je...@gmail.com>> wrote:
To my knowledge (which is admittedly limited), there is no way to deal with these in a way that will make your day.  I'm happy if someone can say otherwise.

This is very similar to a problem I had a week or two ago.  I fixed it by restarting Flume with debugging on, connecting to it with the debugger, and finding the message in the sink.  Discover a bug in the sink.  Downloaded Flume, fixed bug, recompiled, installed custom version, etc.

I agree that this is not a practical solution, and I still believe that Flume needs some sort of "sink of last resort" option or something, like JMS implementations.

-- Jeremy



On Thu, Aug 1, 2013 at 2:42 AM, Anat Rozenzon <an...@viber.com>> wrote:
The message is already in the channel.
Is there a way to write an interceptor to work after the channel? or before the sink?

The only thing I found is to stop everything and delete the channel files, but I won't be able to use this approach in production :-(


On Thu, Aug 1, 2013 at 11:13 AM, Ashish <pa...@gmail.com>> wrote:



On Thu, Aug 1, 2013 at 1:29 PM, Anat Rozenzon <an...@viber.com>> wrote:
Hi,

I'm having the same problem with HDFS sink.

A 'poison' message which doesn't have timestamp header in it as the sink expects.
This causes a NPE which ends in returning the message to the channel , over and over again.

Is my only option to re-write the HDFS sink?
Isn't there any way to intercept in the sink work?

You can write a custom interceptor and remove/modify the poison message.

Interceptors are called before message makes it way into the channel.

http://flume.apache.org/FlumeUserGuide.html#flume-interceptors

I wrote a blog about it a while back http://www.ashishpaliwal.com/blog/2013/06/flume-cookbook-implementing-custom-interceptors/



Thanks
Anat


On Fri, Jul 26, 2013 at 3:35 AM, Arvind Prabhakar <ar...@apache.org>> wrote:
Sounds like a bug in ElasticSearch sink to me. Do you mind filing a Jira to track this? Sample data to cause this would be even better.

Regards,
Arvind Prabhakar


On Thu, Jul 25, 2013 at 9:50 AM, Jeremy Karlson <je...@gmail.com>> wrote:
This was using the provided ElasticSearch sink.  The logs were not helpful.  I ran it through with the debugger and found the source of the problem.

ContentBuilderUtil uses a very "aggressive" method to determine if the content is JSON; if it contains a "{" anywhere in it, it's considered JSON.  My body contained that but wasn't JSON, causing the JSON parser to throw a CharConversionException from addComplexField(...) (but not the expected JSONException).  We've changed addComplexField(...) to catch different types of exceptions and fall back to treating it as a simple field.  We'll probably submit a patch for this soon.

I'm reasonably happy with this, but I still think that in the bigger picture there should be some sort of mechanism to automatically detect and toss / skip / flag problematic events without them plugging up the flow.

-- Jeremy


On Wed, Jul 24, 2013 at 7:51 PM, Arvind Prabhakar <ar...@apache.org>> wrote:
Jeremy, would it be possible for you to show us logs for the part where the sink fails to remove an event from the channel? I am assuming this is a standard sink that Flume provides and not a custom one.

The reason I ask is because sinks do not introspect the event, and hence there is no reason why it will fail during the event's removal. It is more likely that there is a problem within the channel in that it cannot dereference the event correctly. Looking at the logs will help us identify the root cause for what you are experiencing.

Regards,
Arvind Prabhakar


On Wed, Jul 24, 2013 at 3:56 PM, Jeremy Karlson <je...@gmail.com>> wrote:
Both reasonable suggestions.  What would a custom sink look like in this case, and how would I only eliminate the problem events since I don't know what they are until they are attempted by the "real" sink?

My philosophical concern (in general) is that we're taking the approach of exhaustively finding and eliminating possible failure cases.  It's not possible to eliminate every single failure case, so shouldn't there be a method of last resort to eliminate problem events from the channel?

-- Jeremy



On Wed, Jul 24, 2013 at 3:45 PM, Hari Shreedharan <hs...@cloudera.com>> wrote:
Or you could write a custom sink that removes this event (more work of course)


Thanks,
Hari


On Wednesday, July 24, 2013 at 3:36 PM, Roshan Naik wrote:

if you have a way to identify such events.. you may be able to use the Regex interceptor to toss them out before they get into the channel.


On Wed, Jul 24, 2013 at 2:52 PM, Jeremy Karlson <je...@gmail.com>> wrote:
Hi everyone.  My Flume adventures continue.

I'm in a situation now where I have a channel that's filling because a stubborn message is stuck.  The sink won't accept it (for whatever reason; I can go into detail but that's not my point here).  This just blocks up the channel entirely, because it goes back into the channel when the sink refuses.  Obviously, this isn't ideal.

I'm wondering what mechanisms, if any, Flume has to deal with these situations.  Things that come to mind might be:

1. Ditch the event after n attempts.
2. After n attempts, send the event to a "problem area" (maybe a different source / sink / channel?)  that someone can look at later.
3. Some sort of mechanism that allows operators to manually kill these messages.

I'm open to suggestions on alternatives as well.

Thanks.

-- Jeremy










--
thanks
ashish

Blog: http://www.ashishpaliwal.com/blog
My Photo Galleries: http://www.pbase.com/ashishpaliwal




Re: Problem Events

Posted by Roshan Naik <ro...@hortonworks.com>.
some questions:
- why is the sink unable to consume the event ?
- how would you like to identify such an event ? by examining its content ?
or by the fact that its ping-pong-ing between channel and sink ?
- what would you prefer to do with such an event ? merely drop it ?


On Thu, Aug 1, 2013 at 9:26 AM, Jeremy Karlson <je...@gmail.com>wrote:

> To my knowledge (which is admittedly limited), there is no way to deal
> with these in a way that will make your day.  I'm happy if someone can say
> otherwise.
>
> This is very similar to a problem I had a week or two ago.  I fixed it by
> restarting Flume with debugging on, connecting to it with the debugger, and
> finding the message in the sink.  Discover a bug in the sink.  Downloaded
> Flume, fixed bug, recompiled, installed custom version, etc.
>
> I agree that this is not a practical solution, and I still believe that
> Flume needs some sort of "sink of last resort" option or something, like
> JMS implementations.
>
> -- Jeremy
>
>
>
> On Thu, Aug 1, 2013 at 2:42 AM, Anat Rozenzon <an...@viber.com> wrote:
>
>> The message is already in the channel.
>> Is there a way to write an interceptor to work after the channel? or
>> before the sink?
>>
>> The only thing I found is to stop everything and delete the channel
>> files, but I won't be able to use this approach in production :-(
>>
>>
>> On Thu, Aug 1, 2013 at 11:13 AM, Ashish <pa...@gmail.com> wrote:
>>
>>>
>>>
>>>
>>> On Thu, Aug 1, 2013 at 1:29 PM, Anat Rozenzon <an...@viber.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm having the same problem with HDFS sink.
>>>>
>>>> A 'poison' message which doesn't have timestamp header in it as the
>>>> sink expects.
>>>> This causes a NPE which ends in returning the message to the channel ,
>>>> over and over again.
>>>>
>>>> Is my only option to re-write the HDFS sink?
>>>> Isn't there any way to intercept in the sink work?
>>>>
>>>
>>> You can write a custom interceptor and remove/modify the poison message.
>>>
>>> Interceptors are called before message makes it way into the channel.
>>>
>>> http://flume.apache.org/FlumeUserGuide.html#flume-interceptors
>>>
>>> I wrote a blog about it a while back
>>> http://www.ashishpaliwal.com/blog/2013/06/flume-cookbook-implementing-custom-interceptors/
>>>
>>>
>>>
>>>>
>>>> Thanks
>>>> Anat
>>>>
>>>>
>>>> On Fri, Jul 26, 2013 at 3:35 AM, Arvind Prabhakar <ar...@apache.org>wrote:
>>>>
>>>>> Sounds like a bug in ElasticSearch sink to me. Do you mind filing a
>>>>> Jira to track this? Sample data to cause this would be even better.
>>>>>
>>>>> Regards,
>>>>> Arvind Prabhakar
>>>>>
>>>>>
>>>>> On Thu, Jul 25, 2013 at 9:50 AM, Jeremy Karlson <
>>>>> jeremykarlson@gmail.com> wrote:
>>>>>
>>>>>> This was using the provided ElasticSearch sink.  The logs were not
>>>>>> helpful.  I ran it through with the debugger and found the source of the
>>>>>> problem.
>>>>>>
>>>>>> ContentBuilderUtil uses a very "aggressive" method to determine if
>>>>>> the content is JSON; if it contains a "{" anywhere in it, it's considered
>>>>>> JSON.  My body contained that but wasn't JSON, causing the JSON parser to
>>>>>> throw a CharConversionException from addComplexField(...) (but not the
>>>>>> expected JSONException).  We've changed addComplexField(...) to catch
>>>>>> different types of exceptions and fall back to treating it as a simple
>>>>>> field.  We'll probably submit a patch for this soon.
>>>>>>
>>>>>> I'm reasonably happy with this, but I still think that in the bigger
>>>>>> picture there should be some sort of mechanism to automatically detect and
>>>>>> toss / skip / flag problematic events without them plugging up the flow.
>>>>>>
>>>>>> -- Jeremy
>>>>>>
>>>>>>
>>>>>> On Wed, Jul 24, 2013 at 7:51 PM, Arvind Prabhakar <ar...@apache.org>wrote:
>>>>>>
>>>>>>> Jeremy, would it be possible for you to show us logs for the part
>>>>>>> where the sink fails to remove an event from the channel? I am assuming
>>>>>>> this is a standard sink that Flume provides and not a custom one.
>>>>>>>
>>>>>>> The reason I ask is because sinks do not introspect the event, and
>>>>>>> hence there is no reason why it will fail during the event's removal. It is
>>>>>>> more likely that there is a problem within the channel in that it cannot
>>>>>>> dereference the event correctly. Looking at the logs will help us identify
>>>>>>> the root cause for what you are experiencing.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Arvind Prabhakar
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jul 24, 2013 at 3:56 PM, Jeremy Karlson <
>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>
>>>>>>>> Both reasonable suggestions.  What would a custom sink look like in
>>>>>>>> this case, and how would I only eliminate the problem events since I don't
>>>>>>>> know what they are until they are attempted by the "real" sink?
>>>>>>>>
>>>>>>>> My philosophical concern (in general) is that we're taking the
>>>>>>>> approach of exhaustively finding and eliminating possible failure cases.
>>>>>>>>  It's not possible to eliminate every single failure case, so shouldn't
>>>>>>>> there be a method of last resort to eliminate problem events from the
>>>>>>>> channel?
>>>>>>>>
>>>>>>>> -- Jeremy
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Jul 24, 2013 at 3:45 PM, Hari Shreedharan <
>>>>>>>> hshreedharan@cloudera.com> wrote:
>>>>>>>>
>>>>>>>>> Or you could write a custom sink that removes this event (more
>>>>>>>>> work of course)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Hari
>>>>>>>>>
>>>>>>>>> On Wednesday, July 24, 2013 at 3:36 PM, Roshan Naik wrote:
>>>>>>>>>
>>>>>>>>> if you have a way to identify such events.. you may be able to use
>>>>>>>>> the Regex interceptor to toss them out before they get into the channel.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>  On Wed, Jul 24, 2013 at 2:52 PM, Jeremy Karlson <
>>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi everyone.  My Flume adventures continue.
>>>>>>>>>
>>>>>>>>> I'm in a situation now where I have a channel that's filling
>>>>>>>>> because a stubborn message is stuck.  The sink won't accept it (for
>>>>>>>>> whatever reason; I can go into detail but that's not my point here).  This
>>>>>>>>> just blocks up the channel entirely, because it goes back into the channel
>>>>>>>>> when the sink refuses.  Obviously, this isn't ideal.
>>>>>>>>>
>>>>>>>>> I'm wondering what mechanisms, if any, Flume has to deal with
>>>>>>>>> these situations.  Things that come to mind might be:
>>>>>>>>>
>>>>>>>>> 1. Ditch the event after n attempts.
>>>>>>>>> 2. After n attempts, send the event to a "problem area" (maybe a
>>>>>>>>> different source / sink / channel?)  that someone can look at later.
>>>>>>>>> 3. Some sort of mechanism that allows operators to manually kill
>>>>>>>>> these messages.
>>>>>>>>>
>>>>>>>>> I'm open to suggestions on alternatives as well.
>>>>>>>>>
>>>>>>>>> Thanks.
>>>>>>>>>
>>>>>>>>> -- Jeremy
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> thanks
>>> ashish
>>>
>>> Blog: http://www.ashishpaliwal.com/blog
>>> My Photo Galleries: http://www.pbase.com/ashishpaliwal
>>>
>>
>>
>

Re: Problem Events

Posted by Jeremy Karlson <je...@gmail.com>.
To my knowledge (which is admittedly limited), there is no way to deal with
these in a way that will make your day.  I'm happy if someone can say
otherwise.

This is very similar to a problem I had a week or two ago.  I fixed it by
restarting Flume with debugging on, connecting to it with the debugger, and
finding the message in the sink.  Discover a bug in the sink.  Downloaded
Flume, fixed bug, recompiled, installed custom version, etc.

I agree that this is not a practical solution, and I still believe that
Flume needs some sort of "sink of last resort" option or something, like
JMS implementations.

-- Jeremy



On Thu, Aug 1, 2013 at 2:42 AM, Anat Rozenzon <an...@viber.com> wrote:

> The message is already in the channel.
> Is there a way to write an interceptor to work after the channel? or
> before the sink?
>
> The only thing I found is to stop everything and delete the channel files,
> but I won't be able to use this approach in production :-(
>
>
> On Thu, Aug 1, 2013 at 11:13 AM, Ashish <pa...@gmail.com> wrote:
>
>>
>>
>>
>> On Thu, Aug 1, 2013 at 1:29 PM, Anat Rozenzon <an...@viber.com> wrote:
>>
>>> Hi,
>>>
>>> I'm having the same problem with HDFS sink.
>>>
>>> A 'poison' message which doesn't have timestamp header in it as the sink
>>> expects.
>>> This causes a NPE which ends in returning the message to the channel ,
>>> over and over again.
>>>
>>> Is my only option to re-write the HDFS sink?
>>> Isn't there any way to intercept in the sink work?
>>>
>>
>> You can write a custom interceptor and remove/modify the poison message.
>>
>> Interceptors are called before message makes it way into the channel.
>>
>> http://flume.apache.org/FlumeUserGuide.html#flume-interceptors
>>
>> I wrote a blog about it a while back
>> http://www.ashishpaliwal.com/blog/2013/06/flume-cookbook-implementing-custom-interceptors/
>>
>>
>>
>>>
>>> Thanks
>>> Anat
>>>
>>>
>>> On Fri, Jul 26, 2013 at 3:35 AM, Arvind Prabhakar <ar...@apache.org>wrote:
>>>
>>>> Sounds like a bug in ElasticSearch sink to me. Do you mind filing a
>>>> Jira to track this? Sample data to cause this would be even better.
>>>>
>>>> Regards,
>>>> Arvind Prabhakar
>>>>
>>>>
>>>> On Thu, Jul 25, 2013 at 9:50 AM, Jeremy Karlson <
>>>> jeremykarlson@gmail.com> wrote:
>>>>
>>>>> This was using the provided ElasticSearch sink.  The logs were not
>>>>> helpful.  I ran it through with the debugger and found the source of the
>>>>> problem.
>>>>>
>>>>> ContentBuilderUtil uses a very "aggressive" method to determine if the
>>>>> content is JSON; if it contains a "{" anywhere in it, it's considered JSON.
>>>>>  My body contained that but wasn't JSON, causing the JSON parser to throw a
>>>>> CharConversionException from addComplexField(...) (but not the expected
>>>>> JSONException).  We've changed addComplexField(...) to catch different
>>>>> types of exceptions and fall back to treating it as a simple field.  We'll
>>>>> probably submit a patch for this soon.
>>>>>
>>>>> I'm reasonably happy with this, but I still think that in the bigger
>>>>> picture there should be some sort of mechanism to automatically detect and
>>>>> toss / skip / flag problematic events without them plugging up the flow.
>>>>>
>>>>> -- Jeremy
>>>>>
>>>>>
>>>>> On Wed, Jul 24, 2013 at 7:51 PM, Arvind Prabhakar <ar...@apache.org>wrote:
>>>>>
>>>>>> Jeremy, would it be possible for you to show us logs for the part
>>>>>> where the sink fails to remove an event from the channel? I am assuming
>>>>>> this is a standard sink that Flume provides and not a custom one.
>>>>>>
>>>>>> The reason I ask is because sinks do not introspect the event, and
>>>>>> hence there is no reason why it will fail during the event's removal. It is
>>>>>> more likely that there is a problem within the channel in that it cannot
>>>>>> dereference the event correctly. Looking at the logs will help us identify
>>>>>> the root cause for what you are experiencing.
>>>>>>
>>>>>> Regards,
>>>>>> Arvind Prabhakar
>>>>>>
>>>>>>
>>>>>> On Wed, Jul 24, 2013 at 3:56 PM, Jeremy Karlson <
>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>
>>>>>>> Both reasonable suggestions.  What would a custom sink look like in
>>>>>>> this case, and how would I only eliminate the problem events since I don't
>>>>>>> know what they are until they are attempted by the "real" sink?
>>>>>>>
>>>>>>> My philosophical concern (in general) is that we're taking the
>>>>>>> approach of exhaustively finding and eliminating possible failure cases.
>>>>>>>  It's not possible to eliminate every single failure case, so shouldn't
>>>>>>> there be a method of last resort to eliminate problem events from the
>>>>>>> channel?
>>>>>>>
>>>>>>> -- Jeremy
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jul 24, 2013 at 3:45 PM, Hari Shreedharan <
>>>>>>> hshreedharan@cloudera.com> wrote:
>>>>>>>
>>>>>>>> Or you could write a custom sink that removes this event (more work
>>>>>>>> of course)
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Hari
>>>>>>>>
>>>>>>>> On Wednesday, July 24, 2013 at 3:36 PM, Roshan Naik wrote:
>>>>>>>>
>>>>>>>> if you have a way to identify such events.. you may be able to use
>>>>>>>> the Regex interceptor to toss them out before they get into the channel.
>>>>>>>>
>>>>>>>>
>>>>>>>>  On Wed, Jul 24, 2013 at 2:52 PM, Jeremy Karlson <
>>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi everyone.  My Flume adventures continue.
>>>>>>>>
>>>>>>>> I'm in a situation now where I have a channel that's filling
>>>>>>>> because a stubborn message is stuck.  The sink won't accept it (for
>>>>>>>> whatever reason; I can go into detail but that's not my point here).  This
>>>>>>>> just blocks up the channel entirely, because it goes back into the channel
>>>>>>>> when the sink refuses.  Obviously, this isn't ideal.
>>>>>>>>
>>>>>>>> I'm wondering what mechanisms, if any, Flume has to deal with these
>>>>>>>> situations.  Things that come to mind might be:
>>>>>>>>
>>>>>>>> 1. Ditch the event after n attempts.
>>>>>>>> 2. After n attempts, send the event to a "problem area" (maybe a
>>>>>>>> different source / sink / channel?)  that someone can look at later.
>>>>>>>> 3. Some sort of mechanism that allows operators to manually kill
>>>>>>>> these messages.
>>>>>>>>
>>>>>>>> I'm open to suggestions on alternatives as well.
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>> -- Jeremy
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>> --
>> thanks
>> ashish
>>
>> Blog: http://www.ashishpaliwal.com/blog
>> My Photo Galleries: http://www.pbase.com/ashishpaliwal
>>
>
>

Re: Problem Events

Posted by Anat Rozenzon <an...@viber.com>.
The message is already in the channel.
Is there a way to write an interceptor to work after the channel? or before
the sink?

The only thing I found is to stop everything and delete the channel files,
but I won't be able to use this approach in production :-(


On Thu, Aug 1, 2013 at 11:13 AM, Ashish <pa...@gmail.com> wrote:

>
>
>
> On Thu, Aug 1, 2013 at 1:29 PM, Anat Rozenzon <an...@viber.com> wrote:
>
>> Hi,
>>
>> I'm having the same problem with HDFS sink.
>>
>> A 'poison' message which doesn't have timestamp header in it as the sink
>> expects.
>> This causes a NPE which ends in returning the message to the channel ,
>> over and over again.
>>
>> Is my only option to re-write the HDFS sink?
>> Isn't there any way to intercept in the sink work?
>>
>
> You can write a custom interceptor and remove/modify the poison message.
>
> Interceptors are called before message makes it way into the channel.
>
> http://flume.apache.org/FlumeUserGuide.html#flume-interceptors
>
> I wrote a blog about it a while back
> http://www.ashishpaliwal.com/blog/2013/06/flume-cookbook-implementing-custom-interceptors/
>
>
>
>>
>> Thanks
>> Anat
>>
>>
>> On Fri, Jul 26, 2013 at 3:35 AM, Arvind Prabhakar <ar...@apache.org>wrote:
>>
>>> Sounds like a bug in ElasticSearch sink to me. Do you mind filing a Jira
>>> to track this? Sample data to cause this would be even better.
>>>
>>> Regards,
>>> Arvind Prabhakar
>>>
>>>
>>> On Thu, Jul 25, 2013 at 9:50 AM, Jeremy Karlson <jeremykarlson@gmail.com
>>> > wrote:
>>>
>>>> This was using the provided ElasticSearch sink.  The logs were not
>>>> helpful.  I ran it through with the debugger and found the source of the
>>>> problem.
>>>>
>>>> ContentBuilderUtil uses a very "aggressive" method to determine if the
>>>> content is JSON; if it contains a "{" anywhere in it, it's considered JSON.
>>>>  My body contained that but wasn't JSON, causing the JSON parser to throw a
>>>> CharConversionException from addComplexField(...) (but not the expected
>>>> JSONException).  We've changed addComplexField(...) to catch different
>>>> types of exceptions and fall back to treating it as a simple field.  We'll
>>>> probably submit a patch for this soon.
>>>>
>>>> I'm reasonably happy with this, but I still think that in the bigger
>>>> picture there should be some sort of mechanism to automatically detect and
>>>> toss / skip / flag problematic events without them plugging up the flow.
>>>>
>>>> -- Jeremy
>>>>
>>>>
>>>> On Wed, Jul 24, 2013 at 7:51 PM, Arvind Prabhakar <ar...@apache.org>wrote:
>>>>
>>>>> Jeremy, would it be possible for you to show us logs for the part
>>>>> where the sink fails to remove an event from the channel? I am assuming
>>>>> this is a standard sink that Flume provides and not a custom one.
>>>>>
>>>>> The reason I ask is because sinks do not introspect the event, and
>>>>> hence there is no reason why it will fail during the event's removal. It is
>>>>> more likely that there is a problem within the channel in that it cannot
>>>>> dereference the event correctly. Looking at the logs will help us identify
>>>>> the root cause for what you are experiencing.
>>>>>
>>>>> Regards,
>>>>> Arvind Prabhakar
>>>>>
>>>>>
>>>>> On Wed, Jul 24, 2013 at 3:56 PM, Jeremy Karlson <
>>>>> jeremykarlson@gmail.com> wrote:
>>>>>
>>>>>> Both reasonable suggestions.  What would a custom sink look like in
>>>>>> this case, and how would I only eliminate the problem events since I don't
>>>>>> know what they are until they are attempted by the "real" sink?
>>>>>>
>>>>>> My philosophical concern (in general) is that we're taking the
>>>>>> approach of exhaustively finding and eliminating possible failure cases.
>>>>>>  It's not possible to eliminate every single failure case, so shouldn't
>>>>>> there be a method of last resort to eliminate problem events from the
>>>>>> channel?
>>>>>>
>>>>>> -- Jeremy
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Jul 24, 2013 at 3:45 PM, Hari Shreedharan <
>>>>>> hshreedharan@cloudera.com> wrote:
>>>>>>
>>>>>>> Or you could write a custom sink that removes this event (more work
>>>>>>> of course)
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Hari
>>>>>>>
>>>>>>> On Wednesday, July 24, 2013 at 3:36 PM, Roshan Naik wrote:
>>>>>>>
>>>>>>> if you have a way to identify such events.. you may be able to use
>>>>>>> the Regex interceptor to toss them out before they get into the channel.
>>>>>>>
>>>>>>>
>>>>>>>  On Wed, Jul 24, 2013 at 2:52 PM, Jeremy Karlson <
>>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi everyone.  My Flume adventures continue.
>>>>>>>
>>>>>>> I'm in a situation now where I have a channel that's filling because
>>>>>>> a stubborn message is stuck.  The sink won't accept it (for whatever
>>>>>>> reason; I can go into detail but that's not my point here).  This just
>>>>>>> blocks up the channel entirely, because it goes back into the channel when
>>>>>>> the sink refuses.  Obviously, this isn't ideal.
>>>>>>>
>>>>>>> I'm wondering what mechanisms, if any, Flume has to deal with these
>>>>>>> situations.  Things that come to mind might be:
>>>>>>>
>>>>>>> 1. Ditch the event after n attempts.
>>>>>>> 2. After n attempts, send the event to a "problem area" (maybe a
>>>>>>> different source / sink / channel?)  that someone can look at later.
>>>>>>> 3. Some sort of mechanism that allows operators to manually kill
>>>>>>> these messages.
>>>>>>>
>>>>>>> I'm open to suggestions on alternatives as well.
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> -- Jeremy
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
>
> --
> thanks
> ashish
>
> Blog: http://www.ashishpaliwal.com/blog
> My Photo Galleries: http://www.pbase.com/ashishpaliwal
>

Re: Problem Events

Posted by Ashish <pa...@gmail.com>.
On Thu, Aug 1, 2013 at 1:29 PM, Anat Rozenzon <an...@viber.com> wrote:

> Hi,
>
> I'm having the same problem with HDFS sink.
>
> A 'poison' message which doesn't have timestamp header in it as the sink
> expects.
> This causes a NPE which ends in returning the message to the channel ,
> over and over again.
>
> Is my only option to re-write the HDFS sink?
> Isn't there any way to intercept in the sink work?
>

You can write a custom interceptor and remove/modify the poison message.

Interceptors are called before message makes it way into the channel.

http://flume.apache.org/FlumeUserGuide.html#flume-interceptors

I wrote a blog about it a while back
http://www.ashishpaliwal.com/blog/2013/06/flume-cookbook-implementing-custom-interceptors/



>
> Thanks
> Anat
>
>
> On Fri, Jul 26, 2013 at 3:35 AM, Arvind Prabhakar <ar...@apache.org>wrote:
>
>> Sounds like a bug in ElasticSearch sink to me. Do you mind filing a Jira
>> to track this? Sample data to cause this would be even better.
>>
>> Regards,
>> Arvind Prabhakar
>>
>>
>> On Thu, Jul 25, 2013 at 9:50 AM, Jeremy Karlson <je...@gmail.com>wrote:
>>
>>> This was using the provided ElasticSearch sink.  The logs were not
>>> helpful.  I ran it through with the debugger and found the source of the
>>> problem.
>>>
>>> ContentBuilderUtil uses a very "aggressive" method to determine if the
>>> content is JSON; if it contains a "{" anywhere in it, it's considered JSON.
>>>  My body contained that but wasn't JSON, causing the JSON parser to throw a
>>> CharConversionException from addComplexField(...) (but not the expected
>>> JSONException).  We've changed addComplexField(...) to catch different
>>> types of exceptions and fall back to treating it as a simple field.  We'll
>>> probably submit a patch for this soon.
>>>
>>> I'm reasonably happy with this, but I still think that in the bigger
>>> picture there should be some sort of mechanism to automatically detect and
>>> toss / skip / flag problematic events without them plugging up the flow.
>>>
>>> -- Jeremy
>>>
>>>
>>> On Wed, Jul 24, 2013 at 7:51 PM, Arvind Prabhakar <ar...@apache.org>wrote:
>>>
>>>> Jeremy, would it be possible for you to show us logs for the part where
>>>> the sink fails to remove an event from the channel? I am assuming this is a
>>>> standard sink that Flume provides and not a custom one.
>>>>
>>>> The reason I ask is because sinks do not introspect the event, and
>>>> hence there is no reason why it will fail during the event's removal. It is
>>>> more likely that there is a problem within the channel in that it cannot
>>>> dereference the event correctly. Looking at the logs will help us identify
>>>> the root cause for what you are experiencing.
>>>>
>>>> Regards,
>>>> Arvind Prabhakar
>>>>
>>>>
>>>> On Wed, Jul 24, 2013 at 3:56 PM, Jeremy Karlson <
>>>> jeremykarlson@gmail.com> wrote:
>>>>
>>>>> Both reasonable suggestions.  What would a custom sink look like in
>>>>> this case, and how would I only eliminate the problem events since I don't
>>>>> know what they are until they are attempted by the "real" sink?
>>>>>
>>>>> My philosophical concern (in general) is that we're taking the
>>>>> approach of exhaustively finding and eliminating possible failure cases.
>>>>>  It's not possible to eliminate every single failure case, so shouldn't
>>>>> there be a method of last resort to eliminate problem events from the
>>>>> channel?
>>>>>
>>>>> -- Jeremy
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Jul 24, 2013 at 3:45 PM, Hari Shreedharan <
>>>>> hshreedharan@cloudera.com> wrote:
>>>>>
>>>>>> Or you could write a custom sink that removes this event (more work
>>>>>> of course)
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Hari
>>>>>>
>>>>>> On Wednesday, July 24, 2013 at 3:36 PM, Roshan Naik wrote:
>>>>>>
>>>>>> if you have a way to identify such events.. you may be able to use
>>>>>> the Regex interceptor to toss them out before they get into the channel.
>>>>>>
>>>>>>
>>>>>>  On Wed, Jul 24, 2013 at 2:52 PM, Jeremy Karlson <
>>>>>> jeremykarlson@gmail.com> wrote:
>>>>>>
>>>>>> Hi everyone.  My Flume adventures continue.
>>>>>>
>>>>>> I'm in a situation now where I have a channel that's filling because
>>>>>> a stubborn message is stuck.  The sink won't accept it (for whatever
>>>>>> reason; I can go into detail but that's not my point here).  This just
>>>>>> blocks up the channel entirely, because it goes back into the channel when
>>>>>> the sink refuses.  Obviously, this isn't ideal.
>>>>>>
>>>>>> I'm wondering what mechanisms, if any, Flume has to deal with these
>>>>>> situations.  Things that come to mind might be:
>>>>>>
>>>>>> 1. Ditch the event after n attempts.
>>>>>> 2. After n attempts, send the event to a "problem area" (maybe a
>>>>>> different source / sink / channel?)  that someone can look at later.
>>>>>> 3. Some sort of mechanism that allows operators to manually kill
>>>>>> these messages.
>>>>>>
>>>>>> I'm open to suggestions on alternatives as well.
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> -- Jeremy
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


-- 
thanks
ashish

Blog: http://www.ashishpaliwal.com/blog
My Photo Galleries: http://www.pbase.com/ashishpaliwal