You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flume.apache.org by Guillermo Ortiz <ko...@gmail.com> on 2014/12/03 11:46:36 UTC

Deal with duplicates in Flume with a crash.

Hi,

I would like to know if there's a easy way to deal with data
duplication when an agent crashs and it resends same data again.

Is there any mechanism to deal with it in Flume,

RE: Deal with duplicates in Flume with a crash.

Posted by Mike Keane <mk...@conversantmedia.com>.
>From our view, a single FlumeEvent is atomic, it either writes to HDFS or it does not.  At peak I'm fluming approximated 2 million log lines per second.  That would be 2 million check/and puts per second to HBase.  Putting 200 log lines in a single event results in 10,000 FlumeEvents, each with a UUID resulting in 10,000 Hbase check/puts per second.   When putting a single log line per FlumeEvent I was hammer HBase with 2,000,000 check/puts per second.  Addtionally I found I got orders of magnitude more throughput on a flume flow doing this than I did by increasing batch size.  The obvious trade off, I'm not running stock flume code.  

-Mike

________________________________________
From: Guillermo Ortiz [konstt2000@gmail.com]
Sent: Wednesday, December 03, 2014 4:46 PM
To: user@flume.apache.org
Subject: Re: Deal with duplicates in Flume with a crash.

That's interesting, do you have the RegionServers in different nodes
that your Flume Agents?? Because that could be a lot of traffic.
If you want to check duplicates for each log, the number of
checks/puts are always the same. What's the sense to put several logs
in the same event?

2014-12-03 23:35 GMT+01:00 Mike Keane <mk...@conversantmedia.com>:
> We effectively mitigated this problem by using the UUID interceptor and customizing the HDFS Sink to do a check and put of the UUID to HBase.  In the customized sink we check HBase to see if we have seen the UUID before, if we have it is a duplicate and we log a new duplicate metric with the existing sink metrics and throw the event away.  If we have not seen the UUID before we write the Event to HDFS and do a put of the UUID to hbase.
>
> Because of our volume to minimize the number of check/puts to HBase we put multiple logs in a single FlumeEvent.
>
>
> -Mike
>
> ________________________________________
> From: Guillermo Ortiz [konstt2000@gmail.com]
> Sent: Wednesday, December 03, 2014 4:15 PM
> To: user@flume.apache.org
> Subject: Re: Deal with duplicates in Flume with a crash.
>
> I didn't know anything about a Hive Sink, I'll check the JIRA about it, thanks.
> The pipeline is Flume-Kafka-SparkStreaming-XXX
>
> So I guess I should deal in SparkStreaming with it, right? I guess
> that it would be easy to do it with an UUID interceptor or is there
> another way easier?
>
> 2014-12-03 22:56 GMT+01:00 Roshan Naik <ro...@hortonworks.com>:
>> Using the UUID interceptor at the source closest to data origination.. it
>> will help identify duplicate events after they are delivered.
>>
>> If it satisfies your use case, the upcoming Hive Sink will mitigate the
>> problem a little bit (since it uses transactions to write to destination).
>>
>> -roshan
>>
>>
>> On Wed, Dec 3, 2014 at 8:44 AM, Joey Echeverria <jo...@cloudera.com> wrote:
>>>
>>> There's nothing built into Flume to deal with duplicates, it only
>>> provides at-least-once delivery semantics.
>>>
>>> You'll have to handle it in your data processing applications or add
>>> an ETL step to deal with duplicates before making data available for
>>> other queries.
>>>
>>> -Joey
>>>
>>> On Wed, Dec 3, 2014 at 5:46 AM, Guillermo Ortiz <ko...@gmail.com>
>>> wrote:
>>> > Hi,
>>> >
>>> > I would like to know if there's a easy way to deal with data
>>> > duplication when an agent crashs and it resends same data again.
>>> >
>>> > Is there any mechanism to deal with it in Flume,
>>>
>>>
>>>
>>> --
>>> Joey Echeverria
>>
>>
>>
>> CONFIDENTIALITY NOTICE
>> NOTICE: This message is intended for the use of the individual or entity to
>> which it is addressed and may contain information that is confidential,
>> privileged and exempt from disclosure under applicable law. If the reader of
>> this message is not the intended recipient, you are hereby notified that any
>> printing, copying, dissemination, distribution, disclosure or forwarding of
>> this communication is strictly prohibited. If you have received this
>> communication in error, please contact the sender immediately and delete it
>> from your system. Thank You.
>
>
>
>
> This email and any files included with it may contain privileged,
> proprietary and/or confidential information that is for the sole use
> of the intended recipient(s).  Any disclosure, copying, distribution,
> posting, or use of the information contained in or attached to this
> email is prohibited unless permitted by the sender.  If you have
> received this email in error, please immediately notify the sender
> via return email, telephone, or fax and destroy this original transmission
> and its included files without reading or saving it in any manner.
> Thank you.
>




This email and any files included with it may contain privileged,
proprietary and/or confidential information that is for the sole use
of the intended recipient(s).  Any disclosure, copying, distribution,
posting, or use of the information contained in or attached to this
email is prohibited unless permitted by the sender.  If you have
received this email in error, please immediately notify the sender
via return email, telephone, or fax and destroy this original transmission
and its included files without reading or saving it in any manner.
Thank you.


Re: Deal with duplicates in Flume with a crash.

Posted by Guillermo Ortiz <ko...@gmail.com>.
That's interesting, do you have the RegionServers in different nodes
that your Flume Agents?? Because that could be a lot of traffic.
If you want to check duplicates for each log, the number of
checks/puts are always the same. What's the sense to put several logs
in the same event?

