Posted to user@flume.apache.org by Vikram Kulkarni <vi...@expedia.com> on 2013/03/11 04:52:46 UTC

Custom Sink

I am trying to write a custom sink for flume-ng. I looked at the existing sinks and the documentation and coded it up. However, the process() method that's supposed to receive the events always ends up with a null event: I am calling Event event = channel.take(); but the event is null. I see in the logs that process() is called repeatedly while the event is still in the channel, so I think the event is reaching the sink but the sink is unable to take it out of the channel.

Can someone point me in the right direction?

Thanks,

Vikram


Re: Custom Sink

Posted by Erik Bertrand <Er...@bertpc.com>.
I wrote a custom TCP Sink recently and would be happy to share the code;
it's based on the Logger sink (Brock Noland gave me a helpful link to the
source:
https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/LoggerSink.java).
My code is quite simple, pretty bare-bones at this point. I'm planning to
make it more full-featured and robust, and will eventually put it out on
GitHub. Msg me in the interim.

Erik
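
For reference, LoggerSink.process() handles one event per transaction: it begins a transaction, calls take(), handles the event (or returns Status.BACKOFF when take() returns null), commits, and on failure rolls back, throws EventDeliveryException and closes the transaction in a finally block. The sketch below mirrors that shape. So that it runs without the Flume jar, ToyChannel and ToyTransaction are hypothetical, simplified single-threaded stand-ins for Flume's Channel and Transaction (events are plain Strings), Status stands in for Sink.Status, and OneEventSinkSketch is an illustrative name. The toy channel refuses take() outside an open transaction; Flume's channels likewise expect take() to be called inside one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Stand-in for Flume's Sink.Status.
enum Status { READY, BACKOFF }

// Hypothetical, single-threaded stand-in for a Flume Channel and its Transaction.
class ToyChannel {
  private final Deque<String> queue = new ArrayDeque<>();
  private ToyTransaction current; // the transaction currently open, if any

  void put(String event) { queue.addLast(event); }
  int size() { return queue.size(); }

  ToyTransaction getTransaction() {
    current = new ToyTransaction();
    return current;
  }

  // Like Channel.take(): returns null when the channel is empty.
  String take() {
    if (current == null || !current.open) {
      throw new IllegalStateException("take() called outside a transaction");
    }
    String event = queue.pollFirst();
    if (event != null) {
      current.taken.push(event); // remember it in case of rollback
    }
    return event;
  }

  class ToyTransaction {
    final Deque<String> taken = new ArrayDeque<>();
    boolean open;

    void begin() { open = true; }
    void commit() { taken.clear(); }   // taken events are now consumed for good
    void rollback() {                  // put taken events back, in original order
      while (!taken.isEmpty()) queue.addFirst(taken.pop());
    }
    void close() { open = false; current = null; }
  }
}

// One event per transaction, in the shape of LoggerSink.process().
class OneEventSinkSketch {
  private final ToyChannel channel;
  final List<String> handled = new ArrayList<>();

  OneEventSinkSketch(ToyChannel channel) { this.channel = channel; }

  Status process() {
    Status result = Status.READY;
    ToyChannel.ToyTransaction transaction = channel.getTransaction();
    try {
      transaction.begin();
      String event = channel.take();
      if (event != null) {
        handled.add(event);        // the sink's real work (log, write, send) goes here
      } else {
        result = Status.BACKOFF;   // channel empty: ask the runner to back off
      }
      transaction.commit();
    } catch (RuntimeException e) {
      transaction.rollback();      // the event stays in the channel
      throw e;                     // a Flume sink would wrap this in EventDeliveryException
    } finally {
      transaction.close();
    }
    return result;
  }
}
```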




Re: Custom Sink

Posted by Vikram Kulkarni <vi...@expedia.com>.
Thanks for the response. I understand it now; there were a couple of things going on.
I reverse-engineered my sink from the simple logger sink and am now able to receive events from the channel, although occasionally some events seem to get dropped. I will work on it.


Re: Custom Sink

Posted by Hari Shreedharan <hs...@cloudera.com>.
Hi Vikram,

I cannot be sure why that is happening. Channel.take() gets called even
if there are no events in the channel. If take() returns null, then there
are no events in the channel right now. You can return Status.BACKOFF to
tell the sink runner not to retry immediately, but eventually the
SinkRunner will poll the sink again. This is because the SinkRunner does
not know the state of the channel; by calling process() periodically, it
lets the sink take events as they arrive. Generally, a sink calls
Channel.take() repeatedly to fill a batch: if the batch is non-empty it
returns Status.READY; otherwise (that is, the batch is empty) it returns
Status.BACKOFF. See the code from AvroSink as an example (I have simplified
the error handling, taken out the counter-handling code and added some
comments):

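      Status status = Status.READY;
      Channel channel = getChannel();
      Transaction transaction = channel.getTransaction();
      List<Event> batch = new ArrayList<Event>();

      try {
        transaction.begin();
        for (int i = 0; i < client.getBatchSize(); i++) {
          Event event = channel.take(); // Take an event from the channel

          if (event == null) { // Channel returned null: no more events right now
            break;
          }
          batch.add(event);
        }

        if (batch.isEmpty()) {     // The batch was empty, so back off and try again later
          status = Status.BACKOFF;
        } else {                   // Batch was not empty: send it and don't back off
          client.appendBatch(batch);
        }
        transaction.commit();      // Commit only after the batch was sent
      } catch (Throwable t) {
        transaction.rollback();    // Return the taken events to the channel
        throw new EventDeliveryException("Failed to send events", t);
      } finally {
        transaction.close();
      }

      return status;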
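
To make the READY/BACKOFF contract concrete, here is a toy model of the polling loop a sink runner drives: READY means "poll again immediately", BACKOFF means "wait before the next poll". ToyPollingRunner is hypothetical and is not Flume's runner code; it records the delay it would wait instead of sleeping, and the growing delay with a 1000 ms increment and 5000 ms cap is an illustrative policy (check SinkRunner's source for what your Flume version actually does). Status stands in for Sink.Status.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Stand-in for Flume's Sink.Status.
enum Status { READY, BACKOFF }

// Hypothetical toy model of a sink runner's polling loop (not Flume's code).
class ToyPollingRunner {
  static final long BACKOFF_INCREMENT_MS = 1000; // assumed value
  static final long MAX_BACKOFF_MS = 5000;       // assumed value

  // Calls process() `rounds` times and returns the delay the runner would wait
  // after each call (0 means "poll again immediately").
  static List<Long> run(Supplier<Status> process, int rounds) {
    List<Long> delays = new ArrayList<>();
    long consecutiveBackoffs = 0;
    for (int i = 0; i < rounds; i++) {
      if (process.get() == Status.BACKOFF) {
        consecutiveBackoffs++;
        delays.add(Math.min(consecutiveBackoffs * BACKOFF_INCREMENT_MS, MAX_BACKOFF_MS));
      } else {
        consecutiveBackoffs = 0; // the sink made progress, so reset the backoff
        delays.add(0L);
      }
    }
    return delays;
  }
}
```

Because the counter resets whenever process() returns READY, a sink that returns BACKOFF only for an empty batch is never slowed down while events are flowing; it is only throttled while the channel stays empty.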

Another thing you could do is take something like AvroSink/AbstractRpcSink,
rip out all of the Avro/RPC-specific code, and insert your own logic without
changing much of the channel/transaction handling.

Hope this helps.


Hari
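
Following the suggestion above to start from AvroSink/AbstractRpcSink and keep its channel/transaction handling, a custom sink reduces to a batching process() with the delivery step swapped out. In this sketch, client.appendBatch() is replaced by a hypothetical `deliver` hook where your own logic goes. It commits only after delivery succeeds and rolls back otherwise, so a failed write leaves the events in the channel for the next attempt instead of losing them. ToyChannel/ToyTransaction are hypothetical, single-threaded stand-ins for Flume's Channel/Transaction so the code runs without the Flume jar. In a real sink the batch size would typically be read in configure(Context), for example with context.getInteger("batch-size", 100), where the property name and default are assumptions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Stand-in for Flume's Sink.Status.
enum Status { READY, BACKOFF }

// Hypothetical, single-threaded stand-in for a Flume Channel and its Transaction.
class ToyChannel {
  private final Deque<String> queue = new ArrayDeque<>();
  private ToyTransaction current; // the transaction currently open, if any

  void put(String event) { queue.addLast(event); }
  int size() { return queue.size(); }

  ToyTransaction getTransaction() {
    current = new ToyTransaction();
    return current;
  }

  // Like Channel.take(): returns null when the channel is empty.
  String take() {
    if (current == null || !current.open) {
      throw new IllegalStateException("take() called outside a transaction");
    }
    String event = queue.pollFirst();
    if (event != null) current.taken.push(event); // remember it for rollback
    return event;
  }

  class ToyTransaction {
    final Deque<String> taken = new ArrayDeque<>();
    boolean open;

    void begin() { open = true; }
    void commit() { taken.clear(); }
    void rollback() { while (!taken.isEmpty()) queue.addFirst(taken.pop()); }
    void close() { open = false; current = null; }
  }
}

// AvroSink-style batching with the delivery step made pluggable.
class BatchingSinkSketch {
  private final ToyChannel channel;
  private final int batchSize;                  // Flume: typically read in configure(Context)
  private final Consumer<List<String>> deliver; // hypothetical hook for "your logic"

  BatchingSinkSketch(ToyChannel channel, int batchSize, Consumer<List<String>> deliver) {
    this.channel = channel;
    this.batchSize = batchSize;
    this.deliver = deliver;
  }

  Status process() {
    Status status = Status.READY;
    List<String> batch = new ArrayList<>();
    ToyChannel.ToyTransaction transaction = channel.getTransaction();
    try {
      transaction.begin();
      for (int i = 0; i < batchSize; i++) {
        String event = channel.take();
        if (event == null) {
          break;                 // channel has no more events right now
        }
        batch.add(event);
      }
      if (batch.isEmpty()) {
        status = Status.BACKOFF; // nothing to do: let the runner back off
      } else {
        deliver.accept(batch);   // may throw if the downstream write fails
      }
      transaction.commit();      // reached only if delivery succeeded
    } catch (RuntimeException e) {
      transaction.rollback();    // taken events go back to the channel
      throw e;
    } finally {
      transaction.close();
    }
    return status;
  }
}
```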

