Posted to dev@kafka.apache.org by "Jay Kreps (Created) (JIRA)" <ji...@apache.org> on 2012/02/01 01:00:01 UTC

[jira] [Created] (KAFKA-260) Add audit trail to kafka

Add audit trail to kafka
------------------------

                 Key: KAFKA-260
                 URL: https://issues.apache.org/jira/browse/KAFKA-260
             Project: Kafka
          Issue Type: New Feature
    Affects Versions: 0.8
            Reporter: Jay Kreps
            Assignee: Jay Kreps


LinkedIn has a system that does monitoring on top of our data flow to ensure all data is delivered to all consumers of data. This works by having each logical "tier" through which data passes produce messages to a central "audit-trail" topic; these messages give a time period and the number of messages that passed through that tier in that time period. Examples of tiers might be "producer", "broker", "hadoop-etl", etc. This makes it possible to compare the total events for a given time period to ensure that all events that are produced are consumed by all consumers.

This turns out to be extremely useful. We also have an application that "balances the books" and checks that all data is consumed in a timely fashion. This gives graphs for each topic and shows any data loss and the lag at which the data is consumed (if any).

This would be an optional feature that would allow you to do this kind of reconciliation automatically for all the topics Kafka hosts, against all the tiers of applications that interact with the data.

Some details: the proposed format of the audit messages is JSON, with the following layout:

{
  "time":1301727060032,  // the timestamp at which this audit message is sent
  "topic": "my_topic_name", // the topic this audit data is for
  "tier":"producer", // a user-defined "tier" name
  "bucket_start": 1301726400000, // the beginning of the time bucket this data applies to
  "bucket_end": 1301727000000, // the end of the time bucket this data applies to
  "host":"my_host_name.datacenter.linkedin.com", // the server that this was sent from
  "datacenter":"hlx32", // the datacenter this occurred in
  "application":"newsfeed_service", // a user-defined application name
  "guid":"51656274-a86a-4dff-b824-8e8e20a6348f", // a unique identifier for this message
  "count":43634
}
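
For illustration, a minimal Java sketch of what building one of these records could look like from a tier. The class and method names here are hypothetical (not from the attached patch), and the actual produce call to the audit-trail topic is omitted:

import java.net.InetAddress;
import java.util.UUID;

// Hypothetical helper that renders one audit record in the proposed JSON layout.
// "time" is the send time of the audit message itself, per the field comments above.
public final class AuditRecord {

    public static String toJson(String topic, String tier, String application,
                                String datacenter, long bucketStart, long bucketEnd,
                                long count) throws Exception {
        String host = InetAddress.getLocalHost().getCanonicalHostName();
        return String.format(
            "{\"time\":%d,\"topic\":\"%s\",\"tier\":\"%s\",\"bucket_start\":%d," +
            "\"bucket_end\":%d,\"host\":\"%s\",\"datacenter\":\"%s\"," +
            "\"application\":\"%s\",\"guid\":\"%s\",\"count\":%d}",
            System.currentTimeMillis(), topic, tier, bucketStart, bucketEnd,
            host, datacenter, application, UUID.randomUUID(), count);
    }
}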

DISCUSSION

Time is complex:
1. The audit data must be based on a timestamp in the events, not the time on the machine processing the event. Using this timestamp means that all downstream consumers will report audit data for the right time bucket. This means that there must be a timestamp in the event, which we don't currently require. Arguably we should just add a timestamp to the events, but I think it is sufficient for now to allow the user to provide a function to extract the time from their events.
2. For counts to reconcile exactly we can only do analysis at a granularity based on the least common multiple of the bucket size used by all tiers. The simplest is just to configure them all to use the same bucket size. We currently use a bucket size of 10 mins, but anything from 1-60 mins is probably reasonable.
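
A minimal sketch of the bucketing arithmetic, assuming a fixed 10-minute bucket and an event timestamp in milliseconds since the epoch (names are illustrative):

// Assigns an event timestamp to a fixed-size time bucket by flooring to the
// bucket boundary; bucket_start/bucket_end in the JSON above would come from this.
public final class TimeBuckets {
    public static final long BUCKET_MS = 10 * 60 * 1000L; // 10-minute buckets

    public static long bucketStart(long eventTimestampMs) {
        return (eventTimestampMs / BUCKET_MS) * BUCKET_MS;
    }

    public static long bucketEnd(long eventTimestampMs) {
        return bucketStart(eventTimestampMs) + BUCKET_MS;
    }
}

For example, the sample message above has bucket_start 1301726400000 and bucket_end 1301727000000, which differ by exactly 600000 ms (10 minutes).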

For analysis purposes one tier is designated as the source tier and we do reconciliation against its count (e.g. if another tier has fewer messages, that is treated as loss; if another tier has more, that is duplication).
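
A rough sketch of that comparison for one (topic, bucket) pair, assuming the per-tier counts have already been summed from the audit messages; the names and types are illustrative, not taken from the actual audit app:

import java.util.Map;

public final class Reconciliation {

    // Compare every tier's count against the designated source tier:
    // a lower count is reported as loss, a higher count as duplication.
    public static void reconcile(String sourceTier, Map<String, Long> countsByTier) {
        long expected = countsByTier.getOrDefault(sourceTier, 0L);
        for (Map.Entry<String, Long> e : countsByTier.entrySet()) {
            long diff = e.getValue() - expected;
            if (diff < 0) {
                System.out.printf("%s: %d messages missing vs %s%n", e.getKey(), -diff, sourceTier);
            } else if (diff > 0) {
                System.out.printf("%s: %d duplicate messages vs %s%n", e.getKey(), diff, sourceTier);
            }
        }
    }
}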

Note that this system makes false positives possible, since you can lose an audit message. It also makes false negatives possible, since if you lose both normal messages and the associated audit messages it will appear that everything adds up. The latter problem is astronomically unlikely to happen exactly, though.

This would integrate into the client (producer and consumer both) in the following way:
1. The user provides a way to get timestamps from messages (required)
2. The user configures the tier name, host name, datacenter name, and application name as part of the consumer and producer config. We can provide reasonable defaults if not supplied (e.g. if it is a Producer then set tier to "producer" and get the hostname from the OS).
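
A sketch of how those settings and defaults might be resolved on the client side; the property keys here are hypothetical, not part of the existing producer/consumer config:

import java.net.InetAddress;
import java.util.Properties;

public final class AuditConfig {
    public final String tier;
    public final String host;
    public final String datacenter;
    public final String application;

    public AuditConfig(Properties props, boolean isProducer) throws Exception {
        // Defaults: tier from the client type, host from the OS; the rest must be supplied.
        this.tier = props.getProperty("audit.tier", isProducer ? "producer" : "consumer");
        this.host = props.getProperty("audit.host",
                InetAddress.getLocalHost().getCanonicalHostName());
        this.datacenter = props.getProperty("audit.datacenter", "unknown");
        this.application = props.getProperty("audit.application", "unknown");
    }
}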

The application that processes this data is currently a Java Jetty app that talks to MySQL. It feeds off the audit topic in Kafka and runs both automatic monitoring checks and graphical displays of the data. The data layer is not terribly scalable, but because the audit data is sent only periodically this is enough to allow us to audit thousands of servers on very modest hardware, and having SQL access makes diving into the data to trace problems to particular hosts easier.

LOGISTICS
I would recommend the following steps:
1. Add the audit application. The proposal would be to add a new top-level directory, equivalent to core or perf, called "audit" to house this application. At this point it would just be sitting there, not really being used.
2. Integrate these capabilities into the producer as part of the refactoring we are doing now
3. Integrate into consumer when possible



--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (KAFKA-260) Add audit trail to kafka

Posted by "Jonathan Creasy (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13426293#comment-13426293 ] 

Jonathan Creasy commented on KAFKA-260:
---------------------------------------

I have the audit UI up and running in my dev environment. As soon as I get a chance to patch our producers I should be able to submit a couple of tweaks to this patch for 0.7.1 and 0.8.
                

[jira] [Comment Edited] (KAFKA-260) Add audit trail to kafka

Posted by "Jonathan Creasy (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13426293#comment-13426293 ] 

Jonathan Creasy edited comment on KAFKA-260 at 8/1/12 2:54 AM:
---------------------------------------------------------------

I have the audit UI up and running in my dev environment. As soon as I get a chance to patch our producers I should be able to submit a couple of tweaks to this patch for 0.7.1 and 0.8.

If you have the producer code that generates the audit messages that would be pretty useful!
                
      was (Author: jcreasy):
    I have the audit ui up and running in my dev environment as soon as I get a change to patch our producers I should be able to submit a couple of tweaks to this patch for 0.7.1 and 0.8. 
                  

[jira] [Commented] (KAFKA-260) Add audit trail to kafka

Posted by "Guy Doulberg (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13227056#comment-13227056 ] 

Guy Doulberg commented on KAFKA-260:
------------------------------------

Hi Jay,

I tried to apply this patch in my dev environment.

To which version should I apply it?

I tried it on kafka-0.7.0-incubating.

If it should work on that version, I couldn't see it in action (it did compile), so maybe I don't get the configuration. Can you elaborate on how to configure the producer tier to send AuditData?

Thanks.



                

[jira] [Commented] (KAFKA-260) Add audit trail to kafka

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13426765#comment-13426765 ] 

Jay Kreps commented on KAFKA-260:
---------------------------------

We don't have an open-source producer and consumer that emit the audit data; that logic currently resides in a LinkedIn-specific wrapper class. We would be interested in fully integrating this with open source Kafka.

One key question is how to get the timestamp used for auditing. Currently we rely on a special field in the message to get the timestamp. To kafka, of course, messages are just opaque byte[], so integrating this is a little challenging. For our usage we just made this a required field for our avro records. Three options for integration:
1. Support auditing only for messages which contain timestamps. Make the user provide a function for extracting the timestamp from the message if they want to use the audit app.
2. Add a special field to all messages that contains the timestamp that comes from message creation time. The downside of this is that it requires a change to the message format and this field might not be useful for everyone.
3. Add a generic key-value style header, and store the timestamp in that. The downside of having a generic header is that you have to store the key too.

I would probably vote for 2. I think timestamps and auditing are useful enough to argue for making the timestamp a top-level field.

We could also implement (1) as a first phase, and then later choose to add the timestamp to make auditing automatic. That might be a better approach to get things started easily.
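
A minimal sketch of what the option (1) hook might look like; the interface name is hypothetical, and an Avro-based user could implement it by deserializing the record and reading its required time field:

public interface AuditTimestampExtractor {
    // Returns the event-creation timestamp (ms since epoch) for this opaque message payload.
    long extractTimestamp(byte[] message);
}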
                

[jira] [Commented] (KAFKA-260) Add audit trail to kafka

Posted by "Jay Kreps (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13197963#comment-13197963 ] 

Jay Kreps commented on KAFKA-260:
---------------------------------

Good point. No, in the sense that I am basically just describing two independent trains of thought, but you are absolutely right that there is no reason to have an application_id and a client_id. So let's pretend that that was the plan all along :-)
                

[jira] [Updated] (KAFKA-260) Add audit trail to kafka

Posted by "Jay Kreps (Updated) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps updated KAFKA-260:
----------------------------

    Attachment: Picture 18.png

Attached screenshot of audit app. This shows the aggregate data over a given time range, and graphs any dependencies as well as estimating the lag for loading the data (off the page on the screenshot).
                

[jira] [Commented] (KAFKA-260) Add audit trail to kafka

Posted by "Lorenz Knies (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13504487#comment-13504487 ] 

Lorenz Knies commented on KAFKA-260:
------------------------------------

Hi,

I like option 2. I wrote a generic consumer that stores the messages of a topic in small batches in Amazon S3. Everything was really simple (and generic) until I added partitioning based on a timestamp. In my experience, in most cases messages/events have a creation time. I would imagine that there are other 'high level' clients that could make use of a (producer-provided) timestamp and could be implemented much more simply if they did not have to deal with some sort of timestamp-extraction API.

lorenz 
                

[jira] [Commented] (KAFKA-260) Add audit trail to kafka

Posted by "Joe Stein (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13197775#comment-13197775 ] 

Joe Stein commented on KAFKA-260:
---------------------------------

Is "application" synonymous with the new wire format "client_id"? If so, we should standardize on one or the other. +1 on the feature.
                

[jira] [Updated] (KAFKA-260) Add audit trail to kafka

Posted by "Jay Kreps (Updated) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps updated KAFKA-260:
----------------------------

    Attachment: kafka-audit-trail-draft.patch

Draft patch for adding audit trail app to kafka.
                
> Add audit trail to kafka
> ------------------------
>
>                 Key: KAFKA-260
>                 URL: https://issues.apache.org/jira/browse/KAFKA-260
>             Project: Kafka
>          Issue Type: New Feature
>    Affects Versions: 0.8
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>         Attachments: Picture 18.png, kafka-audit-trail-draft.patch
>
>
> LinkedIn has a system that does monitoring on top of our data flow to ensure all data is delivered to all consumers of data. This works by having each logical "tier" through which data passes produce messages to a central "audit-trail" topic; these messages give a time period and the number of messages that passed through that tier in that time period. Example of tiers for data might be "producer", "broker", "hadoop-etl", etc. This makes it possible to compare the total events for a given time period to ensure that all events that are produced are consumed by all consumers.
> This turns out to be extremely useful. We also have an application that "balances the books" and checks that all data is consumed in a timely fashion. This gives graphs for each topic and shows any data loss and the lag at which the data is consumed (if any).
> This would be an optional feature that would allow you to to this kind of reconciliation automatically for all the topics kafka hosts against all the tiers of applications that interact with the data.
> Some details, the proposed format of the data is JSON using the following format for messages:
> {
>   "time":1301727060032,  // the timestamp at which this audit message is sent
>   "topic": "my_topic_name", // the topic this audit data is for
>   "tier":"producer", // a user-defined "tier" name
>   "bucket_start": 1301726400000, // the beginning of the time bucket this data applies to
>   "bucket_end": 1301727000000, // the end of the time bucket this data applies to
>   "host":"my_host_name.datacenter.linkedin.com", // the server that this was sent from
>   "datacenter":"hlx32", // the datacenter this occurred in
>   "application":"newsfeed_service", // a user-defined application name
>   "guid":"51656274-a86a-4dff-b824-8e8e20a6348f", // a unique identifier for this message
>   "count":43634
> }
> DISCUSSION
> Time is complex:
> 1. The audit data must be based on a timestamp in the events not the time on machine processing the event. Using this timestamp means that all downstream consumers will report audit data on the right time bucket. This means that there must be a timestamp in the event, which we don't currently require. Arguably we should just add a timestamp to the events, but I think it is sufficient for now just to allow the user to provide a function to extract the time from their events.
> 2. For counts to reconcile exactly we can only do analysis at a granularity based on the least common multiple of the bucket size used by all tiers. The simplest is just to configure them all to use the same bucket size. We currently use a bucket size of 10 mins, but anything from 1-60 mins is probably reasonable.
> For analysis purposes one tier is designated as the source tier and we do reconciliation against its count (e.g. if another tier has less, that is treated as loss; if another tier has more, that is duplication).
> Note that this system makes false positives possible, since you can lose an audit message. It also makes false negatives possible, since if you lose both normal messages and the associated audit messages it will appear that everything adds up. The latter problem is astronomically unlikely to happen exactly, though.
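As a rough illustration of that reconciliation rule (not the actual audit application), a sketch that compares each tier's per-bucket total against the designated source tier; all names here are invented.

    import java.util.Map;

    class ReconciliationSketch {
        // countsByTier: tier name -> summed audit counts for one (topic, bucket).
        static void reconcile(String topic, long bucketStart,
                              Map<String, Long> countsByTier, String sourceTier) {
            long expected = countsByTier.getOrDefault(sourceTier, 0L);
            for (Map.Entry<String, Long> e : countsByTier.entrySet()) {
                long delta = e.getValue() - expected;
                if (delta < 0) {
                    System.out.printf("%s bucket %d: tier %s is missing %d messages%n",
                                      topic, bucketStart, e.getKey(), -delta);
                } else if (delta > 0) {
                    System.out.printf("%s bucket %d: tier %s shows %d duplicates%n",
                                      topic, bucketStart, e.getKey(), delta);
                }
            }
        }
    }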
> This would integrate into the clients (both producer and consumer) in the following way:
> 1. The user provides a way to get timestamps from messages (required)
> 2. The user configures the tier name, host name, datacenter name, and application name as part of the consumer and producer config. We can provide reasonable defaults if not supplied (e.g. if it is a Producer then set tier to "producer" and get the hostname from the OS).
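A sketch of what the client-side configuration with defaults might look like; the audit.* property names are hypothetical placeholders, since the real keys would be decided as part of the producer refactoring mentioned below.

    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.util.Properties;

    class AuditConfigSketch {
        final String tier, host, datacenter, application;

        AuditConfigSketch(Properties props, boolean isProducer) throws UnknownHostException {
            // Reasonable defaults when a value is not supplied, e.g. tier "producer"
            // for a producer and the hostname taken from the OS.
            this.tier        = props.getProperty("audit.tier", isProducer ? "producer" : "consumer");
            this.host        = props.getProperty("audit.host", InetAddress.getLocalHost().getHostName());
            this.datacenter  = props.getProperty("audit.datacenter", "unknown");
            this.application = props.getProperty("audit.application", "unknown");
        }
    }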
> The application that processes this data is currently a Java Jetty app that talks to MySQL. It feeds off the audit topic in Kafka and runs both automatic monitoring checks and graphical displays of the data. The data layer is not terribly scalable, but because the audit data is sent only periodically this is enough to allow us to audit thousands of servers on very modest hardware, and having SQL access makes diving into the data to trace problems to particular hosts easier.
> LOGISTICS
> I would recommend the following steps:
> 1. Add the audit application. The proposal is to add a new top-level directory, equivalent to core or perf, called "audit" to house this application. At this point it would just be sitting there, not really being used.
> 2. Integrate these capabilities into the producer as part of the refactoring we are doing now
> 3. Integrate into consumer when possible


[jira] [Comment Edited] (KAFKA-260) Add audit trail to kafka

Posted by "Jonathan Creasy (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13426293#comment-13426293 ] 

Jonathan Creasy edited comment on KAFKA-260 at 8/1/12 2:58 AM:
---------------------------------------------------------------

I have the audit UI up and running in my dev environment. As soon as I get a chance to patch our producers, I should be able to submit a couple of tweaks to this patch for 0.7.1 and 0.8.

If you have the producer code that generates the audit messages, that would be pretty useful!
                
      was (Author: jcreasy):
    I have the audit ui up and running in my dev environment as soon as I get a change to patch our producers I should be able to submit a couple of tweaks to this patch for 0.7.1 and 0.8. 

If you have the producer code that generates the audit messages that would be pretty useful!
                  


[jira] [Commented] (KAFKA-260) Add audit trail to kafka

Posted by "Neha Narkhede (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13427519#comment-13427519 ] 

Neha Narkhede commented on KAFKA-260:
-------------------------------------

I like option 2. Auditing is useful to have out-of-the-box.
                