2014-12-03 23:35 GMT+01:00 Mike Keane <mk...@conversantmedia.com>:
> We effectively mitigated this problem by using the UUID interceptor and customizing the HDFS Sink to do a check and put of the UUID to HBase.  In the customized sink we check HBase to see if we have seen the UUID before, if we have it is a duplicate and we log a new duplicate metric with the existing sink metrics and throw the event away.  If we have not seen the UUID before we write the Event to HDFS and do a put of the UUID to hbase.
>
> Because of our volume to minimize the number of check/puts to HBase we put multiple logs in a single FlumeEvent.
>
>
> -Mike
>
> ________________________________________
> From: Guillermo Ortiz [konstt2000@gmail.com]
> Sent: Wednesday, December 03, 2014 4:15 PM
> To: user@flume.apache.org
> Subject: Re: Deal with duplicates in Flume with a crash.
>
> I didn't know anything about a Hive Sink, I'll check the JIRA about it, thanks.
> The pipeline is Flume-Kafka-SparkStreaming-XXX
>
> So I guess I should deal in SparkStreaming with it, right? I guess
> that it would be easy to do it with an UUID interceptor or is there
> another way easier?
>
> 2014-12-03 22:56 GMT+01:00 Roshan Naik <ro...@hortonworks.com>:
>> Using the UUID interceptor at the source closest to data origination.. it
>> will help identify duplicate events after they are delivered.
>>
>> If it satisfies your use case, the upcoming Hive Sink will mitigate the
>> problem a little bit (since it uses transactions to write to destination).
>>
>> -roshan
>>
>>
>> On Wed, Dec 3, 2014 at 8:44 AM, Joey Echeverria <jo...@cloudera.com> wrote:
>>>
>>> There's nothing built into Flume to deal with duplicates, it only
>>> provides at-least-once delivery semantics.
>>>
>>> You'll have to handle it in your data processing applications or add
>>> an ETL step to deal with duplicates before making data available for
>>> other queries.
>>>
>>> -Joey
>>>
>>> On Wed, Dec 3, 2014 at 5:46 AM, Guillermo Ortiz <ko...@gmail.com>
>>> wrote:
>>> > Hi,
>>> >
>>> > I would like to know if there's a easy way to deal with data
>>> > duplication when an agent crashs and it resends same data again.
>>> >
>>> > Is there any mechanism to deal with it in Flume,
>>>
>>>
>>>
>>> --
>>> Joey Echeverria
>>
>>
>>
>> CONFIDENTIALITY NOTICE
>> NOTICE: This message is intended for the use of the individual or entity to
>> which it is addressed and may contain information that is confidential,
>> privileged and exempt from disclosure under applicable law. If the reader of
>> this message is not the intended recipient, you are hereby notified that any
>> printing, copying, dissemination, distribution, disclosure or forwarding of
>> this communication is strictly prohibited. If you have received this
>> communication in error, please contact the sender immediately and delete it
>> from your system. Thank You.
>
>
>
>
> This email and any files included with it may contain privileged,
> proprietary and/or confidential information that is for the sole use
> of the intended recipient(s).  Any disclosure, copying, distribution,
> posting, or use of the information contained in or attached to this
> email is prohibited unless permitted by the sender.  If you have
> received this email in error, please immediately notify the sender
> via return email, telephone, or fax and destroy this original transmission
> and its included files without reading or saving it in any manner.
> Thank you.
>

Re: Deal with duplicates in Flume with a crash.

Posted by Guillermo Ortiz <ko...@gmail.com>.
Thank you, pretty clear :)

