Posted to user@flume.apache.org by James Estes <ja...@gmail.com> on 2013/12/02 18:21:19 UTC

Optional Channels

Hoping someone can point me in the right direction. We're indexing our logs into Elasticsearch purely for real-time convenience, and we want that step to be optional. Essentially, if we fall behind writing to ES, we would prefer to just skip ES, since a more durable channel already carries the same data for higher-latency querying. Optional channels seemed to fit, but we haven't had much success.

First, we configured a Memory Channel and made it optional. If the ES sink fell behind, the channel would fill and reject new events. However, the channel throws an exception and the Channel Processor rolls back the transaction, which, as far as we can tell, causes the events to be put back on the queue and attempted again. The doc for getOptionalChannels says "A failure in writing the event to these channels must be ignored." Should the transaction simply always commit when optional channels fail (basically a best-effort commit-what-you-could, since it was optional anyway)?

Second, we tried the PseudoTxMemoryChannel, but it also continued to bottleneck on ES. It turns out that it uses queue.put instead of queue.offer, so it blocks until there is room in the queue for the event. MemoryChannel uses offer. Should PseudoTxMemoryChannel always use offer, or at least have an optional 'failFast' setting to enable that behavior?
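To make the put/offer difference concrete, here is a minimal sketch using only the JDK's BlockingQueue. It is not Flume code: the class and method names (OfferVsPut, offerWhenFull, putBlocksWhenFull) are made up for illustration, and the queue stands in for whatever the channel uses internally.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustration only: a plain JDK queue, not a Flume channel.
class OfferVsPut {

    /** Fills a queue to capacity, then reports whether one more offer() is accepted. */
    static boolean offerWhenFull(int capacity) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.offer("event-" + i);
        }
        // offer() never waits: it returns false right away when there is no room.
        return queue.offer("overflow");
    }

    /** Returns true if a put() on a full queue is still waiting after waitMillis. */
    static boolean putBlocksWhenFull(int capacity, long waitMillis) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.offer("event-" + i);
        }
        Thread producer = new Thread(() -> {
            try {
                queue.put("overflow"); // waits until a consumer frees a slot
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.setDaemon(true);
        producer.start();
        try {
            producer.join(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean stillBlocked = producer.isAlive();
        producer.interrupt(); // release the producer so the demo can exit
        return stillBlocked;
    }

    public static void main(String[] args) {
        System.out.println("offer on full queue accepted: " + offerWhenFull(2));
        System.out.println("put on full queue still blocked: " + putBlocksWhenFull(2, 200));
    }
}
```

With nothing draining the queue, the producer calling put() stays parked, which matches the bottleneck described above: a slow sink stalls whoever is writing to the channel.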

Is there another way to get truly optional channels? I do find it encouraging that it takes this much effort to make Flume drop events :)

Thanks,

James


Re: Optional Channels

Posted by Allan Feid <al...@gmail.com>.
I just want to add that we experienced the same problem James is
describing. However, we decided to postpone fixing it until we had the
resources to look deeper. It seems that the optional setting doesn't
actually work with the Memory Channel and the Elasticsearch sink together.
This was on Flume 1.3.



Re: Optional Channels

Posted by Jeff Lord <jl...@cloudera.com>.
Sounds reasonable to allow this via a config property.
Can you please submit the Jira?


On Tue, Dec 3, 2013 at 7:24 AM, James Estes <ja...@gmail.com> wrote:

> We're on Flume 1.4.0. Hm. Looking at the code, you are right… I'd not
> looked closely enough at the transaction behavior of the MemoryChannel.
> When we started backing up, I just saw lots of the "Put queue for
> MemoryTransaction of capacity … full" ChannelExceptions and thought it must
> be retrying them. I can look into it a bit more; it may just be a
> performance issue? Maybe the bytesRemaining semaphore is something
> I'd need to adjust? In any case, we were definitely not keeping up (we
> were falling further and further behind). I wound up essentially copying
> PseudoTxMemoryChannel and switching it to use offer instead of put, and we
> were able to catch up quickly (dropping events, of course). Would it be
> reasonable to change PseudoTxMemoryChannel to use offer instead of put
> (even if only via a config option)?
>
> James

Re: Optional Channels

Posted by James Estes <ja...@gmail.com>.
We're on Flume 1.4.0. Hm. Looking at the code, you are right… I'd not looked closely enough at the transaction behavior of the MemoryChannel. When we started backing up, I just saw lots of the "Put queue for MemoryTransaction of capacity … full" ChannelExceptions and thought it must be retrying them. I can look into it a bit more; it may just be a performance issue? Maybe the bytesRemaining semaphore is something I'd need to adjust?

In any case, we were definitely not keeping up (we were falling further and further behind). I wound up essentially copying PseudoTxMemoryChannel and switching it to use offer instead of put, and we were able to catch up quickly (dropping events, of course). Would it be reasonable to change PseudoTxMemoryChannel to use offer instead of put (even if only via a config option)?
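For anyone wondering what an opt-in fail-fast mode could look like, here is a rough sketch. It is not the modified channel and not a Flume class: FailFastBuffer, its failFast flag, and the dropped-event counter are all hypothetical, and a plain JDK queue stands in for the channel's internal queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical buffer, not a Flume class: a bounded queue with an opt-in fail-fast mode.
class FailFastBuffer<E> {
    private final BlockingQueue<E> queue;
    private final boolean failFast;
    private final AtomicLong dropped = new AtomicLong();

    FailFastBuffer(int capacity, boolean failFast) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.failFast = failFast;
    }

    /**
     * Stores the event and returns true. In fail-fast mode a full queue makes this
     * return false immediately and count a drop; otherwise it waits for room.
     */
    boolean add(E event) {
        if (failFast) {
            boolean accepted = queue.offer(event); // non-blocking
            if (!accepted) {
                dropped.incrementAndGet();
            }
            return accepted;
        }
        try {
            queue.put(event); // blocking, the behavior described for put
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    E poll() { return queue.poll(); }

    long droppedCount() { return dropped.get(); }

    int size() { return queue.size(); }

    public static void main(String[] args) {
        FailFastBuffer<String> buffer = new FailFastBuffer<>(3, true);
        for (int i = 0; i < 5; i++) {
            buffer.add("event-" + i);
        }
        System.out.println("stored=" + buffer.size() + " dropped=" + buffer.droppedCount());
    }
}
```

Keeping the blocking path as the default means existing behavior would not change unless the flag is set, and the drop counter gives some visibility into how much is being lost.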

James

On Dec 2, 2013, at 2:48 PM, Hari Shreedharan <hs...@cloudera.com> wrote:

> What version of Flume are you using? If the channel does not accept the events, the transaction does get rolled back (so that the channel drops the references to the events), but the source would not retry the events again - since we do not throw a ChannelException to the source. You will see the rolled back log message, but the events are dropped and not tried again - the next set would get tried.
> 
> 
> Thanks,
> Hari


Re: Optional Channels

Posted by Hari Shreedharan <hs...@cloudera.com>.
What version of Flume are you using? If the channel does not accept the events, the transaction does get rolled back (so that the channel drops its references to the events), but the source does not retry those events, since we do not throw a ChannelException to the source. You will see the rolled-back log message, but the events are dropped and not tried again; only the next set gets tried.
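As a rough model of that behavior (a simplified sketch, not Flume's actual ChannelProcessor; every name below is made up for illustration): a failure on the required channel propagates to the caller, while a failure on the optional channel is caught, the batch is discarded for that channel, and the caller never sees an exception.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model, not Flume code: required vs. optional channel handling.
class OptionalChannelSketch {

    /** Hypothetical bounded channel with all-or-nothing batch puts. */
    static final class BoundedChannel {
        private final List<String> events = new ArrayList<>();
        private final int capacity;

        BoundedChannel(int capacity) {
            this.capacity = capacity;
        }

        /** Accepts the whole batch or none of it, mimicking a rolled-back transaction. */
        void putBatch(List<String> batch) {
            if (events.size() + batch.size() > capacity) {
                throw new IllegalStateException("channel full, batch rolled back");
            }
            events.addAll(batch);
        }

        int size() {
            return events.size();
        }
    }

    /**
     * Writes the batch to both channels. A failure on the required channel propagates
     * to the caller (the "source"); a failure on the optional channel is swallowed and
     * the batch is dropped for that channel only. Returns whether the optional channel
     * accepted the batch.
     */
    static boolean process(List<String> batch, BoundedChannel required, BoundedChannel optional) {
        required.putBatch(batch);
        try {
            optional.putBatch(batch);
            return true;
        } catch (IllegalStateException e) {
            System.out.println("optional channel rejected batch: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        BoundedChannel durable = new BoundedChannel(100);
        BoundedChannel es = new BoundedChannel(2);
        System.out.println(process(Arrays.asList("a", "b", "c"), durable, es));
        System.out.println(process(Arrays.asList("d", "e"), durable, es));
        System.out.println("durable=" + durable.size() + " es=" + es.size());
    }
}
```

In this model the first batch is lost to the optional channel but the second one still goes through, which is the "next set would get tried" part.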


Thanks,
Hari