2014-12-04 22:09 GMT+01:00 Mike Keane <mk...@conversantmedia.com>:
> UUID is on the FlumeEvent header. 10,000 FlumeEvents per second = 10,000
> check & puts to HBase.
>
> Each FlumeEvent has 200 log lines in it. If I was NOT doing a check & put to
> HBase for each FlumeEvent, each duplicated FlumeEvent results in all 200 log
> lines being duplicated.
>
> We evolved away from the UUID interceptor when we refactored our servers to
> use the EmbeddedAgent in our server stack as the starting point for all of
> our flume flows.
>
> At the highest level here is what we do:
>
> 1. Servers generating log data add to a LinkedBlockingQueue.
> 2. LinkedBlockingQueue appends logLines to StringBuffer until 200 lines
> added or 1 second is reached
> 3. Create a FlumeEvent with a UUID header
>
> Map<String, String> hdrs = new HashMap<String, String>();
> hdrs.put(EVENT_UNIQUE_ID, eventUniqueId);
> embeddedAgent.put(EventBuilder.withBody(<StringBuffer from
> LinkedBlockingQueue>, hdrs));
>
> 4. Add FlumeEvent to EmbeddedAgent object in server.
> 5.  Embedded agent sinks to collector tier
> 6.  Collector Tier Sinks to Storage Tier with custom sink that does the
> check and put.
>
> Prior to the EmbeddedAgent refactor our servers would create a FlumeEvent
> and use an RpcClient to send the event to a Application Tier agent which
> would use the UUID interceptor to add the UUID.   Our server refactory
> replaced the ApplicationTier agent with the EmbeddedAgent in our servers.
> For a diagram of the Tiers check out the Apache flume blog:
> https://blogs.apache.org/flume/entry/flume_performance_tuning_part_1
>
>
> -Mike
>
>
>
>
> ________________________________________
> From: Guillermo Ortiz [konstt2000@gmail.com]
> Sent: Thursday, December 04, 2014 2:14 AM
>
> To: user@flume.apache.org
> Subject: Re: Deal with duplicates in Flume with a crash.
>
> What I don't understand it's that you are getting an UUID for sets of
> 1000 lines, am I right? how could you know if there're duplicates if
> you are evaluating set of lines and not line per line with UUID?
>
> I thought that what you were doing:
> 1.Get a line from the Source X.
> 2.Calculate an UUID for a single line with an interceptor
> 3.Another interceptor checks this UUID in HBase. If it doesn't exist,
> you send to the channel and put the UUID in Hbase,
>
> If you are grouping the lines.. aren't you checking duplicates to set
> level??
>
> Maybe you're checking the UUID in the Sink, although I see the same
> problem. Where am I wrong??
>
> 2014-12-04 0:50 GMT+01:00 Mike Keane <mk...@conversantmedia.com>:
>> I'm not sure I understand your question but I'll be the first to admit
>> this is not fool proof.
>>
>> That said here are a couple inherent risks I am taking. Assume FlumeEventA
>> is one of 1000 events in a batch. If FlumeEventA makes it to FlumeAgent1 but
>> the batch fails it is entirely possible when the batch is resent it goes to
>> FlumeAgent2. Now this event is on 2 separate file channels, separate jvms
>> and separate servers. It is possible but extremely unlikely that FlumeEventA
>> is processed at the exact same time in FlumeAgent1 and FlumeAgent2. Both
>> agents pop the event off the channel, pull the UUID off the header and check
>> if it is in HBase. Both do not find it so both write to HDFS and we have a
>> duplicate. Considering the archetecture we believe the odds of this are
>> incredibly small and we are OK with the risk.
>>
>> Since the write to HDFS is in a transaction if it fails I don't do a HBase
>> put of the UUID, the transaction rolls back and we try again. I did a fair
>> amount studying the sink and bucketwriter code at the time to understand
>> what the fail conditions are when writing to HDFS. If I remember right it
>> could fail creating the file, writing to the file, closing the file and
>> renaming the file. We all have or own SLAs to meet. After a pretty thorough
>> review and amount of testing we were comfortable this met our SLA better
>> than a mapreduce job to dedupe 90 billion log lines per day.
>>
>> Joey Echeverria <jo...@cloudera.com> wrote:
>>
>>
>> What happens if the write to HDFS succeeds before the HBase put?
>>
>> -Joey
>>
>> On Wed, Dec 3, 2014 at 2:35 PM, Mike Keane <mk...@conversantmedia.com>
>> wrote:
>>> We effectively mitigated this problem by using the UUID interceptor and
>>> customizing the HDFS Sink to do a check and put of the UUID to HBase. In the
>>> customized sink we check HBase to see if we have seen the UUID before, if we
>>> have it is a duplicate and we log a new duplicate metric with the existing
>>> sink metrics and throw the event away. If we have not seen the UUID before
>>> we write the Event to HDFS and do a put of the UUID to hbase.
>>>
>>> Because of our volume to minimize the number of check/puts to HBase we
>>> put multiple logs in a single FlumeEvent.
>>>
>>>
>>> -Mike
>>>
>>> ________________________________________
>>> From: Guillermo Ortiz [konstt2000@gmail.com]
>>> Sent: Wednesday, December 03, 2014 4:15 PM
>>> To: user@flume.apache.org
>>> Subject: Re: Deal with duplicates in Flume with a crash.
>>>
>>> I didn't know anything about a Hive Sink, I'll check the JIRA about it,
>>> thanks.
>>> The pipeline is Flume-Kafka-SparkStreaming-XXX
>>>
>>> So I guess I should deal in SparkStreaming with it, right? I guess
>>> that it would be easy to do it with an UUID interceptor or is there
>>> another way easier?
>>>
>>> 2014-12-03 22:56 GMT+01:00 Roshan Naik <ro...@hortonworks.com>:
>>>> Using the UUID interceptor at the source closest to data origination..
>>>> it
>>>> will help identify duplicate events after they are delivered.
>>>>
>>>> If it satisfies your use case, the upcoming Hive Sink will mitigate the
>>>> problem a little bit (since it uses transactions to write to
>>>> destination).
>>>>
>>>> -roshan
>>>>
>>>>
>>>> On Wed, Dec 3, 2014 at 8:44 AM, Joey Echeverria <jo...@cloudera.com>
>>>> wrote:
>>>>>
>>>>> There's nothing built into Flume to deal with duplicates, it only
>>>>> provides at-least-once delivery semantics.
>>>>>
>>>>> You'll have to handle it in your data processing applications or add
>>>>> an ETL step to deal with duplicates before making data available for
>>>>> other queries.
>>>>>
>>>>> -Joey
>>>>>
>>>>> On Wed, Dec 3, 2014 at 5:46 AM, Guillermo Ortiz <ko...@gmail.com>
>>>>> wrote:
>>>>> > Hi,
>>>>> >
>>>>> > I would like to know if there's a easy way to deal with data
>>>>> > duplication when an agent crashs and it resends same data again.
>>>>> >
>>>>> > Is there any mechanism to deal with it in Flume,
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Joey Echeverria
>>>>
>>>>
>>>>
>>>> CONFIDENTIALITY NOTICE
>>>> NOTICE: This message is intended for the use of the individual or entity
>>>> to
>>>> which it is addressed and may contain information that is confidential,
>>>> privileged and exempt from disclosure under applicable law. If the
>>>> reader of
>>>> this message is not the intended recipient, you are hereby notified that
>>>> any
>>>> printing, copying, dissemination, distribution, disclosure or forwarding
>>>> of
>>>> this communication is strictly prohibited. If you have received this
>>>> communication in error, please contact the sender immediately and delete
>>>> it
>>>> from your system. Thank You.
>>>
>>>
>>>
>>>
>>> This email and any files included with it may contain privileged,
>>> proprietary and/or confidential information that is for the sole use
>>> of the intended recipient(s). Any disclosure, copying, distribution,
>>> posting, or use of the information contained in or attached to this
>>> email is prohibited unless permitted by the sender. If you have
>>> received this email in error, please immediately notify the sender
>>> via return email, telephone, or fax and destroy this original
>>> transmission
>>> and its included files without reading or saving it in any manner.
>>> Thank you.
>>>
>>
>>
>>
>> --
>> Joey Echeverria
>>
>>
>>
>>
>> This email and any files included with it may contain privileged,
>> proprietary and/or confidential information that is for the sole use
>> of the intended recipient(s). Any disclosure, copying, distribution,
>> posting, or use of the information contained in or attached to this
>> email is prohibited unless permitted by the sender. If you have
>> received this email in error, please immediately notify the sender
>> via return email, telephone, or fax and destroy this original transmission
>> and its included files without reading or saving it in any manner.
>> Thank you.
>>
>
> This email and any files included with it may contain privileged,
> proprietary and/or confidential information that is for the sole use
> of the intended recipient(s).  Any disclosure, copying, distribution,
> posting, or use of the information contained in or attached to this
> email is prohibited unless permitted by the sender.  If you have
> received this email in error, please immediately notify the sender
> via return email, telephone, or fax and destroy this original transmission
> and its included files without reading or saving it in any manner.
> Thank you.

RE: Deal with duplicates in Flume with a crash.

Posted by Mike Keane <mk...@conversantmedia.com>.
UUID is on the FlumeEvent header. 10,000 FlumeEvents per second = 10,000 check & puts to HBase.

Each FlumeEvent has 200 log lines in it. If I was NOT doing a check & put to HBase for each FlumeEvent, each duplicated FlumeEvent results in all 200 log lines being duplicated.

We evolved away from the UUID interceptor when we refactored our servers to use the EmbeddedAgent in our server stack as the starting point for all of our flume flows.

At the highest level here is what we do:

1. Servers generating log data add to a LinkedBlockingQueue.
2. LinkedBlockingQueue appends logLines to StringBuffer until 200 lines added or 1 second is reached
3. Create a FlumeEvent with a UUID header
Map<String, String> hdrs = new HashMap<String, String>();
hdrs.put(EVENT_UNIQUE_ID, eventUniqueId);
embeddedAgent.put(EventBuilder.withBody(<StringBuffer from LinkedBlockingQueue>, hdrs));
4. Add FlumeEvent to EmbeddedAgent object in server.
5.  Embedded agent sinks to collector tier
6.  Collector Tier Sinks to Storage Tier with custom sink that does the check and put.

Prior to the EmbeddedAgent refactor our servers would create a FlumeEvent and use an RpcClient to send the event to a Application Tier agent which would use the UUID interceptor to add the UUID.   Our server refactory replaced the ApplicationTier agent with the EmbeddedAgent in our servers.  For a diagram of the Tiers check out the Apache flume blog: https://blogs.apache.org/flume/entry/flume_performance_tuning_part_1


-Mike




________________________________________
From: Guillermo Ortiz [konstt2000@gmail.com]
Sent: Thursday, December 04, 2014 2:14 AM
To: user@flume.apache.org
Subject: Re: Deal with duplicates in Flume with a crash.

What I don't understand it's that you are getting an UUID for sets of
1000 lines, am I right? how could you know if there're duplicates if
you are evaluating set of lines and not line per line with UUID?

I thought that what you were doing:
1.Get a line from the Source X.
2.Calculate an UUID for a single line with an interceptor
3.Another interceptor checks this UUID in HBase. If it doesn't exist,
you send to the channel and put the UUID in Hbase,

If you are grouping the lines.. aren't you checking duplicates to set level??

Maybe you're checking the UUID in the Sink, although I see the same
problem. Where am I wrong??

2014-12-04 0:50 GMT+01:00 Mike Keane <mk...@conversantmedia.com>:
> I'm not sure I understand your question but I'll be the first to admit this is not fool proof.
>
> That said here are a couple inherent risks I am taking. Assume FlumeEventA is one of 1000 events in a batch. If FlumeEventA makes it to FlumeAgent1 but the batch fails it is entirely possible when the batch is resent it goes to FlumeAgent2. Now this event is on 2 separate file channels, separate jvms and separate servers. It is possible but extremely unlikely that FlumeEventA is processed at the exact same time in FlumeAgent1 and FlumeAgent2. Both agents pop the event off the channel, pull the UUID off the header and check if it is in HBase. Both do not find it so both write to HDFS and we have a duplicate. Considering the archetecture we believe the odds of this are incredibly small and we are OK with the risk.
>
> Since the write to HDFS is in a transaction if it fails I don't do a HBase put of the UUID, the transaction rolls back and we try again. I did a fair amount studying the sink and bucketwriter code at the time to understand what the fail conditions are when writing to HDFS. If I remember right it could fail creating the file, writing to the file, closing the file and renaming the file. We all have or own SLAs to meet. After a pretty thorough review and amount of testing we were comfortable this met our SLA better than a mapreduce job to dedupe 90 billion log lines per day.
>
> Joey Echeverria <jo...@cloudera.com> wrote:
>
>
> What happens if the write to HDFS succeeds before the HBase put?
>
> -Joey
>
> On Wed, Dec 3, 2014 at 2:35 PM, Mike Keane <mk...@conversantmedia.com> wrote:
>> We effectively mitigated this problem by using the UUID interceptor and customizing the HDFS Sink to do a check and put of the UUID to HBase. In the customized sink we check HBase to see if we have seen the UUID before, if we have it is a duplicate and we log a new duplicate metric with the existing sink metrics and throw the event away. If we have not seen the UUID before we write the Event to HDFS and do a put of the UUID to hbase.
>>
>> Because of our volume to minimize the number of check/puts to HBase we put multiple logs in a single FlumeEvent.
>>
>>
>> -Mike
>>
>> ________________________________________
>> From: Guillermo Ortiz [konstt2000@gmail.com]
>> Sent: Wednesday, December 03, 2014 4:15 PM
>> To: user@flume.apache.org
>> Subject: Re: Deal with duplicates in Flume with a crash.
>>
>> I didn't know anything about a Hive Sink, I'll check the JIRA about it, thanks.
>> The pipeline is Flume-Kafka-SparkStreaming-XXX
>>
>> So I guess I should deal in SparkStreaming with it, right? I guess
>> that it would be easy to do it with an UUID interceptor or is there
>> another way easier?
>>
>> 2014-12-03 22:56 GMT+01:00 Roshan Naik <ro...@hortonworks.com>:
>>> Using the UUID interceptor at the source closest to data origination.. it
>>> will help identify duplicate events after they are delivered.
>>>
>>> If it satisfies your use case, the upcoming Hive Sink will mitigate the
>>> problem a little bit (since it uses transactions to write to destination).
>>>
>>> -roshan
>>>
>>>
>>> On Wed, Dec 3, 2014 at 8:44 AM, Joey Echeverria <jo...@cloudera.com> wrote:
>>>>
>>>> There's nothing built into Flume to deal with duplicates, it only
>>>> provides at-least-once delivery semantics.
>>>>
>>>> You'll have to handle it in your data processing applications or add
>>>> an ETL step to deal with duplicates before making data available for
>>>> other queries.
>>>>
>>>> -Joey
>>>>
>>>> On Wed, Dec 3, 2014 at 5:46 AM, Guillermo Ortiz <ko...@gmail.com>
>>>> wrote:
>>>> > Hi,
>>>> >
>>>> > I would like to know if there's a easy way to deal with data
>>>> > duplication when an agent crashs and it resends same data again.
>>>> >
>>>> > Is there any mechanism to deal with it in Flume,
>>>>
>>>>
>>>>
>>>> --
>>>> Joey Echeverria
>>>
>>>
>>>
>>> CONFIDENTIALITY NOTICE
>>> NOTICE: This message is intended for the use of the individual or entity to
>>> which it is addressed and may contain information that is confidential,
>>> privileged and exempt from disclosure under applicable law. If the reader of
>>> this message is not the intended recipient, you are hereby notified that any
>>> printing, copying, dissemination, distribution, disclosure or forwarding of
>>> this communication is strictly prohibited. If you have received this
>>> communication in error, please contact the sender immediately and delete it
>>> from your system. Thank You.
>>
>>
>>
>>
>> This email and any files included with it may contain privileged,
>> proprietary and/or confidential information that is for the sole use
>> of the intended recipient(s). Any disclosure, copying, distribution,
>> posting, or use of the information contained in or attached to this
>> email is prohibited unless permitted by the sender. If you have
>> received this email in error, please immediately notify the sender
>> via return email, telephone, or fax and destroy this original transmission
>> and its included files without reading or saving it in any manner.
>> Thank you.
>>
>
>
>
> --
> Joey Echeverria
>
>
>
>
> This email and any files included with it may contain privileged,
> proprietary and/or confidential information that is for the sole use
> of the intended recipient(s). Any disclosure, copying, distribution,
> posting, or use of the information contained in or attached to this
> email is prohibited unless permitted by the sender. If you have
> received this email in error, please immediately notify the sender
> via return email, telephone, or fax and destroy this original transmission
> and its included files without reading or saving it in any manner.
> Thank you.
>




This email and any files included with it may contain privileged,
proprietary and/or confidential information that is for the sole use
of the intended recipient(s).  Any disclosure, copying, distribution,
posting, or use of the information contained in or attached to this
email is prohibited unless permitted by the sender.  If you have
received this email in error, please immediately notify the sender
via return email, telephone, or fax and destroy this original transmission
and its included files without reading or saving it in any manner.
Thank you.

Re: Deal with duplicates in Flume with a crash.

Posted by Guillermo Ortiz <ko...@gmail.com>.
What I don't understand it's that you are getting an UUID for sets of
1000 lines, am I right? how could you know if there're duplicates if
you are evaluating set of lines and not line per line with UUID?

I thought that what you were doing:
1.Get a line from the Source X.
2.Calculate an UUID for a single line with an interceptor
3.Another interceptor checks this UUID in HBase. If it doesn't exist,
you send to the channel and put the UUID in Hbase,

If you are grouping the lines.. aren't you checking duplicates to set level??

Maybe you're checking the UUID in the Sink, although I see the same
problem. Where am I wrong??

2014-12-04 0:50 GMT+01:00 Mike Keane <mk...@conversantmedia.com>:
> I'm not sure I understand your question but I'll be the first to admit this is not fool proof.
>
> That said here are a couple inherent risks I am taking.  Assume FlumeEventA is one of 1000 events in a batch.  If FlumeEventA makes it to FlumeAgent1 but the batch fails it is entirely possible when the batch is resent it goes to FlumeAgent2.  Now this event is on 2 separate file channels, separate jvms and separate servers.  It is possible but extremely unlikely that FlumeEventA is processed at the exact same time in FlumeAgent1 and FlumeAgent2.  Both agents pop the event off the channel, pull the UUID off the header and check if it is in HBase.  Both do not find it so both write to HDFS and we have a duplicate.  Considering the archetecture we believe the odds of this are incredibly small and we are OK with the risk.
>
> Since the write to HDFS is in a transaction if it fails I don't do a HBase put of the UUID, the transaction rolls back and we try again.  I did a fair amount studying the sink and bucketwriter code at the time to understand what the fail conditions are when writing to HDFS.  If I remember right it could fail creating the file, writing to the file, closing the file and renaming the file.  We all have or own SLAs to meet.  After a pretty thorough review and amount of testing we were comfortable this met our SLA better than a mapreduce job to dedupe 90 billion log lines per day.
>
> Joey Echeverria <jo...@cloudera.com> wrote:
>
>
> What happens if the write to HDFS succeeds before the HBase put?
>
> -Joey
>
> On Wed, Dec 3, 2014 at 2:35 PM, Mike Keane <mk...@conversantmedia.com> wrote:
>> We effectively mitigated this problem by using the UUID interceptor and customizing the HDFS Sink to do a check and put of the UUID to HBase.  In the customized sink we check HBase to see if we have seen the UUID before, if we have it is a duplicate and we log a new duplicate metric with the existing sink metrics and throw the event away.  If we have not seen the UUID before we write the Event to HDFS and do a put of the UUID to hbase.
>>
>> Because of our volume to minimize the number of check/puts to HBase we put multiple logs in a single FlumeEvent.
>>
>>
>> -Mike
>>
>> ________________________________________
>> From: Guillermo Ortiz [konstt2000@gmail.com]
>> Sent: Wednesday, December 03, 2014 4:15 PM
>> To: user@flume.apache.org
>> Subject: Re: Deal with duplicates in Flume with a crash.
>>
>> I didn't know anything about a Hive Sink, I'll check the JIRA about it, thanks.
>> The pipeline is Flume-Kafka-SparkStreaming-XXX
>>
>> So I guess I should deal in SparkStreaming with it, right? I guess
>> that it would be easy to do it with an UUID interceptor or is there
>> another way easier?
>>
>> 2014-12-03 22:56 GMT+01:00 Roshan Naik <ro...@hortonworks.com>:
>>> Using the UUID interceptor at the source closest to data origination.. it
>>> will help identify duplicate events after they are delivered.
>>>
>>> If it satisfies your use case, the upcoming Hive Sink will mitigate the
>>> problem a little bit (since it uses transactions to write to destination).
>>>
>>> -roshan
>>>
>>>
>>> On Wed, Dec 3, 2014 at 8:44 AM, Joey Echeverria <jo...@cloudera.com> wrote:
>>>>
>>>> There's nothing built into Flume to deal with duplicates, it only
>>>> provides at-least-once delivery semantics.
>>>>
>>>> You'll have to handle it in your data processing applications or add
>>>> an ETL step to deal with duplicates before making data available for
>>>> other queries.
>>>>
>>>> -Joey
>>>>
>>>> On Wed, Dec 3, 2014 at 5:46 AM, Guillermo Ortiz <ko...@gmail.com>
>>>> wrote:
>>>> > Hi,
>>>> >
>>>> > I would like to know if there's a easy way to deal with data
>>>> > duplication when an agent crashs and it resends same data again.
>>>> >
>>>> > Is there any mechanism to deal with it in Flume,
>>>>
>>>>
>>>>
>>>> --
>>>> Joey Echeverria
>>>
>>>
>>>
>>> CONFIDENTIALITY NOTICE
>>> NOTICE: This message is intended for the use of the individual or entity to
>>> which it is addressed and may contain information that is confidential,
>>> privileged and exempt from disclosure under applicable law. If the reader of
>>> this message is not the intended recipient, you are hereby notified that any
>>> printing, copying, dissemination, distribution, disclosure or forwarding of
>>> this communication is strictly prohibited. If you have received this
>>> communication in error, please contact the sender immediately and delete it
>>> from your system. Thank You.
>>
>>
>>
>>
>> This email and any files included with it may contain privileged,
>> proprietary and/or confidential information that is for the sole use
>> of the intended recipient(s).  Any disclosure, copying, distribution,
>> posting, or use of the information contained in or attached to this
>> email is prohibited unless permitted by the sender.  If you have
>> received this email in error, please immediately notify the sender
>> via return email, telephone, or fax and destroy this original transmission
>> and its included files without reading or saving it in any manner.
>> Thank you.
>>
>
>
>
> --
> Joey Echeverria
>
>
>
>
> This email and any files included with it may contain privileged,
> proprietary and/or confidential information that is for the sole use
> of the intended recipient(s).  Any disclosure, copying, distribution,
> posting, or use of the information contained in or attached to this
> email is prohibited unless permitted by the sender.  If you have
> received this email in error, please immediately notify the sender
> via return email, telephone, or fax and destroy this original transmission
> and its included files without reading or saving it in any manner.
> Thank you.
>

Re: Deal with duplicates in Flume with a crash.

Posted by Mike Keane <mk...@conversantmedia.com>.
I'm not sure I understand your question but I'll be the first to admit this is not fool proof.

That said here are a couple inherent risks I am taking.  Assume FlumeEventA is one of 1000 events in a batch.  If FlumeEventA makes it to FlumeAgent1 but the batch fails it is entirely possible when the batch is resent it goes to FlumeAgent2.  Now this event is on 2 separate file channels, separate jvms and separate servers.  It is possible but extremely unlikely that FlumeEventA is processed at the exact same time in FlumeAgent1 and FlumeAgent2.  Both agents pop the event off the channel, pull the UUID off the header and check if it is in HBase.  Both do not find it so both write to HDFS and we have a duplicate.  Considering the archetecture we believe the odds of this are incredibly small and we are OK with the risk.

Since the write to HDFS is in a transaction if it fails I don't do a HBase put of the UUID, the transaction rolls back and we try again.  I did a fair amount studying the sink and bucketwriter code at the time to understand what the fail conditions are when writing to HDFS.  If I remember right it could fail creating the file, writing to the file, closing the file and renaming the file.  We all have or own SLAs to meet.  After a pretty thorough review and amount of testing we were comfortable this met our SLA better than a mapreduce job to dedupe 90 billion log lines per day.

Joey Echeverria <jo...@cloudera.com> wrote:


What happens if the write to HDFS succeeds before the HBase put?

-Joey

On Wed, Dec 3, 2014 at 2:35 PM, Mike Keane <mk...@conversantmedia.com> wrote:
> We effectively mitigated this problem by using the UUID interceptor and customizing the HDFS Sink to do a check and put of the UUID to HBase.  In the customized sink we check HBase to see if we have seen the UUID before, if we have it is a duplicate and we log a new duplicate metric with the existing sink metrics and throw the event away.  If we have not seen the UUID before we write the Event to HDFS and do a put of the UUID to hbase.
>
> Because of our volume to minimize the number of check/puts to HBase we put multiple logs in a single FlumeEvent.
>
>
> -Mike
>
> ________________________________________
> From: Guillermo Ortiz [konstt2000@gmail.com]
> Sent: Wednesday, December 03, 2014 4:15 PM
> To: user@flume.apache.org
> Subject: Re: Deal with duplicates in Flume with a crash.
>
> I didn't know anything about a Hive Sink, I'll check the JIRA about it, thanks.
> The pipeline is Flume-Kafka-SparkStreaming-XXX
>
> So I guess I should deal in SparkStreaming with it, right? I guess
> that it would be easy to do it with an UUID interceptor or is there
> another way easier?
>
> 2014-12-03 22:56 GMT+01:00 Roshan Naik <ro...@hortonworks.com>:
>> Using the UUID interceptor at the source closest to data origination.. it
>> will help identify duplicate events after they are delivered.
>>
>> If it satisfies your use case, the upcoming Hive Sink will mitigate the
>> problem a little bit (since it uses transactions to write to destination).
>>
>> -roshan
>>
>>
>> On Wed, Dec 3, 2014 at 8:44 AM, Joey Echeverria <jo...@cloudera.com> wrote:
>>>
>>> There's nothing built into Flume to deal with duplicates, it only
>>> provides at-least-once delivery semantics.
>>>
>>> You'll have to handle it in your data processing applications or add
>>> an ETL step to deal with duplicates before making data available for
>>> other queries.
>>>
>>> -Joey
>>>
>>> On Wed, Dec 3, 2014 at 5:46 AM, Guillermo Ortiz <ko...@gmail.com>
>>> wrote:
>>> > Hi,
>>> >
>>> > I would like to know if there's a easy way to deal with data
>>> > duplication when an agent crashs and it resends same data again.
>>> >
>>> > Is there any mechanism to deal with it in Flume,
>>>
>>>
>>>
>>> --
>>> Joey Echeverria
>>
>>
>>
>> CONFIDENTIALITY NOTICE
>> NOTICE: This message is intended for the use of the individual or entity to
>> which it is addressed and may contain information that is confidential,
>> privileged and exempt from disclosure under applicable law. If the reader of
>> this message is not the intended recipient, you are hereby notified that any
>> printing, copying, dissemination, distribution, disclosure or forwarding of
>> this communication is strictly prohibited. If you have received this
>> communication in error, please contact the sender immediately and delete it
>> from your system. Thank You.
>
>
>
>
> This email and any files included with it may contain privileged,
> proprietary and/or confidential information that is for the sole use
> of the intended recipient(s).  Any disclosure, copying, distribution,
> posting, or use of the information contained in or attached to this
> email is prohibited unless permitted by the sender.  If you have
> received this email in error, please immediately notify the sender
> via return email, telephone, or fax and destroy this original transmission
> and its included files without reading or saving it in any manner.
> Thank you.
>



--
Joey Echeverria




This email and any files included with it may contain privileged,
proprietary and/or confidential information that is for the sole use
of the intended recipient(s).  Any disclosure, copying, distribution,
posting, or use of the information contained in or attached to this
email is prohibited unless permitted by the sender.  If you have
received this email in error, please immediately notify the sender
via return email, telephone, or fax and destroy this original transmission
and its included files without reading or saving it in any manner.
Thank you.


Re: Deal with duplicates in Flume with a crash.

Posted by Joey Echeverria <jo...@cloudera.com>.
What happens if the write to HDFS succeeds before the HBase put?

-Joey

On Wed, Dec 3, 2014 at 2:35 PM, Mike Keane <mk...@conversantmedia.com> wrote:
> We effectively mitigated this problem by using the UUID interceptor and customizing the HDFS Sink to do a check and put of the UUID to HBase.  In the customized sink we check HBase to see if we have seen the UUID before, if we have it is a duplicate and we log a new duplicate metric with the existing sink metrics and throw the event away.  If we have not seen the UUID before we write the Event to HDFS and do a put of the UUID to hbase.
>
> Because of our volume to minimize the number of check/puts to HBase we put multiple logs in a single FlumeEvent.
>
>
> -Mike
>
> ________________________________________
> From: Guillermo Ortiz [konstt2000@gmail.com]
> Sent: Wednesday, December 03, 2014 4:15 PM
> To: user@flume.apache.org
> Subject: Re: Deal with duplicates in Flume with a crash.
>
> I didn't know anything about a Hive Sink, I'll check the JIRA about it, thanks.
> The pipeline is Flume-Kafka-SparkStreaming-XXX
>
> So I guess I should deal in SparkStreaming with it, right? I guess
> that it would be easy to do it with an UUID interceptor or is there
> another way easier?
>
> 2014-12-03 22:56 GMT+01:00 Roshan Naik <ro...@hortonworks.com>:
>> Using the UUID interceptor at the source closest to data origination.. it
>> will help identify duplicate events after they are delivered.
>>
>> If it satisfies your use case, the upcoming Hive Sink will mitigate the
>> problem a little bit (since it uses transactions to write to destination).
>>
>> -roshan
>>
>>
>> On Wed, Dec 3, 2014 at 8:44 AM, Joey Echeverria <jo...@cloudera.com> wrote:
>>>
>>> There's nothing built into Flume to deal with duplicates, it only
>>> provides at-least-once delivery semantics.
>>>
>>> You'll have to handle it in your data processing applications or add
>>> an ETL step to deal with duplicates before making data available for
>>> other queries.
>>>
>>> -Joey
>>>
>>> On Wed, Dec 3, 2014 at 5:46 AM, Guillermo Ortiz <ko...@gmail.com>
>>> wrote:
>>> > Hi,
>>> >
>>> > I would like to know if there's a easy way to deal with data
>>> > duplication when an agent crashs and it resends same data again.
>>> >
>>> > Is there any mechanism to deal with it in Flume,
>>>
>>>
>>>
>>> --
>>> Joey Echeverria
>>
>>
>>
>> CONFIDENTIALITY NOTICE
>> NOTICE: This message is intended for the use of the individual or entity to
>> which it is addressed and may contain information that is confidential,
>> privileged and exempt from disclosure under applicable law. If the reader of
>> this message is not the intended recipient, you are hereby notified that any
>> printing, copying, dissemination, distribution, disclosure or forwarding of
>> this communication is strictly prohibited. If you have received this
>> communication in error, please contact the sender immediately and delete it
>> from your system. Thank You.
>
>
>
>
> This email and any files included with it may contain privileged,
> proprietary and/or confidential information that is for the sole use
> of the intended recipient(s).  Any disclosure, copying, distribution,
> posting, or use of the information contained in or attached to this
> email is prohibited unless permitted by the sender.  If you have
> received this email in error, please immediately notify the sender
> via return email, telephone, or fax and destroy this original transmission
> and its included files without reading or saving it in any manner.
> Thank you.
>



-- 
Joey Echeverria

RE: Deal with duplicates in Flume with a crash.

Posted by Mike Keane <mk...@conversantmedia.com>.
We effectively mitigated this problem by using the UUID interceptor and customizing the HDFS Sink to do a check and put of the UUID to HBase.  In the customized sink we check HBase to see if we have seen the UUID before, if we have it is a duplicate and we log a new duplicate metric with the existing sink metrics and throw the event away.  If we have not seen the UUID before we write the Event to HDFS and do a put of the UUID to hbase.

Because of our volume to minimize the number of check/puts to HBase we put multiple logs in a single FlumeEvent. 


-Mike

________________________________________
From: Guillermo Ortiz [konstt2000@gmail.com]
Sent: Wednesday, December 03, 2014 4:15 PM
To: user@flume.apache.org
Subject: Re: Deal with duplicates in Flume with a crash.

I didn't know anything about a Hive Sink, I'll check the JIRA about it, thanks.
The pipeline is Flume-Kafka-SparkStreaming-XXX

So I guess I should deal in SparkStreaming with it, right? I guess
that it would be easy to do it with an UUID interceptor or is there
another way easier?

2014-12-03 22:56 GMT+01:00 Roshan Naik <ro...@hortonworks.com>:
> Using the UUID interceptor at the source closest to data origination.. it
> will help identify duplicate events after they are delivered.
>
> If it satisfies your use case, the upcoming Hive Sink will mitigate the
> problem a little bit (since it uses transactions to write to destination).
>
> -roshan
>
>
> On Wed, Dec 3, 2014 at 8:44 AM, Joey Echeverria <jo...@cloudera.com> wrote:
>>
>> There's nothing built into Flume to deal with duplicates, it only
>> provides at-least-once delivery semantics.
>>
>> You'll have to handle it in your data processing applications or add
>> an ETL step to deal with duplicates before making data available for
>> other queries.
>>
>> -Joey
>>
>> On Wed, Dec 3, 2014 at 5:46 AM, Guillermo Ortiz <ko...@gmail.com>
>> wrote:
>> > Hi,
>> >
>> > I would like to know if there's a easy way to deal with data
>> > duplication when an agent crashs and it resends same data again.
>> >
>> > Is there any mechanism to deal with it in Flume,
>>
>>
>>
>> --
>> Joey Echeverria
>
>
>
> CONFIDENTIALITY NOTICE
> NOTICE: This message is intended for the use of the individual or entity to
> which it is addressed and may contain information that is confidential,
> privileged and exempt from disclosure under applicable law. If the reader of
> this message is not the intended recipient, you are hereby notified that any
> printing, copying, dissemination, distribution, disclosure or forwarding of
> this communication is strictly prohibited. If you have received this
> communication in error, please contact the sender immediately and delete it
> from your system. Thank You.




This email and any files included with it may contain privileged,
proprietary and/or confidential information that is for the sole use
of the intended recipient(s).  Any disclosure, copying, distribution,
posting, or use of the information contained in or attached to this
email is prohibited unless permitted by the sender.  If you have
received this email in error, please immediately notify the sender
via return email, telephone, or fax and destroy this original transmission
and its included files without reading or saving it in any manner.
Thank you.


Re: Deal with duplicates in Flume with a crash.

Posted by Guillermo Ortiz <ko...@gmail.com>.
I didn't know anything about a Hive Sink, I'll check the JIRA about it, thanks.
The pipeline is Flume-Kafka-SparkStreaming-XXX

So I guess I should deal in SparkStreaming with it, right? I guess
that it would be easy to do it with an UUID interceptor or is there
another way easier?

2014-12-03 22:56 GMT+01:00 Roshan Naik <ro...@hortonworks.com>:
> Using the UUID interceptor at the source closest to data origination.. it
> will help identify duplicate events after they are delivered.
>
> If it satisfies your use case, the upcoming Hive Sink will mitigate the
> problem a little bit (since it uses transactions to write to destination).
>
> -roshan
>
>
> On Wed, Dec 3, 2014 at 8:44 AM, Joey Echeverria <jo...@cloudera.com> wrote:
>>
>> There's nothing built into Flume to deal with duplicates, it only
>> provides at-least-once delivery semantics.
>>
>> You'll have to handle it in your data processing applications or add
>> an ETL step to deal with duplicates before making data available for
>> other queries.
>>
>> -Joey
>>
>> On Wed, Dec 3, 2014 at 5:46 AM, Guillermo Ortiz <ko...@gmail.com>
>> wrote:
>> > Hi,
>> >
>> > I would like to know if there's a easy way to deal with data
>> > duplication when an agent crashs and it resends same data again.
>> >
>> > Is there any mechanism to deal with it in Flume,
>>
>>
>>
>> --
>> Joey Echeverria
>
>
>
> CONFIDENTIALITY NOTICE
> NOTICE: This message is intended for the use of the individual or entity to
> which it is addressed and may contain information that is confidential,
> privileged and exempt from disclosure under applicable law. If the reader of
> this message is not the intended recipient, you are hereby notified that any
> printing, copying, dissemination, distribution, disclosure or forwarding of
> this communication is strictly prohibited. If you have received this
> communication in error, please contact the sender immediately and delete it
> from your system. Thank You.

Re: Deal with duplicates in Flume with a crash.

Posted by Roshan Naik <ro...@hortonworks.com>.
Using the UUID interceptor at the source closest to data origination.. it
will help identify duplicate events after they are delivered.

If it satisfies your use case, the upcoming Hive Sink will mitigate the
problem a little bit (since it uses transactions to write to destination).

-roshan


On Wed, Dec 3, 2014 at 8:44 AM, Joey Echeverria <jo...@cloudera.com> wrote:

> There's nothing built into Flume to deal with duplicates, it only
> provides at-least-once delivery semantics.
>
> You'll have to handle it in your data processing applications or add
> an ETL step to deal with duplicates before making data available for
> other queries.
>
> -Joey
>
> On Wed, Dec 3, 2014 at 5:46 AM, Guillermo Ortiz <ko...@gmail.com>
> wrote:
> > Hi,
> >
> > I would like to know if there's a easy way to deal with data
> > duplication when an agent crashs and it resends same data again.
> >
> > Is there any mechanism to deal with it in Flume,
>
>
>
> --
> Joey Echeverria
>

-- 
CONFIDENTIALITY NOTICE
NOTICE: This message is intended for the use of the individual or entity to 
which it is addressed and may contain information that is confidential, 
privileged and exempt from disclosure under applicable law. If the reader 
of this message is not the intended recipient, you are hereby notified that 
any printing, copying, dissemination, distribution, disclosure or 
forwarding of this communication is strictly prohibited. If you have 
received this communication in error, please contact the sender immediately 
and delete it from your system. Thank You.

Re: Deal with duplicates in Flume with a crash.

Posted by Joey Echeverria <jo...@cloudera.com>.
There's nothing built into Flume to deal with duplicates, it only
provides at-least-once delivery semantics.

You'll have to handle it in your data processing applications or add
an ETL step to deal with duplicates before making data available for
other queries.

-Joey

On Wed, Dec 3, 2014 at 5:46 AM, Guillermo Ortiz <ko...@gmail.com> wrote:
> Hi,
>
> I would like to know if there's a easy way to deal with data
> duplication when an agent crashs and it resends same data again.
>
> Is there any mechanism to deal with it in Flume,



-- 
Joey Echeverria