Posted to dev@kafka.apache.org by Guozhang Wang <wa...@gmail.com> on 2014/10/11 02:33:52 UTC

[DISCUSSION] Message Metadata

Hello all,

I put some thoughts on enhancing our current message metadata format to
solve a bunch of existing issues:

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata

This wiki page is for kicking off some discussions about the feasibility of
adding more info into the message header, and if possible how we would add
them.

-- Guozhang

Re: [DISCUSSION] Message Metadata

Posted by Todd Palino <tp...@gmail.com>.
A couple of notes on this:

1 / 2 - Message format should not be important to Kafka or the common infrastructure (audit, mirror maker).  As I have noted elsewhere, having to ensure the data contract just to provide a general messaging service is additional overhead that is not always needed. It either requires specific functionality from the consumers and producers (which makes implementing them harder), or it requires the broker to care about the message format, which it should not. This is even before we get to the operational problems of enforcing data formatting.

3 - While this is interesting functionality, it is less general purpose than the simple mirroring that is mostly done now. If you need a customized mirror maker and audit that require message parsing, go ahead and write that. But the general use case does not. Currently, producing a message requires it to be compressed twice (once by the producer and once by the broker), plus an additional 2 compressions for every additional cluster it needs to pass through. In my case, this usually means 6 compressions. Even one extra is an incredible waste of resources.
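
For concreteness, the compression count above can be worked out as follows. This is only a back-of-the-envelope sketch: the class and method names are made up for illustration, and it assumes exactly two compressions per cluster (one by the producing client, whether that is the original producer or a mirror maker, and one by the broker).

```java
/** Hypothetical helper for the arithmetic only; not part of Kafka. */
final class CompressionCount {
    // Assumption taken from the paragraph above: every cluster a message lands in
    // costs two compressions (one by the producer or mirror maker, one by the broker).
    static final int COMPRESSIONS_PER_CLUSTER = 2;

    /** Total compressions for a message that ends up in {@code clusters} clusters. */
    static int total(int clusters) {
        if (clusters < 1) {
            throw new IllegalArgumentException("clusters must be >= 1");
        }
        return COMPRESSIONS_PER_CLUSTER * clusters;
    }

    public static void main(String[] args) {
        for (int clusters = 1; clusters <= 3; clusters++) {
            System.out.println(clusters + " cluster(s): " + total(clusters) + " compressions");
        }
    }
}
```

Under that assumption, 6 compressions would correspond to a message passing through three clusters in total.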

-Todd

> On Oct 19, 2014, at 7:42 PM, Jun Rao <ju...@gmail.com> wrote:
> 
> Hi, Guozhang,
> 
> Thanks for the writeup.
> 
> A few high level comments.
> 
> 1. Associating (versioned) schemas to a topic can be a good thing overall.
> Yes, this could add a bit more management overhead in Kafka. However, it
> makes sure that the data format contract between a producer and a consumer
> is kept and managed in a central place, instead of in the application. The
> latter is probably easier to start with, but is likely to be brittle in the
> long run.
> 
> 2. Auditing can be a general feature that's useful for many applications.
> Such a feature can be implemented by extending the low level message format
> with a header. However, it can also be added as part of the schema
> management. For example, you can imagine a type of audited schema that adds
> additional auditing info to an existing schema automatically. Performance
> wise, it probably doesn't make a big difference whether the auditing info
> is added in the message header or the schema header.
> 
> 3. We talked about avoiding the overhead of decompressing in both the
> broker and the mirror maker. We probably need to think through how this
> works with auditing. In the more general case where you want to audit every
> message, one has to do the decompression to get the individual message,
> independent of how the auditing info is stored. This means that if we want
> to audit the broker directly or the consumer in mirror maker, we have to
> pay the decompression cost anyway. Similarly, if we want to extend mirror
> maker to support some customized filtering/transformation logic, we also
> have to pay the decompression cost.
> 
> Some low level comments.
> 
> 4. Broker offset reassignment (kafka-527):  This probably can be done with
> just a format change on the compressed message set.
> 
> 5. MirrorMaker refactoring: We probably can think through how general we
> want mirror maker to be. If we want it to be more general, we likely
> need to decompress every message just like in a normal consumer. There will
> definitely be overhead. However, as long as mirror maker is made scalable,
> we can overcome the overhead by just running more instances on more
> hardware resources. As for the proposed message format change, we probably
> need to think through it a bit more. The honor-ship flag seems a bit hacky
> to me.
> 
> 6. Adding a timestamp in each message can be a useful thing. It (1) allows
> log segments to be rolled more accurately; (2) allows finding an offset for
> a particular timestamp more accurately. I am thinking that the timestamp in
> the message should probably be the time when the leader receives the
> message. Followers preserve the timestamp set by leader. To avoid time
> going back during leader change, the leader can probably set the timestamp
> to be the max of current time and the timestamp of the last message, if
> present. That timestamp can potentially be added to the index file to
> answer offsetBeforeTimestamp queries more efficiently.
> 
> 7. Log compaction: It seems that you are suggesting an improvement to
> compact the active segment as well. This can be tricky and we need to
> figure out the details on how to do this. This improvement seems to be
> orthogonal to the message format change though.
> 
> 8. Data inconsistency from unclean election: I am not sure if we need to
> add a controlled message to the log during leadership change. The <leader
> generation, starting offset> map can be maintained in a separate checkpoint
> file. The follower just needs to get that map from the leader during startup.
> 
> Thanks,
> 
> Jun
> 
>> On Fri, Oct 10, 2014 at 5:33 PM, Guozhang Wang <wa...@gmail.com> wrote:
>> 
>> Hello all,
>> 
>> I put some thoughts on enhancing our current message metadata format to
>> solve a bunch of existing issues:
>> 
>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
>> 
>> This wiki page is for kicking off some discussions about the feasibility of
>> adding more info into the message header, and if possible how we would add
>> them.
>> 
>> -- Guozhang
>> 
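
Point 6 in the quoted message describes the leader stamping each message with the max of the current time and the timestamp of the last message, so that time never goes back across a leader change. A minimal sketch of that rule follows. The class and method names are hypothetical and this is not broker code, only an illustration of the idea.

```java
/** Hypothetical illustration of leader-side timestamp assignment; not Kafka code. */
final class LeaderTimestampAssigner {
    // Timestamp of the last message in the log; Long.MIN_VALUE stands for "no message yet".
    private long lastTimestampMs;

    /** A new leader would seed this from the last message in its log, if present. */
    LeaderTimestampAssigner(long lastTimestampInLogMs) {
        this.lastTimestampMs = lastTimestampInLogMs;
    }

    /** Variant for an empty log, where there is no previous timestamp to respect. */
    static LeaderTimestampAssigner forEmptyLog() {
        return new LeaderTimestampAssigner(Long.MIN_VALUE);
    }

    /** Returns max(wall clock, last assigned timestamp), so timestamps never decrease. */
    long assign(long wallClockMs) {
        lastTimestampMs = Math.max(wallClockMs, lastTimestampMs);
        return lastTimestampMs;
    }
}
```

If the new leader's clock is behind the old leader's, messages keep receiving the last seen timestamp until the wall clock catches up, which keeps the sequence non-decreasing at the cost of some accuracy.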

Re: [DISCUSSION] Message Metadata

Posted by Jun Rao <ju...@gmail.com>.
8. Perhaps I need to see the details of the alternative solution. During
the startup of a follower, in general, it's not enough for the follower to
just see the latest generation of the current leader, since the follower
can be several generations behind. So, if the control message only
contains the latest generation, it may not be enough for the follower
to resolve the discrepancy with the leader. Another thing is that when a
follower starts up, the first thing it needs to do is to figure out how
much to truncate. This needs to happen before the follower can start
fetching. So, there is also a chicken and egg problem. The follower can't
figure out how much to truncate before it can fetch the control message.
However, it doesn't know where to start fetching until the truncation is
properly done.
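
To illustrate why the whole map matters, here is a rough sketch of how a follower could pick a truncation point from a <leader generation, starting offset> map obtained from the leader before any fetching starts. Everything here is an assumption made for illustration: the class name, the use of a TreeMap, and the rule itself (walk back to the newest generation both sides know, then keep only what both logs agree that generation contains). It is one possible reading, not a settled design.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of generation-based truncation; not Kafka code. */
final class TruncationSketch {
    /** End offset (exclusive) of a generation: the start of the next one, or the log end. */
    static long endOffset(TreeMap<Integer, Long> starts, int generation, long logEnd) {
        Map.Entry<Integer, Long> next = starts.higherEntry(generation);
        return next == null ? logEnd : next.getValue();
    }

    /**
     * Maps are generation -> starting offset. The follower walks its own generations from
     * newest to oldest, so it still finds a common point when it is several generations behind.
     */
    static long truncateTo(TreeMap<Integer, Long> leader, long leaderLogEnd,
                           TreeMap<Integer, Long> follower, long followerLogEnd) {
        for (Integer generation : follower.descendingKeySet()) {
            if (leader.containsKey(generation)) {
                // Keep only the part of this generation that both logs contain.
                return Math.min(endOffset(leader, generation, leaderLogEnd),
                                endOffset(follower, generation, followerLogEnd));
            }
        }
        return 0L; // No generation in common: assume the follower must start over.
    }
}
```

For example, with leader generations {1 -> 0, 2 -> 10, 3 -> 25} and a follower that only knows generation 1 but has written up to offset 14, the follower would truncate to offset 10, the point where generation 1 ended on the leader.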

Thanks,

Jun

On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Thanks for the detailed comments Jun! Some replies inlined.
>
> On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao <ju...@gmail.com> wrote:
>
> > Hi, Guozhang,
> >
> > Thanks for the writeup.
> >
> > A few high level comments.
> >
> > 1. Associating (versioned) schemas to a topic can be a good thing overall.
> > Yes, this could add a bit more management overhead in Kafka. However, it
> > makes sure that the data format contract between a producer and a consumer
> > is kept and managed in a central place, instead of in the application. The
> > latter is probably easier to start with, but is likely to be brittle in the
> > long run.
> >
>
> I am actually not proposing to drop support for associating versioned
> schemas with topics, but rather to keep core Kafka functionality such as
> auditing from depending on schemas. I think this alone can separate the
> schema management from Kafka piping management (i.e. making sure every
> single message is delivered, and within some latency, etc). Adding
> additional auditing info into an existing schema will force Kafka to be
> aware of the schema systems (Avro, JSON, etc).
>
>
> >
> > 2. Auditing can be a general feature that's useful for many applications.
> > Such a feature can be implemented by extending the low level message format
> > with a header. However, it can also be added as part of the schema
> > management. For example, you can imagine a type of audited schema that adds
> > additional auditing info to an existing schema automatically. Performance
> > wise, it probably doesn't make a big difference whether the auditing info
> > is added in the message header or the schema header.
> >
> >
> See replies above.
>
>
> > 3. We talked about avoiding the overhead of decompressing in both the
> > broker and the mirror maker. We probably need to think through how this
> > works with auditing. In the more general case where you want to audit every
> > message, one has to do the decompression to get the individual message,
> > independent of how the auditing info is stored. This means that if we want
> > to audit the broker directly or the consumer in mirror maker, we have to
> > pay the decompression cost anyway. Similarly, if we want to extend mirror
> > maker to support some customized filtering/transformation logic, we also
> > have to pay the decompression cost.
> >
> >
> I see your point. For that I would prefer to have an MM implementation that
> is able to de-compress / re-compress ONLY if required, for example by
> auditing, etc. I agree that we have not thought through whether we should
> enable auditing on MM, and if yes how to do that, and we could discuss
> that in a different thread. Overall, this proposal is not just for
> tackling de-compression on MM but about the feasibility of extending the
> Kafka message header for system properties / app properties.
>
>
> > Some low level comments.
> >
> > 4. Broker offset reassignment (kafka-527): This probably can be done with
> > just a format change on the compressed message set.
> >
> That is true. As I mentioned in the wiki, each of the problems may be
> resolvable separately, but I am thinking about a general way to get all of
> them.
>
>
> > 5. MirrorMaker refactoring: We probably can think through how general we
> > want mirror maker to be. If we want it to be more general, we likely
> > need to decompress every message just like in a normal consumer. There will
> > definitely be overhead. However, as long as mirror maker is made scalable,
> > we can overcome the overhead by just running more instances on more
> > hardware resources. As for the proposed message format change, we probably
> > need to think through it a bit more. The honor-ship flag seems a bit hacky
> > to me.
> >
> >
> Replied as part of 3). Sure we can discuss more about that, will update the
> wiki for collected comments.
>
>
> > 6. Adding a timestamp in each message can be a useful thing. It (1) allows
> > log segments to be rolled more accurately; (2) allows finding an offset for
> > a particular timestamp more accurately. I am thinking that the timestamp in
> > the message should probably be the time when the leader receives the
> > message. Followers preserve the timestamp set by leader. To avoid time
> > going back during leader change, the leader can probably set the timestamp
> > to be the max of current time and the timestamp of the last message, if
> > present. That timestamp can potentially be added to the index file to
> > answer offsetBeforeTimestamp queries more efficiently.
> >
> >
> Agreed.
>
>
> > 7. Log compaction: It seems that you are suggesting an improvement to
> > compact the active segment as well. This can be tricky and we need to
> > figure out the details on how to do this. This improvement seems to be
> > orthogonal to the message format change though.
> >
> >
> I think the improvement is more effective with the timestamps as in 6); we
> can discuss more about this.
>
>
> > 8. Data inconsistency from unclean election: I am not sure if we need to
> > add a controlled message to the log during leadership change. The <leader
> > generation, starting offset> map can be maintained in a separate checkpoint
> > file. The follower just needs to get that map from the leader during
> > startup.
> >
> What I was proposing is an alternative solution given that we have this
> message header enhancement; with this we do not need to add more logic
> for a leadership map and checkpoint file, but just the logic in the
> replica-manager to handle this extra controlled message and remember the
> current leader epoch instead of a map.
>
>
> > Thanks,
> >
> > Jun
> >
> > On Fri, Oct 10, 2014 at 5:33 PM, Guozhang Wang <wa...@gmail.com> wrote:
> >
> > > Hello all,
> > >
> > > I put some thoughts on enhancing our current message metadata format to
> > > solve a bunch of existing issues:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
> > >
> > > This wiki page is for kicking off some discussions about the feasibility of
> > > adding more info into the message header, and if possible how we would add
> > > them.
> > >
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: [DISCUSSION] Message Metadata

Posted by Guozhang Wang <wa...@gmail.com>.
My bad, the link should be this:

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata

On Fri, Nov 21, 2014 at 5:29 PM, Timothy Chen <tn...@gmail.com> wrote:

> Hi Guozhang,
>
> I don't think that is publicly accessible, can you update it to the
> Kafka wiki?
>
> Tim
>
> On Fri, Nov 21, 2014 at 5:24 PM, Guozhang Wang <wa...@gmail.com> wrote:
> > Hi all,
> >
> > I have updated the wiki page (
> > https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Kafka+Enriched+Message+Metadata)
> > according to people's comments and discussions offline.
> >
> > Guozhang
> >
> > On Thu, Nov 13, 2014 at 9:43 AM, Guozhang Wang <wa...@gmail.com> wrote:
> >
> >> Hi Jun,
> >>
> >> Sorry for the delay on your comments in the wiki page as well as this
> >> thread; quite swamped now. I will get back to you as soon as I find some
> >> time.
> >>
> >> Guozhang
> >>
> >> On Tue, Nov 11, 2014 at 6:26 PM, Jun Rao <ju...@gmail.com> wrote:
> >>
> >>> Thinking about this a bit more. For adding the auditing support, I am not
> >>> sure if we need to change the message format by adding the application
> >>> tags. An alternative way to do that is to add it in the producer client.
> >>> For example, for each message payload (doesn't matter what the
> >>> serialization mechanism is) that a producer receives, the producer can just
> >>> add a header before the original payload. The header will contain all
> >>> needed fields (e.g. timestamp, host, etc) for the purpose of auditing. This
> >>> way, we don't need to change the message format and the auditing info can
> >>> be added independent of the serialization mechanism of the message. The
> >>> header can use a different serialization mechanism for better efficiency.
> >>> For example, if we use Avro to serialize the header, the encoded bytes
> >>> won't include the field names in the header. This is potentially more
> >>> efficient than representing those fields as application tags in the message
> >>> where the tags have to be explicitly stored in every message.
> >>>
> >>> To make it easier for the client to add and make use of this kind of
> >>> auditing support, I was imagining that we can add a ProducerFactory in the
> >>> new java client. The ProducerFactory will create an instance of Producer
> >>> based on a config property. By default, the current KafkaProducer will be
> >>> returned. However, a user can plug in a different implementation of
> >>> Producer that does auditing. For example, an implementation of an
> >>> AuditProducer.send() can take the original ProducerRecord, add the header
> >>> to the value byte array and then forward the record to an underlying
> >>> KafkaProducer. We can add a similar ConsumerFactory to the new consumer
> >>> client. If a user plugs in an implementation of the AuditingConsumer, the
> >>> consumer will then be audited automatically.
> >>>
> >>> Thanks,
> >>>
> >>> Jun
> >>>
> >>> On Tue, Oct 21, 2014 at 4:06 PM, Guozhang Wang <wa...@gmail.com> wrote:
> >>>
> >>> > Hi Jun,
> >>> >
> >>> > Regarding 4) in your comment, after thinking about it for a while I cannot
> >>> > come up with a way to do it along with log compaction without adding new
> >>> > fields into the current format on message set. Do you have a better way
> >>> > that does not require protocol changes?
> >>> >
> >>> > Guozhang
> >>> >
> >>> > On Mon, Oct 20, 2014 at 9:53 AM, Guozhang Wang <wa...@gmail.com>
> >>> wrote:
> >>> >
> >>> > > I have updated the wiki page incorporating received comments. We can
> >>> > > discuss some more details on:
> >>> > >
> >>> > > 1. How we want to do auditing: whether we want built-in auditing on
> >>> > > brokers (or even MMs), or an audit consumer that fetches all messages
> >>> > > from just the brokers.
> >>> > >
> >>> > > 2. How we can avoid de-/re-compression on brokers and MMs with log
> >>> > > compaction turned on.
> >>> > >
> >>> > > 3. How we can resolve the data inconsistency resulting from unclean
> >>> > > leader election with control messages.
> >>> > >
> >>> > > Guozhang
> >>> > >
> >>> > > On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang <
> wangguoz@gmail.com>
> >>> > > wrote:
> >>> > >
> >>> > >> Thanks for the detailed comments Jun! Some replies inlined.
> >>> > >>
> >>> > >> On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao <ju...@gmail.com>
> wrote:
> >>> > >>
> >>> > >>> Hi, Guozhang,
> >>> > >>>
> >>> > >>> Thanks for the writeup.
> >>> > >>>
> >>> > >>> A few high level comments.
> >>> > >>>
> >>> > >>> 1. Associating (versioned) schemas to a topic can be a good thing
> >>> > >>> overall.
> >>> > >>> Yes, this could add a bit more management overhead in Kafka.
> >>> However,
> >>> > it
> >>> > >>> makes sure that the data format contract between a producer and a
> >>> > >>> consumer
> >>> > >>> is kept and managed in a central place, instead of in the
> >>> application.
> >>> > >>> The
> >>> > >>> latter is probably easier to start with, but is likely to be
> >>> brittle in
> >>> > >>> the
> >>> > >>> long run.
> >>> > >>>
> >>> > >>
> >>> > >> I am actually not proposing to not support associated versioned
> >>> schemas
> >>> > >> for topics, but to not let some core Kafka functionalities like
> >>> auditing
> >>> > >> being depend on schemas. I think this alone can separate the
> schema
> >>> > >> management from Kafka piping management (i.e. making sure every
> >>> single
> >>> > >> message is delivered, and within some latency, etc). Adding
> >>> additional
> >>> > >> auditing info into an existing schema will force Kafka to be
> aware of
> >>> > the
> >>> > >> schema systems (Avro, JSON, etc).
> >>> > >>
> >>> > >>
> >>> > >>>
> >>> > >>> 2. Auditing can be a general feature that's useful for many
> >>> > applications.
> >>> > >>> Such a feature can be implemented by extending the low level
> message
> >>> > >>> format
> >>> > >>> with a header. However, it can also be added as part of the
> schema
> >>> > >>> management. For example, you can imagine a type of audited schema
> >>> that
> >>> > >>> adds
> >>> > >>> additional auditing info to an existing schema automatically.
> >>> > Performance
> >>> > >>> wise, it probably doesn't make a big difference whether the
> auditing
> >>> > info
> >>> > >>> is added in the message header or the schema header.
> >>> > >>>
> >>> > >>>
> >>> > >> See replies above.
> >>> > >>
> >>> > >>
> >>> > >>> 3. We talked about avoiding the overhead of decompressing in both
> >>> the
> >>> > >>> broker and the mirror maker. We probably need to think through
> how
> >>> this
> >>> > >>> works with auditing. In the more general case where you want to
> >>> audit
> >>> > >>> every
> >>> > >>> message, one has to do the decompression to get the individual
> >>> message,
> >>> > >>> independent of how the auditing info is stored. This means that
> if
> >>> we
> >>> > >>> want
> >>> > >>> to audit the broker directly or the consumer in mirror maker, we
> >>> have
> >>> > to
> >>> > >>> pay the decompression cost anyway. Similarly, if we want to
> extend
> >>> > mirror
> >>> > >>> maker to support some customized filtering/transformation logic,
> we
> >>> > also
> >>> > >>> have to pay the decompression cost.
> >>> > >>>
> >>> > >>>
> >>> > >> I see your point. For that I would prefer to have a MM
> implementation
> >>> > >> that is able to do de-compress / re-compress ONLY if required, for
> >>> > example
> >>> > >> by auditing, etc. I agree that we have not thought through
> whether we
> >>> > >> should enable auditing on MM, and if yes how to do that, and we
> could
> >>> > >> discuss about that in a different thread. Overall, this proposal
> is
> >>> not
> >>> > >> just for tackling de-compression on MM but about the feasibility
> of
> >>> > >> extending Kafka message header for system properties / app
> >>> properties.
> >>> > >>
> >>> > >>
> >>> > >>> Some low level comments.
> >>> > >>>
> >>> > >>> 4. Broker offset reassignment (kafka-527):  This probably can be
> >>> done
> >>> > >>> with
> >>> > >>> just a format change on the compressed message set.
> >>> > >>>
> >>> > >>> That is true. As I mentioned in the wiki each of the problems
> may be
> >>> > >> resolvable separately but I am thinking about a general way to get
> >>> all
> >>> > of
> >>> > >> them.
> >>> > >>
> >>> > >>
> >>> > >>> 5. MirrorMaker refactoring: We probably can think through how
> >>> general
> >>> > we
> >>> > >>> want mirror maker to be. If we want to it to be more general, we
> >>> likely
> >>> > >>> need to decompress every message just like in a normal consumer.
> >>> There
> >>> > >>> will
> >>> > >>> definitely be overhead. However, as long as mirror maker is made
> >>> > >>> scalable,
> >>> > >>> we can overcome the overhead by just running more instances on
> more
> >>> > >>> hardware resources. As for the proposed message format change, we
> >>> > >>> probably
> >>> > >>> need to think through it a bit more. The honor-ship flag seems a
> bit
> >>> > >>> hacky
> >>> > >>> to me.
> >>> > >>>
> >>> > >>>
> >>> > >> Replied as part of 3). Sure we can discuss more about that, will
> >>> update
> >>> > >> the wiki for collected comments.
> >>> > >>
> >>> > >>
> >>> > >>> 6. Adding a timestamp in each message can be a useful thing. It
> (1)
> >>> > >>> allows
> >>> > >>> log segments to be rolled more accurately; (2) allows finding an
> >>> offset
> >>> > >>> for
> >>> > >>> a particular timestamp more accurately. I am thinking that the
> >>> > timestamp
> >>> > >>> in
> >>> > >>> the message should probably be the time when the leader receives
> the
> >>> > >>> message. Followers preserve the timestamp set by leader. To avoid
> >>> time
> >>> > >>> going back during leader change, the leader can probably set the
> >>> > >>> timestamp
> >>> > >>> to be the  max of current time and the timestamp of the last
> >>> message,
> >>> > if
> >>> > >>> present. That timestamp can potentially be added to the index
> file
> >>> to
> >>> > >>> answer offsetBeforeTimestamp queries more efficiently.
> >>> > >>>
> >>> > >>>
> >>> > >> Agreed.
> >>> > >>
> >>> > >>
> >>> > >>> 7. Log compaction: It seems that you are suggesting an
> improvement
> >>> to
> >>> > >>> compact the active segment as well. This can be tricky and we
> need
> >>> to
> >>> > >>> figure out the details on how to do this. This improvement seems
> to
> >>> be
> >>> > >>> orthogonal to the message format change though.
> >>> > >>>
> >>> > >>>
> >>> > >> I think the improvements is more effective with the timestamps as
> in
> >>> 6),
> >>> > >> we can discuss more about this.
> >>> > >>
> >>> > >>
> >>> > >>> 8. Data inconsistency from unclean election: I am not sure if we
> >>> need
> >>> > to
> >>> > >>> add a controlled message to the log during leadership change. The
> >>> > <leader
> >>> > >>> generation, starting offset> map can be maintained in a separate
> >>> > >>> checkpoint
> >>> > >>> file. The follower just need to get that map from the leader
> during
> >>> > >>> startup.
> >>> > >>>
> >>> > >>> What I was proposing is an alternative solution given that we
> have
> >>> this
> >>> > >> message header enhancement; with this we do not need to add
> another
> >>> > logic
> >>> > >> for leadership map and checkpoint file, but just the logic on
> >>> > >> replica-manager to handle this extra controlled message and
> >>> remembering
> >>> > the
> >>> > >> current leader epoch instead of a map.
> >>> > >>
> >>> > >>
> >>> > >>> Thanks,
> >>> > >>>
> >>> > >>> Jun
> >>> > >>>
> >>> > >>> On Fri, Oct 10, 2014 at 5:33 PM, Guozhang Wang <
> wangguoz@gmail.com>
> >>> > >>> wrote:
> >>> > >>>
> >>> > >>> > Hello all,
> >>> > >>> >
> >>> > >>> > I put some thoughts on enhancing our current message metadata
> >>> format
> >>> > to
> >>> > >>> > solve a bunch of existing issues:
> >>> > >>> >
> >>> > >>> >
> >>> > >>> >
> >>> > >>>
> >>> >
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
> >>> > >>> >
> >>> > >>> > This wiki page is for kicking off some discussions about the
> >>> > >>> feasibility of
> >>> > >>> > adding more info into the message header, and if possible how
> we
> >>> > would
> >>> > >>> add
> >>> > >>> > them.
> >>> > >>> >
> >>> > >>> > -- Guozhang
> >>> > >>> >
> >>> > >>>
> >>> > >>
> >>> > >>
> >>> > >>
> >>> > >> --
> >>> > >> -- Guozhang
> >>> > >>
> >>> > >
> >>> > >
> >>> > >
> >>> > > --
> >>> > > -- Guozhang
> >>> > >
> >>> >
> >>> >
> >>> >
> >>> > --
> >>> > -- Guozhang
> >>> >
> >>>
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
> >
> > --
> > -- Guozhang
>
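
The producer-side alternative quoted above (prepend an audit header to the value bytes, then forward to the underlying producer) could look roughly like the sketch below. All names here are hypothetical: ByteSender is a stand-in for a producer's send path and is NOT Kafka's Producer interface, and the fixed binary layout is an assumption chosen to keep the example dependency-free (the quoted message suggests Avro for the header). The ProducerFactory that would select the implementation from a config property is left out.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical stand-in for a producer's send path; NOT Kafka's Producer interface. */
interface ByteSender {
    void send(String topic, byte[] value);
}

/** Assumed framing: [timestamp: 8 bytes][hostLen: 2 bytes][host, UTF-8][original payload]. */
final class AuditHeader {
    private static final int HOST_LEN_POS = 8;
    private static final int HOST_POS = 10;

    static byte[] wrap(long timestampMs, String host, byte[] payload) {
        byte[] hostBytes = host.getBytes(StandardCharsets.UTF_8);
        if (hostBytes.length > Short.MAX_VALUE) {
            throw new IllegalArgumentException("host name too long");
        }
        ByteBuffer buf = ByteBuffer.allocate(HOST_POS + hostBytes.length + payload.length);
        buf.putLong(timestampMs).putShort((short) hostBytes.length).put(hostBytes).put(payload);
        return buf.array();
    }

    static long timestampOf(byte[] framed) {
        return ByteBuffer.wrap(framed).getLong(0);
    }

    static String hostOf(byte[] framed) {
        int hostLen = ByteBuffer.wrap(framed).getShort(HOST_LEN_POS);
        return new String(framed, HOST_POS, hostLen, StandardCharsets.UTF_8);
    }

    /** Strips the header again; an auditing consumer would do this before deserializing. */
    static byte[] payloadOf(byte[] framed) {
        int hostLen = ByteBuffer.wrap(framed).getShort(HOST_LEN_POS);
        return Arrays.copyOfRange(framed, HOST_POS + hostLen, framed.length);
    }
}

/** Decorator that adds the audit header and forwards to an underlying sender. */
final class AuditingSender implements ByteSender {
    private final ByteSender delegate;
    private final String host;

    AuditingSender(ByteSender delegate, String host) {
        this.delegate = delegate;
        this.host = host;
    }

    @Override
    public void send(String topic, byte[] value) {
        delegate.send(topic, AuditHeader.wrap(System.currentTimeMillis(), host, value));
    }
}
```

Because the header sits in front of an opaque payload, this approach does not depend on how the application serializes its messages, but every consumer of the topic has to know that the header is there and strip it.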



-- 
-- Guozhang

Re: [DISCUSSION] Message Metadata

Posted by Timothy Chen <tn...@gmail.com>.
Hi Guozhang,

I don't think that is publicly accessible, can you update it to the
Kafka wiki?

Tim

On Fri, Nov 21, 2014 at 5:24 PM, Guozhang Wang <wa...@gmail.com> wrote:
> Hi all,
>
> I have updated the wiki page (
> https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Kafka+Enriched+Message+Metadata)
> according to people's comments and discussions offline.
>
> Guozhang
>
> On Thu, Nov 13, 2014 at 9:43 AM, Guozhang Wang <wa...@gmail.com> wrote:
>
>> Hi Jun,
>>
>> Sorry for the delay on your comments in the wiki page as well as this
>> thread; quite swamped now. I will get back to you as soon as I find some
>> time.
>>
>> Guozhang
>>
>> On Tue, Nov 11, 2014 at 6:26 PM, Jun Rao <ju...@gmail.com> wrote:
>>
>>> Thinking about this a bit more. For adding the auditing support, I am not
>>> sure if we need to change the message format by adding the application
>>> tags. An alternative way to do that is to add it in the producer client.
>>> For example, for each message payload (doesn't matter what the
>>> serialization mechanism is) that a producer receives, the producer can just
>>> add a header before the original payload. The header will contain all
>>> needed fields (e.g. timestamp, host, etc) for the purpose of auditing. This
>>> way, we don't need to change the message format and the auditing info can
>>> be added independent of the serialization mechanism of the message. The
>>> header can use a different serialization mechanism for better efficiency.
>>> For example, if we use Avro to serialize the header, the encoded bytes
>>> won't include the field names in the header. This is potentially more
>>> efficient than representing those fields as application tags in the message
>>> where the tags have to be explicitly stored in every message.
>>>
>>> To make it easier for the client to add and make use of this kind of
>>> auditing support, I was imagining that we can add a ProducerFactory in the
>>> new java client. The ProducerFactory will create an instance of Producer
>>> based on a config property. By default, the current KafkaProducer will be
>>> returned. However, a user can plug in a different implementation of
>>> Producer that does auditing. For example, an implementation of an
>>> AuditProducer.send() can take the original ProducerRecord, add the header
>>> to the value byte array and then forward the record to an underlying
>>> KafkaProducer. We can add a similar ConsumerFactory to the new consumer
>>> client. If a user plugs in an implementation of the AuditingConsumer, the
>>> consumer will then be audited automatically.
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>> On Tue, Oct 21, 2014 at 4:06 PM, Guozhang Wang <wa...@gmail.com>
>>> wrote:
>>>
>>> > Hi Jun,
>>> >
>>> > Regarding 4) in your comment, after thinking about it for a while I cannot
>>> > come up with a way to do it along with log compaction without adding new
>>> > fields into the current format on message set. Do you have a better way
>>> > that does not require protocol changes?
>>> >
>>> > Guozhang
>>> >
>>> > On Mon, Oct 20, 2014 at 9:53 AM, Guozhang Wang <wa...@gmail.com>
>>> wrote:
>>> >
>>> > > I have updated the wiki page incorporating received comments. We can
>>> > > discuss some more details on:
>>> > >
>>> > > 1. How we want to do audit? Whether we want to have in-built auditing
>>> on
>>> > > brokers or even MMs or use  an audit consumer to fetch all messages
>>> from
>>> > > just brokers.
>>> > >
>>> > > 2. How we can avoid de-/re-compression on brokers and MMs with log
>>> > > compaction turned on.
>>> > >
>>> > > 3. How we can resolve unclean leader election resulted data
>>> inconsistency
>>> > > with control messages.
>>> > >
>>> > > Guozhang
>>> > >
>>> > > On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang <wa...@gmail.com>
>>> > > wrote:
>>> > >
>>> > >> Thanks for the detailed comments Jun! Some replies inlined.
>>> > >>
>>> > >> On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao <ju...@gmail.com> wrote:
>>> > >>
>>> > >>> Hi, Guozhang,
>>> > >>>
>>> > >>> Thanks for the writeup.
>>> > >>>
>>> > >>> A few high level comments.
>>> > >>>
>>> > >>> 1. Associating (versioned) schemas to a topic can be a good thing
>>> > >>> overall.
>>> > >>> Yes, this could add a bit more management overhead in Kafka.
>>> However,
>>> > it
>>> > >>> makes sure that the data format contract between a producer and a
>>> > >>> consumer
>>> > >>> is kept and managed in a central place, instead of in the
>>> application.
>>> > >>> The
>>> > >>> latter is probably easier to start with, but is likely to be
>>> brittle in
>>> > >>> the
>>> > >>> long run.
>>> > >>>
>>> > >>
>>> > >> I am actually not proposing to not support associated versioned
>>> schemas
>>> > >> for topics, but to not let some core Kafka functionalities like
>>> auditing
>>> > >> being depend on schemas. I think this alone can separate the schema
>>> > >> management from Kafka piping management (i.e. making sure every
>>> single
>>> > >> message is delivered, and within some latency, etc). Adding
>>> additional
>>> > >> auditing info into an existing schema will force Kafka to be aware of
>>> > the
>>> > >> schema systems (Avro, JSON, etc).
>>> > >>
>>> > >>
>>> > >>>
>>> > >>> 2. Auditing can be a general feature that's useful for many
>>> > applications.
>>> > >>> Such a feature can be implemented by extending the low level message
>>> > >>> format
>>> > >>> with a header. However, it can also be added as part of the schema
>>> > >>> management. For example, you can imagine a type of audited schema
>>> that
>>> > >>> adds
>>> > >>> additional auditing info to an existing schema automatically.
>>> > Performance
>>> > >>> wise, it probably doesn't make a big difference whether the auditing
>>> > info
>>> > >>> is added in the message header or the schema header.
>>> > >>>
>>> > >>>
>>> > >> See replies above.
>>> > >>
>>> > >>
>>> > >>> 3. We talked about avoiding the overhead of decompressing in both
>>> the
>>> > >>> broker and the mirror maker. We probably need to think through how
>>> this
>>> > >>> works with auditing. In the more general case where you want to
>>> audit
>>> > >>> every
>>> > >>> message, one has to do the decompression to get the individual
>>> message,
>>> > >>> independent of how the auditing info is stored. This means that if
>>> we
>>> > >>> want
>>> > >>> to audit the broker directly or the consumer in mirror maker, we
>>> have
>>> > to
>>> > >>> pay the decompression cost anyway. Similarly, if we want to extend
>>> > mirror
>>> > >>> maker to support some customized filtering/transformation logic, we
>>> > also
>>> > >>> have to pay the decompression cost.
>>> > >>>
>>> > >>>
>>> > >> I see your point. For that I would prefer an MM implementation that
>>> > >> de-compresses / re-compresses ONLY when required, for example by
>>> > >> auditing. I agree that we have not thought through whether we should
>>> > >> enable auditing on MM, and if so how, and we could discuss that in a
>>> > >> different thread. Overall, this proposal is not just about tackling
>>> > >> de-compression on MM but about the feasibility of extending the Kafka
>>> > >> message header for system properties / app properties.
>>> > >>
>>> > >>
>>> > >>> Some low level comments.
>>> > >>>
>>> > >>> 4. Broker offset reassignment (kafka-527):  This probably can be
>>> done
>>> > >>> with
>>> > >>> just a format change on the compressed message set.
>>> > >>>
>>> > >> That is true. As I mentioned in the wiki, each of the problems may be
>>> > >> resolvable separately, but I am thinking about a general way to
>>> > >> address all of them.
>>> > >>
>>> > >>
>>> > >>> 5. MirrorMaker refactoring: We probably can think through how
>>> general
>>> > we
>>> > >>> want mirror maker to be. If we want it to be more general, we
>>> likely
>>> > >>> need to decompress every message just like in a normal consumer.
>>> There
>>> > >>> will
>>> > >>> definitely be overhead. However, as long as mirror maker is made
>>> > >>> scalable,
>>> > >>> we can overcome the overhead by just running more instances on more
>>> > >>> hardware resources. As for the proposed message format change, we
>>> > >>> probably
>>> > >>> need to think through it a bit more. The honor-ship flag seems a bit
>>> > >>> hacky
>>> > >>> to me.
>>> > >>>
>>> > >>>
>>> > >> Replied as part of 3). Sure we can discuss more about that, will
>>> update
>>> > >> the wiki for collected comments.
>>> > >>
>>> > >>
>>> > >>> 6. Adding a timestamp in each message can be a useful thing. It (1)
>>> > >>> allows
>>> > >>> log segments to be rolled more accurately; (2) allows finding an
>>> offset
>>> > >>> for
>>> > >>> a particular timestamp more accurately. I am thinking that the
>>> > timestamp
>>> > >>> in
>>> > >>> the message should probably be the time when the leader receives the
>>> > >>> message. Followers preserve the timestamp set by leader. To avoid
>>> time
>>> > >>> going back during leader change, the leader can probably set the
>>> > >>> timestamp
>>> > >>> to be the  max of current time and the timestamp of the last
>>> message,
>>> > if
>>> > >>> present. That timestamp can potentially be added to the index file
>>> to
>>> > >>> answer offsetBeforeTimestamp queries more efficiently.
>>> > >>>
>>> > >>>
>>> > >> Agreed.
>>> > >>
>>> > >>
>>> > >>> 7. Log compaction: It seems that you are suggesting an improvement
>>> to
>>> > >>> compact the active segment as well. This can be tricky and we need
>>> to
>>> > >>> figure out the details on how to do this. This improvement seems to
>>> be
>>> > >>> orthogonal to the message format change though.
>>> > >>>
>>> > >>>
>>> > >> I think the improvement is more effective with the timestamps as in 6);
>>> > >> we can discuss this more.
>>> > >>
>>> > >>
>>> > >>> 8. Data inconsistency from unclean election: I am not sure if we
>>> need
>>> > to
>>> > >>> add a controlled message to the log during leadership change. The
>>> > <leader
>>> > >>> generation, starting offset> map can be maintained in a separate
>>> > >>> checkpoint
>>> > >>> file. The follower just needs to get that map from the leader during
>>> > >>> startup.
>>> > >>>
>>> > >> What I was proposing is an alternative solution given that we have
>>> > >> this message header enhancement; with it we do not need to add
>>> > >> separate logic for a leadership map and checkpoint file, only logic
>>> > >> in the replica manager to handle this extra controlled message and to
>>> > >> remember the current leader epoch instead of a map.
>>> > >>
>>> > >>
>>> > >>> Thanks,
>>> > >>>
>>> > >>> Jun
>>> > >>>
>>> > >>> On Fri, Oct 10, 2014 at 5:33 PM, Guozhang Wang <wa...@gmail.com>
>>> > >>> wrote:
>>> > >>>
>>> > >>> > Hello all,
>>> > >>> >
>>> > >>> > I put some thoughts on enhancing our current message metadata
>>> format
>>> > to
>>> > >>> > solve a bunch of existing issues:
>>> > >>> >
>>> > >>> >
>>> > >>> >
>>> > >>>
>>> >
>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
>>> > >>> >
>>> > >>> > This wiki page is for kicking off some discussions about the
>>> > >>> feasibility of
>>> > >>> > adding more info into the message header, and if possible how we
>>> > would
>>> > >>> add
>>> > >>> > them.
>>> > >>> >
>>> > >>> > -- Guozhang
>>> > >>> >
>>> > >>>
>>> > >>
>>> > >>
>>> > >>
>>> > >> --
>>> > >> -- Guozhang
>>> > >>
>>> > >
>>> > >
>>> > >
>>> > > --
>>> > > -- Guozhang
>>> > >
>>> >
>>> >
>>> >
>>> > --
>>> > -- Guozhang
>>> >
>>>
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>
>
> --
> -- Guozhang

Re: [DISCUSSION] Message Metadata

Posted by Guozhang Wang <wa...@gmail.com>.
Hi all,

I have updated the wiki page (
https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Kafka+Enriched+Message+Metadata)
according to people's comments and discussions offline.

Guozhang

On Thu, Nov 13, 2014 at 9:43 AM, Guozhang Wang <wa...@gmail.com> wrote:

> Hi Jun,
>
> Sorry for the delay on your comments in the wiki page as well as this
> thread; quite swamped now. I will get back to you as soon as I find some
> time.
>
> Guozhang
>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang

Re: [DISCUSSION] Message Metadata

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Jun,

Sorry for the delay in responding to your comments on the wiki page and in
this thread; I am quite swamped right now. I will get back to you as soon as
I find some time.

Guozhang

On Tue, Nov 11, 2014 at 6:26 PM, Jun Rao <ju...@gmail.com> wrote:

> Thinking about this a bit more. For adding the auditing support, I am not
> sure if we need to change the message format by adding the application
> tags. An alternative way to do that is to add it in the producer client.
> For example, for each message payload (doesn't matter what the
> serialization mechanism is) that a producer receives, the producer can just
> add a header before the original payload. The header will contain all
> needed fields (e.g. timestamp, host, etc) for the purpose of auditing. This
> way, we don't need to change the message format and the auditing info can
> be added independent of the serialization mechanism of the message. The
> header can use a different serialization mechanism for better efficiency.
> For example, if we use Avro to serialize the header, the encoded bytes
> won't include the field names in the header. This is potentially more
> efficient than representing those fields as application tags in the message
> where the tags have to be explicitly store in every message.
>
> To make it easier for the client to add and make use of this kind of
> auditing support, I was imagining that we can add a ProducerFactory in the
> new java client. The ProducerFactory will create an instance of Producer
> based on a config property. By default, the current KafkaProducer will be
> returned. However, a user can plug in a different implementation of
> Producer that does auditing. For example, an implementation of an
> AuditProducer.send() can take the original ProducerRecord, add the header
> to the value byte array and then forward the record to an underlying
> KafkaProducer. We can add a similar ConsumerFactory to the new consumer
> client. If a user plugs in an implementation of the AuditingConsumer, the
> consumer will then be audited automatically.
>
> Thanks,
>
> Jun
>
>



-- 
-- Guozhang

Re: [DISCUSSION] Message Metadata

Posted by Jun Rao <ju...@gmail.com>.
Thinking about this a bit more. For adding the auditing support, I am not
sure if we need to change the message format by adding the application
tags. An alternative way to do that is to add it in the producer client.
For example, for each message payload (doesn't matter what the
serialization mechanism is) that a producer receives, the producer can just
add a header before the original payload. The header will contain all
needed fields (e.g. timestamp, host, etc) for the purpose of auditing. This
way, we don't need to change the message format and the auditing info can
be added independent of the serialization mechanism of the message. The
header can use a different serialization mechanism for better efficiency.
For example, if we use Avro to serialize the header, the encoded bytes
won't include the field names in the header. This is potentially more
efficient than representing those fields as application tags in the message
where the tags have to be explicitly stored in every message.
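
A minimal sketch of the header-before-payload idea described above. It
assumes a fixed positional binary layout instead of Avro (like Avro's binary
encoding, it stores no field names in the message), and the class name
AuditEnvelope, its wrap/unwrap methods, the magic byte, and the field choice
(timestamp, host) are all hypothetical illustrations, not part of any Kafka
client:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical audit envelope, laid out as:
// [magic: 1 byte][timestamp: 8 bytes][host length: 2 bytes][host bytes][original payload]
final class AuditEnvelope {
    static final byte MAGIC = 0x1; // assumed header version marker

    final long timestampMs;
    final String host;
    final byte[] payload;

    private AuditEnvelope(long timestampMs, String host, byte[] payload) {
        this.timestampMs = timestampMs;
        this.host = host;
        this.payload = payload;
    }

    // Producer side: prepend the audit header to the already-serialized payload.
    // The payload is treated as opaque bytes, so any serialization format works.
    static byte[] wrap(long timestampMs, String host, byte[] payload) {
        byte[] hostBytes = host.getBytes(StandardCharsets.UTF_8);
        if (hostBytes.length > Short.MAX_VALUE) {
            throw new IllegalArgumentException("host name too long");
        }
        ByteBuffer buf = ByteBuffer.allocate(1 + 8 + 2 + hostBytes.length + payload.length);
        buf.put(MAGIC);
        buf.putLong(timestampMs);
        buf.putShort((short) hostBytes.length);
        buf.put(hostBytes);
        buf.put(payload);
        return buf.array();
    }

    // Consumer side: split the stored value back into audit fields and payload.
    static AuditEnvelope unwrap(byte[] value) {
        ByteBuffer buf = ByteBuffer.wrap(value);
        byte magic = buf.get();
        if (magic != MAGIC) {
            throw new IllegalArgumentException("unknown audit header version: " + magic);
        }
        long timestampMs = buf.getLong();
        byte[] hostBytes = new byte[buf.getShort()];
        buf.get(hostBytes);
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return new AuditEnvelope(timestampMs, new String(hostBytes, StandardCharsets.UTF_8), payload);
    }
}
```

Because the broker only ever sees an opaque value byte array, this approach
leaves the on-disk message format untouched; the cost is that every consumer
of the topic must know to strip the header.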

To make it easier for the client to add and make use of this kind of
auditing support, I was imagining that we can add a ProducerFactory in the
new Java client. The ProducerFactory will create an instance of Producer
based on a config property. By default, the current KafkaProducer will be
returned. However, a user can plug in a different implementation of
Producer that does auditing. For example, an AuditProducer.send()
implementation can take the original ProducerRecord, add the header
to the value byte array and then forward the record to an underlying
KafkaProducer. We can add a similar ConsumerFactory to the new consumer
client. If a user plugs in an implementation of the AuditingConsumer, the
consumer will then be audited automatically.
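
The factory idea above could look roughly like the following sketch. To keep
it self-contained it does not use the real Kafka client: SimpleProducer and
RecordingProducer are hypothetical stand-ins for the Producer interface and
KafkaProducer, the "producer.impl" property name and its values are assumed,
and the audit header here is just an 8-byte timestamp. A real factory would
more likely load a class name from the config than switch on a string:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.function.LongSupplier;

// Hypothetical stand-in for the client's Producer interface.
interface SimpleProducer {
    void send(String topic, byte[] value);
}

// Hypothetical stand-in for KafkaProducer: it only records what was sent.
final class RecordingProducer implements SimpleProducer {
    final List<byte[]> sent = new ArrayList<>();

    @Override
    public void send(String topic, byte[] value) {
        sent.add(value);
    }
}

// Decorator that prepends an audit header and forwards to the underlying producer.
final class AuditProducer implements SimpleProducer {
    private final SimpleProducer delegate;
    private final LongSupplier clock;

    AuditProducer(SimpleProducer delegate, LongSupplier clock) {
        this.delegate = delegate;
        this.clock = clock;
    }

    @Override
    public void send(String topic, byte[] value) {
        // Header is an 8-byte timestamp, followed by the untouched original value.
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + value.length);
        buf.putLong(clock.getAsLong());
        buf.put(value);
        delegate.send(topic, buf.array());
    }
}

final class ProducerFactory {
    static final String IMPL_CONFIG = "producer.impl"; // assumed property name

    // Returns the base producer by default, or an auditing wrapper if configured.
    static SimpleProducer create(Properties config, SimpleProducer base) {
        String impl = config.getProperty(IMPL_CONFIG, "default");
        switch (impl) {
            case "default":
                return base;
            case "audit":
                return new AuditProducer(base, System::currentTimeMillis);
            default:
                throw new IllegalArgumentException("unknown producer implementation: " + impl);
        }
    }
}
```

Application code keeps calling send() on whatever the factory returns, so
auditing can be switched on through configuration alone. A ConsumerFactory
would mirror this by wrapping the consumer and stripping the header before
handing the value back.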

Thanks,

Jun

On Tue, Oct 21, 2014 at 4:06 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Hi Jun,
>
> Regarding 4) in your comment, after thinking about it for a while I cannot
> come up with a way to do it along with log compaction without adding new
> fields to the current message set format. Do you have a better way that
> does not require protocol changes?
>
> Guozhang
>
> On Mon, Oct 20, 2014 at 9:53 AM, Guozhang Wang <wa...@gmail.com> wrote:
>
> > I have updated the wiki page incorporating received comments. We can
> > discuss some more details on:
> >
> > 1. How do we want to do auditing: with built-in auditing on brokers (or
> > even MMs), or with an audit consumer that fetches all messages from just
> > the brokers?
> >
> > 2. How we can avoid de-/re-compression on brokers and MMs with log
> > compaction turned on.
> >
> > 3. How we can resolve data inconsistency resulting from unclean leader
> > election with control messages.
> >
> > Guozhang
> >
> > On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang <wa...@gmail.com>
> > wrote:
> >
> >> Thanks for the detailed comments Jun! Some replies inlined.
> >>
> >> On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao <ju...@gmail.com> wrote:
> >>
> >>> Hi, Guozhang,
> >>>
> >>> Thanks for the writeup.
> >>>
> >>> A few high level comments.
> >>>
> >>> 1. Associating (versioned) schemas to a topic can be a good thing
> >>> overall. Yes, this could add a bit more management overhead in Kafka.
> >>> However, it makes sure that the data format contract between a
> >>> producer and a consumer is kept and managed in a central place,
> >>> instead of in the application. The latter is probably easier to start
> >>> with, but is likely to be brittle in the long run.
> >>>
> >>
> >> I am not proposing to drop support for associating versioned schemas
> >> with topics, but rather to keep core Kafka functionality such as
> >> auditing from depending on schemas. I think this alone can separate
> >> schema management from Kafka piping management (i.e. making sure every
> >> single message is delivered, and within some latency, etc). Adding
> >> auditing info to an existing schema would force Kafka to be aware of
> >> the schema systems (Avro, JSON, etc).
> >>
> >>
> >>>
> >>> 2. Auditing can be a general feature that's useful for many
> >>> applications. Such a feature can be implemented by extending the low
> >>> level message format with a header. However, it can also be added as
> >>> part of the schema management. For example, you can imagine a type of
> >>> audited schema that adds additional auditing info to an existing
> >>> schema automatically. Performance wise, it probably doesn't make a big
> >>> difference whether the auditing info is added in the message header or
> >>> the schema header.
> >>>
> >>>
> >> See replies above.
> >>
> >>
> >>> 3. We talked about avoiding the overhead of decompressing in both the
> >>> broker and the mirror maker. We probably need to think through how this
> >>> works with auditing. In the more general case where you want to audit
> >>> every message, one has to do the decompression to get the individual
> >>> message, independent of how the auditing info is stored. This means
> >>> that if we want to audit the broker directly or the consumer in mirror
> >>> maker, we have to pay the decompression cost anyway. Similarly, if we
> >>> want to extend mirror maker to support some customized
> >>> filtering/transformation logic, we also have to pay the decompression
> >>> cost.
> >>>
> >>>
> >> I see your point. For that I would prefer to have a MM implementation
> >> that is able to do de-compress / re-compress ONLY if required, for
> example
> >> by auditing, etc. I agree that we have not thought through whether we
> >> should enable auditing on MM, and if yes how to do that, and we could
> >> discuss about that in a different thread. Overall, this proposal is not
> >> just for tackling de-compression on MM but about the feasibility of
> >> extending Kafka message header for system properties / app properties.
> >>
> >>
> >>> Some low level comments.
> >>>
> >>> 4. Broker offset reassignment (kafka-527):  This probably can be done
> >>> with
> >>> just a format change on the compressed message set.
> >>>
> >>> That is true. As I mentioned in the wiki each of the problems may be
> >> resolvable separately but I am thinking about a general way to get all
> of
> >> them.
> >>
> >>
> >>> 5. MirrorMaker refactoring: We probably can think through how general
> we
> >>> want mirror maker to be. If we want to it to be more general, we likely
> >>> need to decompress every message just like in a normal consumer. There
> >>> will
> >>> definitely be overhead. However, as long as mirror maker is made
> >>> scalable,
> >>> we can overcome the overhead by just running more instances on more
> >>> hardware resources. As for the proposed message format change, we
> >>> probably
> >>> need to think through it a bit more. The honor-ship flag seems a bit
> >>> hacky
> >>> to me.
> >>>
> >>>
> >> Replied as part of 3). Sure we can discuss more about that, will update
> >> the wiki for collected comments.
> >>
> >>
> >>> 6. Adding a timestamp in each message can be a useful thing. It (1)
> >>> allows
> >>> log segments to be rolled more accurately; (2) allows finding an offset
> >>> for
> >>> a particular timestamp more accurately. I am thinking that the
> timestamp
> >>> in
> >>> the message should probably be the time when the leader receives the
> >>> message. Followers preserve the timestamp set by leader. To avoid time
> >>> going back during leader change, the leader can probably set the
> >>> timestamp
> >>> to be the  max of current time and the timestamp of the last message,
> if
> >>> present. That timestamp can potentially be added to the index file to
> >>> answer offsetBeforeTimestamp queries more efficiently.
> >>>
> >>>
> >> Agreed.
> >>
> >>
> >>> 7. Log compaction: It seems that you are suggesting an improvement to
> >>> compact the active segment as well. This can be tricky and we need to
> >>> figure out the details on how to do this. This improvement seems to be
> >>> orthogonal to the message format change though.
> >>>
> >>>
> >> I think the improvements is more effective with the timestamps as in 6),
> >> we can discuss more about this.
> >>
> >>
> >>> 8. Data inconsistency from unclean election: I am not sure if we need
> to
> >>> add a controlled message to the log during leadership change. The
> <leader
> >>> generation, starting offset> map can be maintained in a separate
> >>> checkpoint
> >>> file. The follower just need to get that map from the leader during
> >>> startup.
> >>>
> >>> What I was proposing is an alternative solution given that we have this
> >> message header enhancement; with this we do not need to add another
> logic
> >> for leadership map and checkpoint file, but just the logic on
> >> replica-manager to handle this extra controlled message and remembering
> the
> >> current leader epoch instead of a map.
> >>
> >>
> >>> Thanks,
> >>>
> >>> Jun
> >>>
> >>> On Fri, Oct 10, 2014 at 5:33 PM, Guozhang Wang <wa...@gmail.com>
> >>> wrote:
> >>>
> >>> > Hello all,
> >>> >
> >>> > I put some thoughts on enhancing our current message metadata format
> to
> >>> > solve a bunch of existing issues:
> >>> >
> >>> >
> >>> >
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
> >>> >
> >>> > This wiki page is for kicking off some discussions about the
> >>> feasibility of
> >>> > adding more info into the message header, and if possible how we
> would
> >>> add
> >>> > them.
> >>> >
> >>> > -- Guozhang
> >>> >
> >>>
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> -- Guozhang
>

Re: [DISCUSSION] Message Metadata

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Jun,

Regarding 4) in your comment, after thinking about it for a while I cannot
come up with a way to do it along with log compaction without adding new
fields to the current message set format. Do you have a better way that
does not require protocol changes?
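
To make the difficulty concrete, here is a rough Python sketch. It is purely
illustrative: the wrapper layout and the names Inner, Wrapper,
assign_base_offset, absolute_offsets and compact are assumptions made for
this example, not the current or the proposed Kafka format. It assumes the
broker only stamps a base offset on the compressed wrapper, so each inner
message has to carry an explicit relative offset if its absolute offset is to
survive compaction. That relative offset is the kind of new field in question.

```python
from dataclasses import dataclass


@dataclass
class Inner:
    relative_offset: int  # hypothetical new field, set by the producer
    key: str
    value: str


@dataclass
class Wrapper:
    inner: list
    base_offset: int = -1  # the only field the broker fills in


def assign_base_offset(wrapper, next_offset):
    """Broker side: stamp the wrapper without touching the inner messages."""
    wrapper.base_offset = next_offset
    return next_offset + len(wrapper.inner)


def absolute_offsets(wrapper):
    """Consumer side: absolute offset = wrapper base + inner relative offset."""
    return [wrapper.base_offset + m.relative_offset for m in wrapper.inner]


def compact(wrapper):
    """Keep the latest message per key; survivors keep their relative offsets."""
    latest = {m.key: m for m in wrapper.inner}
    kept = sorted(latest.values(), key=lambda m: m.relative_offset)
    return Wrapper(inner=kept, base_offset=wrapper.base_offset)
```

If the inner messages were instead numbered by their position in the wrapper,
the survivors of compact() would silently shift to lower offsets, which is
why this sketch cannot do without the extra field.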

Guozhang

On Mon, Oct 20, 2014 at 9:53 AM, Guozhang Wang <wa...@gmail.com> wrote:

> I have updated the wiki page incorporating received comments. We can
> discuss some more details on:
>
> 1. How we want to do audit? Whether we want to have in-built auditing on
> brokers or even MMs or use  an audit consumer to fetch all messages from
> just brokers.
>
> 2. How we can avoid de-/re-compression on brokers and MMs with log
> compaction turned on.
>
> 3. How we can resolve unclean leader election resulted data inconsistency
> with control messages.
>
> Guozhang
>
> On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang <wa...@gmail.com>
> wrote:
>
>> Thanks for the detailed comments Jun! Some replies inlined.
>>
>> On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao <ju...@gmail.com> wrote:
>>
>>> Hi, Guozhang,
>>>
>>> Thanks for the writeup.
>>>
>>> A few high level comments.
>>>
>>> 1. Associating (versioned) schemas to a topic can be a good thing
>>> overall.
>>> Yes, this could add a bit more management overhead in Kafka. However, it
>>> makes sure that the data format contract between a producer and a
>>> consumer
>>> is kept and managed in a central place, instead of in the application.
>>> The
>>> latter is probably easier to start with, but is likely to be brittle in
>>> the
>>> long run.
>>>
>>
>> I am actually not proposing to not support associated versioned schemas
>> for topics, but to not let some core Kafka functionalities like auditing
>> being depend on schemas. I think this alone can separate the schema
>> management from Kafka piping management (i.e. making sure every single
>> message is delivered, and within some latency, etc). Adding additional
>> auditing info into an existing schema will force Kafka to be aware of the
>> schema systems (Avro, JSON, etc).
>>
>>
>>>
>>> 2. Auditing can be a general feature that's useful for many applications.
>>> Such a feature can be implemented by extending the low level message
>>> format
>>> with a header. However, it can also be added as part of the schema
>>> management. For example, you can imagine a type of audited schema that
>>> adds
>>> additional auditing info to an existing schema automatically. Performance
>>> wise, it probably doesn't make a big difference whether the auditing info
>>> is added in the message header or the schema header.
>>>
>>>
>> See replies above.
>>
>>
>>> 3. We talked about avoiding the overhead of decompressing in both the
>>> broker and the mirror maker. We probably need to think through how this
>>> works with auditing. In the more general case where you want to audit
>>> every
>>> message, one has to do the decompression to get the individual message,
>>> independent of how the auditing info is stored. This means that if we
>>> want
>>> to audit the broker directly or the consumer in mirror maker, we have to
>>> pay the decompression cost anyway. Similarly, if we want to extend mirror
>>> maker to support some customized filtering/transformation logic, we also
>>> have to pay the decompression cost.
>>>
>>>
>> I see your point. For that I would prefer to have a MM implementation
>> that is able to do de-compress / re-compress ONLY if required, for example
>> by auditing, etc. I agree that we have not thought through whether we
>> should enable auditing on MM, and if yes how to do that, and we could
>> discuss about that in a different thread. Overall, this proposal is not
>> just for tackling de-compression on MM but about the feasibility of
>> extending Kafka message header for system properties / app properties.
>>
>>
>>> Some low level comments.
>>>
>>> 4. Broker offset reassignment (kafka-527):  This probably can be done
>>> with
>>> just a format change on the compressed message set.
>>>
>>> That is true. As I mentioned in the wiki each of the problems may be
>> resolvable separately but I am thinking about a general way to get all of
>> them.
>>
>>
>>> 5. MirrorMaker refactoring: We probably can think through how general we
>>> want mirror maker to be. If we want to it to be more general, we likely
>>> need to decompress every message just like in a normal consumer. There
>>> will
>>> definitely be overhead. However, as long as mirror maker is made
>>> scalable,
>>> we can overcome the overhead by just running more instances on more
>>> hardware resources. As for the proposed message format change, we
>>> probably
>>> need to think through it a bit more. The honor-ship flag seems a bit
>>> hacky
>>> to me.
>>>
>>>
>> Replied as part of 3). Sure we can discuss more about that, will update
>> the wiki for collected comments.
>>
>>
>>> 6. Adding a timestamp in each message can be a useful thing. It (1)
>>> allows
>>> log segments to be rolled more accurately; (2) allows finding an offset
>>> for
>>> a particular timestamp more accurately. I am thinking that the timestamp
>>> in
>>> the message should probably be the time when the leader receives the
>>> message. Followers preserve the timestamp set by leader. To avoid time
>>> going back during leader change, the leader can probably set the
>>> timestamp
>>> to be the  max of current time and the timestamp of the last message, if
>>> present. That timestamp can potentially be added to the index file to
>>> answer offsetBeforeTimestamp queries more efficiently.
>>>
>>>
>> Agreed.
>>
>>
>>> 7. Log compaction: It seems that you are suggesting an improvement to
>>> compact the active segment as well. This can be tricky and we need to
>>> figure out the details on how to do this. This improvement seems to be
>>> orthogonal to the message format change though.
>>>
>>>
>> I think the improvements is more effective with the timestamps as in 6),
>> we can discuss more about this.
>>
>>
>>> 8. Data inconsistency from unclean election: I am not sure if we need  to
>>> add a controlled message to the log during leadership change. The <leader
>>> generation, starting offset> map can be maintained in a separate
>>> checkpoint
>>> file. The follower just need to get that map from the leader during
>>> startup.
>>>
>>> What I was proposing is an alternative solution given that we have this
>> message header enhancement; with this we do not need to add another logic
>> for leadership map and checkpoint file, but just the logic on
>> replica-manager to handle this extra controlled message and remembering the
>> current leader epoch instead of a map.
>>
>>
>>> Thanks,
>>>
>>> Jun
>>>
>>> On Fri, Oct 10, 2014 at 5:33 PM, Guozhang Wang <wa...@gmail.com>
>>> wrote:
>>>
>>> > Hello all,
>>> >
>>> > I put some thoughts on enhancing our current message metadata format to
>>> > solve a bunch of existing issues:
>>> >
>>> >
>>> >
>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
>>> >
>>> > This wiki page is for kicking off some discussions about the
>>> feasibility of
>>> > adding more info into the message header, and if possible how we would
>>> add
>>> > them.
>>> >
>>> > -- Guozhang
>>> >
>>>
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang

Re: [DISCUSSION] Message Metadata

Posted by Guozhang Wang <wa...@gmail.com>.
I have updated the wiki page incorporating received comments. We can
discuss some more details on:

1. How do we want to do auditing? Do we want built-in auditing on brokers
(or even MMs), or an audit consumer that fetches all messages from just
the brokers?

2. How we can avoid de-/re-compression on brokers and MMs with log
compaction turned on.

3. How we can resolve the data inconsistency resulting from unclean leader
election with control messages.

Guozhang

On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Thanks for the detailed comments Jun! Some replies inlined.
>
> On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao <ju...@gmail.com> wrote:
>
>> Hi, Guozhang,
>>
>> Thanks for the writeup.
>>
>> A few high level comments.
>>
>> 1. Associating (versioned) schemas to a topic can be a good thing overall.
>> Yes, this could add a bit more management overhead in Kafka. However, it
>> makes sure that the data format contract between a producer and a consumer
>> is kept and managed in a central place, instead of in the application. The
>> latter is probably easier to start with, but is likely to be brittle in
>> the
>> long run.
>>
>
> I am actually not proposing to not support associated versioned schemas
> for topics, but to not let some core Kafka functionalities like auditing
> being depend on schemas. I think this alone can separate the schema
> management from Kafka piping management (i.e. making sure every single
> message is delivered, and within some latency, etc). Adding additional
> auditing info into an existing schema will force Kafka to be aware of the
> schema systems (Avro, JSON, etc).
>
>
>>
>> 2. Auditing can be a general feature that's useful for many applications.
>> Such a feature can be implemented by extending the low level message
>> format
>> with a header. However, it can also be added as part of the schema
>> management. For example, you can imagine a type of audited schema that
>> adds
>> additional auditing info to an existing schema automatically. Performance
>> wise, it probably doesn't make a big difference whether the auditing info
>> is added in the message header or the schema header.
>>
>>
> See replies above.
>
>
>> 3. We talked about avoiding the overhead of decompressing in both the
>> broker and the mirror maker. We probably need to think through how this
>> works with auditing. In the more general case where you want to audit
>> every
>> message, one has to do the decompression to get the individual message,
>> independent of how the auditing info is stored. This means that if we want
>> to audit the broker directly or the consumer in mirror maker, we have to
>> pay the decompression cost anyway. Similarly, if we want to extend mirror
>> maker to support some customized filtering/transformation logic, we also
>> have to pay the decompression cost.
>>
>>
> I see your point. For that I would prefer to have a MM implementation that
> is able to do de-compress / re-compress ONLY if required, for example by
> auditing, etc. I agree that we have not thought through whether we should
> enable auditing on MM, and if yes how to do that, and we could discuss
> about that in a different thread. Overall, this proposal is not just for
> tackling de-compression on MM but about the feasibility of extending Kafka
> message header for system properties / app properties.
>
>
>> Some low level comments.
>>
>> 4. Broker offset reassignment (kafka-527):  This probably can be done with
>> just a format change on the compressed message set.
>>
>> That is true. As I mentioned in the wiki each of the problems may be
> resolvable separately but I am thinking about a general way to get all of
> them.
>
>
>> 5. MirrorMaker refactoring: We probably can think through how general we
>> want mirror maker to be. If we want to it to be more general, we likely
>> need to decompress every message just like in a normal consumer. There
>> will
>> definitely be overhead. However, as long as mirror maker is made scalable,
>> we can overcome the overhead by just running more instances on more
>> hardware resources. As for the proposed message format change, we probably
>> need to think through it a bit more. The honor-ship flag seems a bit hacky
>> to me.
>>
>>
> Replied as part of 3). Sure we can discuss more about that, will update
> the wiki for collected comments.
>
>
>> 6. Adding a timestamp in each message can be a useful thing. It (1) allows
>> log segments to be rolled more accurately; (2) allows finding an offset
>> for
>> a particular timestamp more accurately. I am thinking that the timestamp
>> in
>> the message should probably be the time when the leader receives the
>> message. Followers preserve the timestamp set by leader. To avoid time
>> going back during leader change, the leader can probably set the timestamp
>> to be the  max of current time and the timestamp of the last message, if
>> present. That timestamp can potentially be added to the index file to
>> answer offsetBeforeTimestamp queries more efficiently.
>>
>>
> Agreed.
>
>
>> 7. Log compaction: It seems that you are suggesting an improvement to
>> compact the active segment as well. This can be tricky and we need to
>> figure out the details on how to do this. This improvement seems to be
>> orthogonal to the message format change though.
>>
>>
> I think the improvements is more effective with the timestamps as in 6),
> we can discuss more about this.
>
>
>> 8. Data inconsistency from unclean election: I am not sure if we need  to
>> add a controlled message to the log during leadership change. The <leader
>> generation, starting offset> map can be maintained in a separate
>> checkpoint
>> file. The follower just need to get that map from the leader during
>> startup.
>>
>> What I was proposing is an alternative solution given that we have this
> message header enhancement; with this we do not need to add another logic
> for leadership map and checkpoint file, but just the logic on
> replica-manager to handle this extra controlled message and remembering the
> current leader epoch instead of a map.
>
>
>> Thanks,
>>
>> Jun
>>
>> On Fri, Oct 10, 2014 at 5:33 PM, Guozhang Wang <wa...@gmail.com>
>> wrote:
>>
>> > Hello all,
>> >
>> > I put some thoughts on enhancing our current message metadata format to
>> > solve a bunch of existing issues:
>> >
>> >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
>> >
>> > This wiki page is for kicking off some discussions about the
>> feasibility of
>> > adding more info into the message header, and if possible how we would
>> add
>> > them.
>> >
>> > -- Guozhang
>> >
>>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang

Re: [DISCUSSION] Message Metadata

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks for the detailed comments Jun! Some replies inlined.

On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao <ju...@gmail.com> wrote:

> Hi, Guozhang,
>
> Thanks for the writeup.
>
> A few high level comments.
>
> 1. Associating (versioned) schemas to a topic can be a good thing overall.
> Yes, this could add a bit more management overhead in Kafka. However, it
> makes sure that the data format contract between a producer and a consumer
> is kept and managed in a central place, instead of in the application. The
> latter is probably easier to start with, but is likely to be brittle in the
> long run.
>

I am actually not proposing to drop support for associated versioned schemas
for topics, but rather to keep core Kafka functionality such as auditing
from depending on schemas. I think this alone can separate schema management
from Kafka piping management (i.e. making sure every single message is
delivered, and within some latency, etc). Adding additional auditing info
into an existing schema will force Kafka to be aware of the schema systems
(Avro, JSON, etc).


>
> 2. Auditing can be a general feature that's useful for many applications.
> Such a feature can be implemented by extending the low level message format
> with a header. However, it can also be added as part of the schema
> management. For example, you can imagine a type of audited schema that adds
> additional auditing info to an existing schema automatically. Performance
> wise, it probably doesn't make a big difference whether the auditing info
> is added in the message header or the schema header.
>
>
See replies above.


> 3. We talked about avoiding the overhead of decompressing in both the
> broker and the mirror maker. We probably need to think through how this
> works with auditing. In the more general case where you want to audit every
> message, one has to do the decompression to get the individual message,
> independent of how the auditing info is stored. This means that if we want
> to audit the broker directly or the consumer in mirror maker, we have to
> pay the decompression cost anyway. Similarly, if we want to extend mirror
> maker to support some customized filtering/transformation logic, we also
> have to pay the decompression cost.
>
>
I see your point. For that I would prefer to have an MM implementation that
de-compresses / re-compresses ONLY if required, for example by auditing.
I agree that we have not thought through whether we should enable auditing
on MM, and if yes how to do that; we could discuss that in a different
thread. Overall, this proposal is not just for tackling de-compression on
MM but about the feasibility of extending the Kafka message header for
system properties / app properties.


> Some low level comments.
>
> 4. Broker offset reassignment (kafka-527):  This probably can be done with
> just a format change on the compressed message set.
>
That is true. As I mentioned in the wiki, each of the problems may be
resolvable separately, but I am thinking about a general way to address all
of them.


> 5. MirrorMaker refactoring: We probably can think through how general we
> want mirror maker to be. If we want to it to be more general, we likely
> need to decompress every message just like in a normal consumer. There will
> definitely be overhead. However, as long as mirror maker is made scalable,
> we can overcome the overhead by just running more instances on more
> hardware resources. As for the proposed message format change, we probably
> need to think through it a bit more. The honor-ship flag seems a bit hacky
> to me.
>
>
Replied as part of 3). Sure we can discuss more about that, will update the
wiki for collected comments.


> 6. Adding a timestamp in each message can be a useful thing. It (1) allows
> log segments to be rolled more accurately; (2) allows finding an offset for
> a particular timestamp more accurately. I am thinking that the timestamp in
> the message should probably be the time when the leader receives the
> message. Followers preserve the timestamp set by leader. To avoid time
> going back during leader change, the leader can probably set the timestamp
> to be the  max of current time and the timestamp of the last message, if
> present. That timestamp can potentially be added to the index file to
> answer offsetBeforeTimestamp queries more efficiently.
>
>
Agreed.


> 7. Log compaction: It seems that you are suggesting an improvement to
> compact the active segment as well. This can be tricky and we need to
> figure out the details on how to do this. This improvement seems to be
> orthogonal to the message format change though.
>
>
I think the improvement is more effective with the timestamps as in 6); we
can discuss more about this.


> 8. Data inconsistency from unclean election: I am not sure if we need  to
> add a controlled message to the log during leadership change. The <leader
> generation, starting offset> map can be maintained in a separate checkpoint
> file. The follower just need to get that map from the leader during
> startup.
>
What I was proposing is an alternative solution given that we have this
message header enhancement; with this we do not need to add separate logic
for a leadership map and checkpoint file, only the logic in the
replica-manager to handle this extra controlled message and remember the
current leader epoch instead of a map.
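
As a rough illustration of how the two options in 8) relate, the Python
sketch below rebuilds a <leader generation, starting offset> map by scanning
controlled messages in a log, and then looks up which generation wrote a
given offset. Everything here is an assumption made for the example: the
log is a plain list of dicts, and the names map_from_control_messages and
generation_for_offset are hypothetical, not Kafka code.

```python
import bisect


def map_from_control_messages(log):
    """Rebuild the <generation, starting offset> map from control markers.

    `log` is a list of dicts indexed by offset; a control marker is assumed
    to look like {"control": True, "generation": n}.
    """
    return [(entry["generation"], offset)
            for offset, entry in enumerate(log)
            if entry.get("control")]


def generation_for_offset(generation_map, offset):
    """Return the leader generation that wrote `offset`, or None if unknown.

    `generation_map` must be sorted by starting offset.
    """
    starts = [start for _, start in generation_map]
    i = bisect.bisect_right(starts, offset) - 1
    return generation_map[i][0] if i >= 0 else None
```

Under these assumptions the map kept in a checkpoint file and the markers
kept in the log carry the same information; the difference is where the
bookkeeping lives.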


> Thanks,
>
> Jun
>
> On Fri, Oct 10, 2014 at 5:33 PM, Guozhang Wang <wa...@gmail.com> wrote:
>
> > Hello all,
> >
> > I put some thoughts on enhancing our current message metadata format to
> > solve a bunch of existing issues:
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
> >
> > This wiki page is for kicking off some discussions about the feasibility
> of
> > adding more info into the message header, and if possible how we would
> add
> > them.
> >
> > -- Guozhang
> >
>



-- 
-- Guozhang

Re: [DISCUSSION] Message Metadata

Posted by Jun Rao <ju...@gmail.com>.
Hi, Guozhang,

Thanks for the writeup.

A few high level comments.

1. Associating (versioned) schemas to a topic can be a good thing overall.
Yes, this could add a bit more management overhead in Kafka. However, it
makes sure that the data format contract between a producer and a consumer
is kept and managed in a central place, instead of in the application. The
latter is probably easier to start with, but is likely to be brittle in the
long run.

2. Auditing can be a general feature that's useful for many applications.
Such a feature can be implemented by extending the low level message format
with a header. However, it can also be added as part of the schema
management. For example, you can imagine a type of audited schema that adds
additional auditing info to an existing schema automatically. Performance
wise, it probably doesn't make a big difference whether the auditing info
is added in the message header or the schema header.

3. We talked about avoiding the overhead of decompressing in both the
broker and the mirror maker. We probably need to think through how this
works with auditing. In the more general case where you want to audit every
message, one has to do the decompression to get the individual message,
independent of how the auditing info is stored. This means that if we want
to audit the broker directly or the consumer in mirror maker, we have to
pay the decompression cost anyway. Similarly, if we want to extend mirror
maker to support some customized filtering/transformation logic, we also
have to pay the decompression cost.

Some low level comments.

4. Broker offset reassignment (kafka-527):  This probably can be done with
just a format change on the compressed message set.

5. MirrorMaker refactoring: We probably can think through how general we
want mirror maker to be. If we want it to be more general, we likely
need to decompress every message just like in a normal consumer. There will
definitely be overhead. However, as long as mirror maker is made scalable,
we can overcome the overhead by just running more instances on more
hardware resources. As for the proposed message format change, we probably
need to think through it a bit more. The honor-ship flag seems a bit hacky
to me.

6. Adding a timestamp in each message can be a useful thing. It (1) allows
log segments to be rolled more accurately; (2) allows finding an offset for
a particular timestamp more accurately. I am thinking that the timestamp in
the message should probably be the time when the leader receives the
message. Followers preserve the timestamp set by leader. To avoid time
going back during leader change, the leader can probably set the timestamp
to be the max of current time and the timestamp of the last message, if
present. That timestamp can potentially be added to the index file to
answer offsetBeforeTimestamp queries more efficiently.

7. Log compaction: It seems that you are suggesting an improvement to
compact the active segment as well. This can be tricky and we need to
figure out the details on how to do this. This improvement seems to be
orthogonal to the message format change though.

8. Data inconsistency from unclean election: I am not sure if we need to
add a controlled message to the log during leadership change. The <leader
generation, starting offset> map can be maintained in a separate checkpoint
file. The follower just needs to get that map from the leader during startup.
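
The timestamp rule in 6) can be sketched roughly as follows. This is a
minimal Python illustration, not Kafka code: the class TimestampedLog and its
methods append and offset_before are hypothetical names, the caller passes in
the leader's current time, and an in-memory list stands in for the index
file.

```python
import bisect


class TimestampedLog:
    """Toy log in which the leader stamps each message on receipt."""

    def __init__(self):
        # timestamps[i] belongs to offset i and is kept non-decreasing.
        self.timestamps = []

    def append(self, now):
        """Stamp a message with max(current time, last timestamp if present)."""
        ts = max(now, self.timestamps[-1]) if self.timestamps else now
        self.timestamps.append(ts)
        return len(self.timestamps) - 1, ts

    def offset_before(self, timestamp):
        """Largest offset whose timestamp is <= `timestamp`, or None."""
        i = bisect.bisect_right(self.timestamps, timestamp)
        return i - 1 if i > 0 else None
```

Because the stored timestamps never decrease, even if a new leader's clock
is behind the old one, a lookup by timestamp can be a binary search rather
than a scan.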

Thanks,

Jun

On Fri, Oct 10, 2014 at 5:33 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Hello all,
>
> I put some thoughts on enhancing our current message metadata format to
> solve a bunch of existing issues:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
>
> This wiki page is for kicking off some discussions about the feasibility of
> adding more info into the message header, and if possible how we would add
> them.
>
> -- Guozhang
>

Re: [DISCUSSION] Message Metadata

Posted by Joel Koshy <jj...@gmail.com>.
I think the tags are a useful concept to have in that they do for
applications what the additional metadata does for brokers, i.e.,
avoid decompression and recompression of an entire message-set. I
agree that we should not place any "core" fields (i.e., those used
internally by Kafka) in tags and those should be first-class fields in
the message header.  E.g., if we intend to support in-built end-to-end
audit in Kafka then fields for auditing (server, timestamps, etc.)
should be first-class fields in the message header.  However, tags are
useful for application-level features that can avoid a full
decompression.
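
A minimal Python sketch of that idea, under stated assumptions: the batch
layout (a dict with uncompressed "tags" next to a zlib-compressed "payload")
and the names pack, unpack and select are made up for illustration and are
not the format on the wiki page.

```python
import json
import zlib


def pack(tags, messages):
    """Wrap a batch: tags stay uncompressed, message bodies are compressed."""
    payload = zlib.compress(json.dumps(messages).encode("utf-8"))
    return {"tags": dict(tags), "payload": payload}


def unpack(wrapper):
    """Decompress a batch and return its list of messages."""
    return json.loads(zlib.decompress(wrapper["payload"]).decode("utf-8"))


def select(wrappers, key, value):
    """Yield messages only from batches whose tag matches.

    Non-matching batches are skipped without being decompressed.
    """
    for wrapper in wrappers:
        if wrapper["tags"].get(key) == value:
            yield from unpack(wrapper)
```

An application-level filter such as select(batches, "dc", "west") only pays
the decompression cost for the batches it actually wants.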

Although Avro has the ability to just deserialize select fields (say a
header) we then limit the optimization to avro-like formats. Also,
that will remain an application-specific thing and not an intrinsic
part of the wire protocol. i.e., brokers will continue to have to
decompress and recompress messages to assign offsets.

Joel

On Wed, Oct 15, 2014 at 09:04:55PM +0000, Todd Palino wrote:
> Let me add my view on #2 in less delicate terms than Guozhang did :)
> 
> When you're trying to run Kafka as a service, having to care about the
> format of the message sucks. I have plenty of users who are just fine
> using the Avro standard and play nice. Then I have a bunch of users who
> don't want to use Avro and want to do something else (json, some plain
> text, whatever). Then I have a bunch of users who use Avro but don't
> properly register their schemas. Then I have a bunch of users who do
> whatever they want and don't tell us.
> 
> What this means is that I can't have standard tooling, like auditing, that
> works on the entire system. I either have to whitelist or blacklist
> topics, and then I run into problems when someone adds something new
> either way. It would be preferable if I could monitor and maintain the
> health of the system without having to worry about the message format.
> 
> -Todd
> 
> 
> On 10/15/14, 10:50 AM, "Guozhang Wang" <wa...@gmail.com> wrote:
> 
> >Thanks Joe,
> >
> >I think we now have a few open questions to discuss around this topic:
> >
> >1. Shall we make core Kafka properties as first class fields in message
> >header or put them as tags?
> >
> >The pros of the first approach is more compacted format and hence less
> >message header overhead; the cons are that any future message header
> >change
> >needs protocol bump and possible multi-versioned handling on the server
> >side.
> >
> >Vice versa for the second approach.
> >
> >2. Shall we leave app properties still in message content and enforce
> >schema based topics or make them as extensible tags?
> >
> >The pros of the first approach is again saving message header overhead for
> >apps properties; and the cons are that it enforce schema usage for message
> >content to be partially de-serialized only for app header. At LinkedIn we
> >enforce Avro schemas for auditing purposes, and as a result the Kafka team
> >has to manage the schema registration process / schema repository as well.
> >
> >3. Which properties should be core KAFKA and which should be app
> >properties? For example, shall we make properties that only MM cares about
> >as app properties or Kafka properties?
> >
> >Guozhang
> >
> >On Tue, Oct 14, 2014 at 5:10 AM, Joe Stein <jo...@stealth.ly> wrote:
> >
> >> I think we could add schemaId(binary) to the MessageAndMetaData
> >>
> >> With the schemaId you can implement different downstream software
> >>pattern
> >> on the messages reliably. I wrote up more thoughts on this use
> >> https://cwiki.apache.org/confluence/display/KAFKA/Schema+based+topics it
> >> should strive to encompass all implementation needs for producer,
> >>broker,
> >> consumer hooks.
> >>
> >> So if the application and tagged fields are important you can package
> >>that
> >> into a specific Kafka topic plug-in and assign it to topic(s).  Kafka
> >> server should be able to validate your expected formats (like
> >> encoders/decoders but in broker by topic regardless of producer) to the
> >> topics that have it enabled. We should have these maintained in the
> >>project
> >> under contrib.
> >>
> >> =- Joestein
> >>
> >> On Mon, Oct 13, 2014 at 11:02 PM, Guozhang Wang <wa...@gmail.com>
> >> wrote:
> >>
> >> > Hi Jay,
> >> >
> >> > Thanks for the comments. Replied inline.
> >> >
> >> > Guozhang
> >> >
> >> > On Mon, Oct 13, 2014 at 11:11 AM, Jay Kreps <ja...@gmail.com>
> >>wrote:
> >> >
> >> > > I need to take more time to think about this. Here are a few
> >> off-the-cuff
> >> > > remarks:
> >> > >
> >> > > - To date we have tried really, really hard to keep the data model
> >>for
> >> > > message simple since after all you can always add whatever you like
> >> > inside
> >> > > the message body.
> >> > >
> >> > > - For system tags, why not just make these fields first class
> >>fields in
> >> > > message? The purpose of a system tag is presumably that Why have a
> >> bunch
> >> > of
> >> > > key-value pairs versus first-class fields?
> >> > >
> >> >
> >> > Yes, we can alternatively make system tags as first class fields in
> >>the
> >> > message header to make the format / processing logic simpler.
> >> >
> >> > The main reasons I put them as systems tags are 1) when I think about
> >> these
> >> > possible system tags, some of them are for all types of messages (e.g.
> >> > timestamps), but some of them may be for a specific type of message
> >> > (compressed, control message) and for those not all of them are
> >> necessarily
> >> > required all the time, hence making them as compact tags may save us
> >>some
> >> > space when not all of them are available; 2) with tags we do not need
> >>to
> >> > bump up the protocol version every time we make a change to it, which
> >> > includes keeping the logic to handle all versions on the broker until
> >>the
> >> > old ones are officially discarded; instead, the broker can just
> >>ignore a
> >> > tag if its id is not recognizable since the client is on a newer
> >>version,
> >> > or use some default value / throw exception if a required tag is
> >>missing
> >> > since the client is on an older version.
> >> >
> >> >
> >> > >
> >> > > - You don't necessarily need application-level tags explicitly
> >> > represented
> >> > > in the message format for efficiency. The application can define
> >>their
> >> > own
> >> > > header (e.g. their message could be a size delimited header followed
> >> by a
> >> > > size delimited body). But actually if you use Avro you don't even
> >>need
> >> > this
> >> > > I don't think. Avro has the ability to just deserialize the "header"
> >> > fields
> >> > > in your message. Avro has a notion of reader and writer schemas. The
> >> > writer
> >> > > schema is whatever the message was written with. If the reader
> >>schema
> >> is
> >> > > just the header, avro will skip any fields it doesn't need and just
> >> > > deserialize the fields it does need. This is actually a much more
> >> usable
> >> > > and flexible way to define a header since you get all the types avro
> >> > allows
> >> > > instead of just bytes.
> >> > >
> >> >
> >> > I agree that we can use a reader schema to just read out the header
> >> without
> >> > de-serializing the full message, and probably for compressed message
> >>we
> >> can
> >> > add an Avro / etc header for the compressed wrapper message also, but
> >> that
> >> > would enforce these applications (MM, auditor, clients) to be
> >> schema-aware,
> >> > which would usually require the people who manage this data pipeline
> >>also
> >> > manage the schemas, whereas ideally Kafka itself should just consider
> >> > bytes-in and bytes-out (and maybe a little bit more, like timestamps).
> >> The
> >> > purpose here is to not introduce an extra dependency while at the same
> >> time
> >> > allow applications to not fully de-serialize / de-compress the
> >>message in
> >> > order to do some simple processing based on metadata only.
> >> >
> >> >
> >> > >
> >> > > - We will need to think carefully about what to do with timestamps
> >>if
> >> we
> >> > > end up including them. There are actually several timestamps
> >> > >   - The time the producer created the message
> >> > >   - The time the leader received the message
> >> > >   - The time the current broker received the message
> >> > > The producer timestamps won't be at all increasing. The leader
> >> timestamp
> >> > > will be mostly increasing except when the clock changes or
> >>leadership
> >> > > moves. This somewhat complicates the use of these timestamps,
> >>though.
> >> > From
> >> > > the point of view of the producer the only time that matters is the
> >> time
> >> > > the message was created. However since the producer sets this it
> >>can be
> >> > > arbitrarily bad (remember all the ntp issues and 1970 timestamps we
> >> would
> >> > > get). Say that the heuristic was to use the timestamp of the first
> >> > message
> >> > > in a file for retention, the problem would be that the timestamps
> >>for
> >> the
> >> > > segments need not even be sequential and a single bad producer could
> >> send
> >> > > data with time in the distant past or future causing data to be
> >>deleted
> >> > or
> >> > > retained forever. Using the broker timestamp at write time is
> >>better,
> >> > > though obvious that would be overwritten when data is mirrored
> >>between
> >> > > clusters (the mirror would then have a different time--and if the
> >> > mirroring
> >> > > ever stopped that gap could be large). One approach would be to use
> >>the
> >> > > client timestamp but have the broker overwrite it if it is too bad
> >> (e.g.
> >> > > off by more than a minute, say).
> >> > >
> >> >
> >> > We would need the reception timestamp (i.e. the third one) for log
> >> > cleaning, and as for the first / second ones, I originally put them as
> >> app
> >> > tags since they are likely to be used not by the brokers itself (e.g.
> >> > auditor, etc).
> >> >
> >> >
> >> > >
> >> > > -Jay
> >> > >
> >> > > On Fri, Oct 10, 2014 at 11:21 PM, Joel Koshy <jj...@gmail.com>
> >> > wrote:
> >> > >
> >> > > > Thanks Guozhang! This is an excellent write-up and the approach
> >> nicely
> >> > > > consolidates a number of long-standing issues. It would be great
> >>if
> >> > > > everyone can review this carefully and give feedback.
> >> > > >
> >> > > > Also, wrt discussion in the past we have used a mix of wiki
> >>comments
> >> > > > and the mailing list. Personally, I think it is better to discuss
> >>on
> >> > > > the mailing list (for more visibility) and just post a bold link
> >>to
> >> > > > the (archived) mailing list thread on the wiki.
> >> > > >
> >> > > > Joel
> >> > > >
> >> > > > On Fri, Oct 10, 2014 at 05:33:52PM -0700, Guozhang Wang wrote:
> >> > > > > Hello all,
> >> > > > >
> >> > > > > I put some thoughts on enhancing our current message metadata
> >> format
> >> > to
> >> > > > > solve a bunch of existing issues:
> >> > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >> 
> >>https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+
> >>Metadata
> >> > > > >
> >> > > > > This wiki page is for kicking off some discussions about the
> >> > > feasibility
> >> > > > of
> >> > > > > adding more info into the message header, and if possible how we
> >> > would
> >> > > > add
> >> > > > > them.
> >> > > > >
> >> > > > > -- Guozhang
> >> > > >
> >> > > >
> >> > >
> >> >
> >> >
> >> >
> >> > --
> >> > -- Guozhang
> >> >
> >>
> >
> >
> >
> >-- 
> >-- Guozhang

Re: [DISCUSSION] Message Metadata

Posted by Todd Palino <tp...@linkedin.com.INVALID>.
Let me add my view on #2 in less delicate terms than Guozhang did :)

When you're trying to run Kafka as a service, having to care about the
format of the message sucks. I have plenty of users who are just fine
using the Avro standard and play nice. Then I have a bunch of users who
don't want to use Avro and want to do something else (json, some plain
text, whatever). Then I have a bunch of users who use Avro but don't
properly register their schemas. Then I have a bunch of users who do
whatever they want and don't tell us.

What this means is that I can't have standard tooling, like auditing, that
works on the entire system. I either have to whitelist or blacklist
topics, and then I run into problems when someone adds something new
either way. It would be preferable if I could monitor and maintain the
health of the system without having to worry about the message format.
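
To illustrate what format-agnostic tooling could look like, here is a
minimal sketch, assuming each message carried a timestamp outside the
payload. The Envelope type, its field names and the 10-minute bucket size
are all hypothetical and are not part of any Kafka API; the point is only
that the audit never parses the payload.

```python
from collections import Counter
from typing import Iterable, NamedTuple


class Envelope(NamedTuple):
    """Hypothetical record: metadata the audit may read plus an opaque payload."""
    topic: str
    timestamp_ms: int
    payload: bytes  # never parsed by the audit


def audit_counts(records: Iterable[Envelope], bucket_ms: int = 600_000) -> Counter:
    """Count messages per (topic, time bucket) without inspecting the payload."""
    counts = Counter()
    for record in records:
        # Round the timestamp down to the start of its bucket.
        bucket_start = record.timestamp_ms - record.timestamp_ms % bucket_ms
        counts[(record.topic, bucket_start)] += 1
    return counts
```

The same counting works whether the payload is Avro, json or plain text,
so no topic whitelist or blacklist would be needed for it.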

-Todd


On 10/15/14, 10:50 AM, "Guozhang Wang" <wa...@gmail.com> wrote:

>Thanks Joe,
>
>I think we now have a few open questions to discuss around this topic:
>
>1. Shall we make core Kafka properties as first class fields in message
>header or put them as tags?
>
>The pros of the first approach is more compacted format and hence less
>message header overhead; the cons are that any future message header
>change
>needs protocol bump and possible multi-versioned handling on the server
>side.
>
>Vice versa for the second approach.
>
>2. Shall we leave app properties still in message content and enforce
>schema based topics or make them as extensible tags?
>
>The pros of the first approach is again saving message header overhead for
>apps properties; and the cons are that it enforce schema usage for message
>content to be partially de-serialized only for app header. At LinkedIn we
>enforce Avro schemas for auditing purposes, and as a result the Kafka team
>has to manage the schema registration process / schema repository as well.
>
>3. Which properties should be core KAFKA and which should be app
>properties? For example, shall we make properties that only MM cares about
>as app properties or Kafka properties?
>
>Guozhang
>
>On Tue, Oct 14, 2014 at 5:10 AM, Joe Stein <jo...@stealth.ly> wrote:
>
>> I think we could add schemaId(binary) to the MessageAndMetaData
>>
>> With the schemaId you can implement different downstream software
>>pattern
>> on the messages reliably. I wrote up more thoughts on this use
>> https://cwiki.apache.org/confluence/display/KAFKA/Schema+based+topics it
>> should strive to encompass all implementation needs for producer,
>>broker,
>> consumer hooks.
>>
>> So if the application and tagged fields are important you can package
>>that
>> into a specific Kafka topic plug-in and assign it to topic(s).  Kafka
>> server should be able to validate your expected formats (like
>> encoders/decoders but in broker by topic regardless of producer) to the
>> topics that have it enabled. We should have these maintained in the
>>project
>> under contrib.
>>
>> =- Joestein
>>
>> On Mon, Oct 13, 2014 at 11:02 PM, Guozhang Wang <wa...@gmail.com>
>> wrote:
>>
>> > Hi Jay,
>> >
>> > Thanks for the comments. Replied inline.
>> >
>> > Guozhang
>> >
>> > On Mon, Oct 13, 2014 at 11:11 AM, Jay Kreps <ja...@gmail.com>
>>wrote:
>> >
>> > > I need to take more time to think about this. Here are a few
>> off-the-cuff
>> > > remarks:
>> > >
>> > > - To date we have tried really, really hard to keep the data model
>>for
>> > > message simple since after all you can always add whatever you like
>> > inside
>> > > the message body.
>> > >
>> > > - For system tags, why not just make these fields first class
>>fields in
>> > > message? The purpose of a system tag is presumably that Why have a
>> bunch
>> > of
>> > > key-value pairs versus first-class fields?
>> > >
>> >
>> > Yes, we can alternatively make system tags as first class fields in
>>the
>> > message header to make the format / processing logic simpler.
>> >
>> > The main reasons I put them as systems tags are 1) when I think about
>> these
>> > possible system tags, some of them are for all types of messages (e.g.
>> > timestamps), but some of them may be for a specific type of message
>> > (compressed, control message) and for those not all of them are
>> necessarily
>> > required all the time, hence making them as compact tags may save us
>>some
>> > space when not all of them are available; 2) with tags we do not need
>>to
>> > bump up the protocol version every time we make a change to it, which
>> > includes keeping the logic to handle all versions on the broker until
>>the
>> > old ones are officially discarded; instead, the broker can just
>>ignore a
>> > tag if its id is not recognizable since the client is on a newer
>>version,
>> > or use some default value / throw exception if a required tag is
>>missing
>> > since the client is on an older version.
>> >
>> >
>> > >
>> > > - You don't necessarily need application-level tags explicitly
>> > represented
>> > > in the message format for efficiency. The application can define
>>their
>> > own
>> > > header (e.g. their message could be a size delimited header followed
>> by a
>> > > size delimited body). But actually if you use Avro you don't even
>>need
>> > this
>> > > I don't think. Avro has the ability to just deserialize the "header"
>> > fields
>> > > in your message. Avro has a notion of reader and writer schemas. The
>> > writer
>> > > schema is whatever the message was written with. If the reader
>>schema
>> is
>> > > just the header, avro will skip any fields it doesn't need and just
>> > > deserialize the fields it does need. This is actually a much more
>> usable
>> > > and flexible way to define a header since you get all the types avro
>> > allows
>> > > instead of just bytes.
>> > >
>> >
>> > I agree that we can use a reader schema to just read out the header
>> without
>> > de-serializing the full message, and probably for compressed message
>>we
>> can
>> > add an Avro / etc header for the compressed wrapper message also, but
>> that
>> > would enforce these applications (MM, auditor, clients) to be
>> schema-aware,
>> > which would usually require the people who manage this data pipeline
>>also
>> > manage the schemas, whereas ideally Kafka itself should just consider
>> > bytes-in and bytes-out (and maybe a little bit more, like timestamps).
>> The
>> > purpose here is to not introduce an extra dependency while at the same
>> time
>> > allow applications to not fully de-serialize / de-compress the
>>message in
>> > order to do some simple processing based on metadata only.
>> >
>> >
>> > >
>> > > - We will need to think carefully about what to do with timestamps
>>if
>> we
>> > > end up including them. There are actually several timestamps
>> > >   - The time the producer created the message
>> > >   - The time the leader received the message
>> > >   - The time the current broker received the message
>> > > The producer timestamps won't be at all increasing. The leader
>> timestamp
>> > > will be mostly increasing except when the clock changes or
>>leadership
>> > > moves. This somewhat complicates the use of these timestamps,
>>though.
>> > From
>> > > the point of view of the producer the only time that matters is the
>> time
>> > > the message was created. However since the producer sets this it
>>can be
>> > > arbitrarily bad (remember all the ntp issues and 1970 timestamps we
>> would
>> > > get). Say that the heuristic was to use the timestamp of the first
>> > message
>> > > in a file for retention, the problem would be that the timestamps
>>for
>> the
>> > > segments need not even be sequential and a single bad producer could
>> send
>> > > data with time in the distant past or future causing data to be
>>deleted
>> > or
>> > > retained forever. Using the broker timestamp at write time is
>>better,
>> > > though obvious that would be overwritten when data is mirrored
>>between
>> > > clusters (the mirror would then have a different time--and if the
>> > mirroring
>> > > ever stopped that gap could be large). One approach would be to use
>>the
>> > > client timestamp but have the broker overwrite it if it is too bad
>> (e.g.
>> > > off by more than a minute, say).
>> > >
>> >
>> > We would need the reception timestamp (i.e. the third one) for log
>> > cleaning, and as for the first / second ones, I originally put them as
>> app
>> > tags since they are likely to be used not by the brokers itself (e.g.
>> > auditor, etc).
>> >
>> >
>> > >
>> > > -Jay
>> > >
>> > > On Fri, Oct 10, 2014 at 11:21 PM, Joel Koshy <jj...@gmail.com>
>> > wrote:
>> > >
>> > > > Thanks Guozhang! This is an excellent write-up and the approach
>> nicely
>> > > > consolidates a number of long-standing issues. It would be great
>>if
>> > > > everyone can review this carefully and give feedback.
>> > > >
>> > > > Also, wrt discussion in the past we have used a mix of wiki
>>comments
>> > > > and the mailing list. Personally, I think it is better to discuss
>>on
>> > > > the mailing list (for more visibility) and just post a bold link
>>to
>> > > > the (archived) mailing list thread on the wiki.
>> > > >
>> > > > Joel
>> > > >
>> > > > On Fri, Oct 10, 2014 at 05:33:52PM -0700, Guozhang Wang wrote:
>> > > > > Hello all,
>> > > > >
>> > > > > I put some thoughts on enhancing our current message metadata
>> format
>> > to
>> > > > > solve a bunch of existing issues:
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>> 
>>https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+
>>Metadata
>> > > > >
>> > > > > This wiki page is for kicking off some discussions about the
>> > > feasibility
>> > > > of
>> > > > > adding more info into the message header, and if possible how we
>> > would
>> > > > add
>> > > > > them.
>> > > > >
>> > > > > -- Guozhang
>> > > >
>> > > >
>> > >
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>
>
>
>-- 
>-- Guozhang


Re: [DISCUSSION] Message Metadata

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks Joe,

I think we now have a few open questions to discuss around this topic:

1. Should we make core Kafka properties first-class fields in the message
header, or put them in tags?

The pro of the first approach is a more compact format and hence less
message header overhead; the con is that any future message header change
needs a protocol version bump and possibly multi-version handling on the
server side.

The reverse holds for the second approach.

2. Should we leave app properties in the message content and enforce
schema-based topics, or make them extensible tags?

The pro of the first approach is again lower message header overhead for
app properties; the con is that it enforces schema usage, so that message
content can be partially de-serialized just to read the app header. At
LinkedIn we enforce Avro schemas for auditing purposes, and as a result the
Kafka team has to manage the schema registration process / schema
repository as well.

3. Which properties should be core Kafka properties and which should be app
properties? For example, should properties that only MM cares about be
app properties or Kafka properties?
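
To make the trade-off in question 1 concrete, here is a minimal sketch of
a tag-based header, assuming a simple (id, length, value) layout. The tag
ids, the layout and the default value are invented for illustration only
and are not the format proposed on the wiki. It shows both sides: each tag
costs 6 extra bytes of framing, but a reader can skip ids it does not know
and fall back to a default when an expected tag is absent, without a
protocol version bump.

```python
import struct

# Hypothetical tag ids, chosen only for this sketch.
TAG_TIMESTAMP = 1
TAG_COMPRESSION = 2
KNOWN_TAGS = {TAG_TIMESTAMP, TAG_COMPRESSION}
# Default used when an older client did not send the tag (0 = no compression).
REQUIRED_DEFAULTS = {TAG_COMPRESSION: b"\x00"}


def encode_tags(tags):
    """Encode {tag_id: bytes} as a 2-byte count plus (id, length, value) entries."""
    out = [struct.pack(">H", len(tags))]
    for tag_id, value in sorted(tags.items()):
        out.append(struct.pack(">HI", tag_id, len(value)))  # 6 bytes of framing
        out.append(value)
    return b"".join(out)


def decode_tags(buf):
    """Decode tags, skipping unknown ids and defaulting missing required ones."""
    (count,) = struct.unpack_from(">H", buf, 0)
    pos = 2
    tags = {}
    for _ in range(count):
        tag_id, length = struct.unpack_from(">HI", buf, pos)
        pos += 6
        value = buf[pos:pos + length]
        pos += length
        # An unknown id (from a newer client) is skipped via its length prefix.
        if tag_id in KNOWN_TAGS:
            tags[tag_id] = value
    for tag_id, default in REQUIRED_DEFAULTS.items():
        tags.setdefault(tag_id, default)
    return tags
```

With first-class fields the 6 bytes of framing per entry disappear, but
adding or removing a field changes the fixed layout and therefore the
protocol version.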

Guozhang

On Tue, Oct 14, 2014 at 5:10 AM, Joe Stein <jo...@stealth.ly> wrote:

> I think we could add schemaId(binary) to the MessageAndMetaData
>
> With the schemaId you can implement different downstream software pattern
> on the messages reliably. I wrote up more thoughts on this use
> https://cwiki.apache.org/confluence/display/KAFKA/Schema+based+topics it
> should strive to encompass all implementation needs for producer, broker,
> consumer hooks.
>
> So if the application and tagged fields are important you can package that
> into a specific Kafka topic plug-in and assign it to topic(s).  Kafka
> server should be able to validate your expected formats (like
> encoders/decoders but in broker by topic regardless of producer) to the
> topics that have it enabled. We should have these maintained in the project
> under contrib.
>
> =- Joestein
>
> On Mon, Oct 13, 2014 at 11:02 PM, Guozhang Wang <wa...@gmail.com>
> wrote:
>
> > Hi Jay,
> >
> > Thanks for the comments. Replied inline.
> >
> > Guozhang
> >
> > On Mon, Oct 13, 2014 at 11:11 AM, Jay Kreps <ja...@gmail.com> wrote:
> >
> > > I need to take more time to think about this. Here are a few
> off-the-cuff
> > > remarks:
> > >
> > > - To date we have tried really, really hard to keep the data model for
> > > message simple since after all you can always add whatever you like
> > inside
> > > the message body.
> > >
> > > - For system tags, why not just make these fields first class fields in
> > > message? The purpose of a system tag is presumably that Why have a
> bunch
> > of
> > > key-value pairs versus first-class fields?
> > >
> >
> > Yes, we can alternatively make system tags as first class fields in the
> > message header to make the format / processing logic simpler.
> >
> > The main reasons I put them as systems tags are 1) when I think about
> these
> > possible system tags, some of them are for all types of messages (e.g.
> > timestamps), but some of them may be for a specific type of message
> > (compressed, control message) and for those not all of them are
> necessarily
> > required all the time, hence making them as compact tags may save us some
> > space when not all of them are available; 2) with tags we do not need to
> > bump up the protocol version every time we make a change to it, which
> > includes keeping the logic to handle all versions on the broker until the
> > old ones are officially discarded; instead, the broker can just ignore a
> > tag if its id is not recognizable since the client is on a newer version,
> > or use some default value / throw exception if a required tag is missing
> > since the client is on an older version.
> >
> >
> > >
> > > - You don't necessarily need application-level tags explicitly
> > represented
> > > in the message format for efficiency. The application can define their
> > own
> > > header (e.g. their message could be a size delimited header followed
> by a
> > > size delimited body). But actually if you use Avro you don't even need
> > this
> > > I don't think. Avro has the ability to just deserialize the "header"
> > fields
> > > in your message. Avro has a notion of reader and writer schemas. The
> > writer
> > > schema is whatever the message was written with. If the reader schema
> is
> > > just the header, avro will skip any fields it doesn't need and just
> > > deserialize the fields it does need. This is actually a much more
> usable
> > > and flexible way to define a header since you get all the types avro
> > allows
> > > instead of just bytes.
> > >
> >
> > I agree that we can use a reader schema to just read out the header
> without
> > de-serializing the full message, and probably for compressed message we
> can
> > add an Avro / etc header for the compressed wrapper message also, but
> that
> > would enforce these applications (MM, auditor, clients) to be
> schema-aware,
> > which would usually require the people who manage this data pipeline also
> > manage the schemas, whereas ideally Kafka itself should just consider
> > bytes-in and bytes-out (and maybe a little bit more, like timestamps).
> The
> > purpose here is to not introduce an extra dependency while at the same
> time
> > allow applications to not fully de-serialize / de-compress the message in
> > order to do some simple processing based on metadata only.
> >
> >
> > >
> > > - We will need to think carefully about what to do with timestamps if
> we
> > > end up including them. There are actually several timestamps
> > >   - The time the producer created the message
> > >   - The time the leader received the message
> > >   - The time the current broker received the message
> > > The producer timestamps won't be at all increasing. The leader
> timestamp
> > > will be mostly increasing except when the clock changes or leadership
> > > moves. This somewhat complicates the use of these timestamps, though.
> > From
> > > the point of view of the producer the only time that matters is the
> time
> > > the message was created. However since the producer sets this it can be
> > > arbitrarily bad (remember all the ntp issues and 1970 timestamps we
> would
> > > get). Say that the heuristic was to use the timestamp of the first
> > message
> > > in a file for retention, the problem would be that the timestamps for
> the
> > > segments need not even be sequential and a single bad producer could
> send
> > > data with time in the distant past or future causing data to be deleted
> > or
> > > retained forever. Using the broker timestamp at write time is better,
> > > though obvious that would be overwritten when data is mirrored between
> > > clusters (the mirror would then have a different time--and if the
> > mirroring
> > > ever stopped that gap could be large). One approach would be to use the
> > > client timestamp but have the broker overwrite it if it is too bad
> (e.g.
> > > off by more than a minute, say).
> > >
> >
> > We would need the reception timestamp (i.e. the third one) for log
> > cleaning, and as for the first / second ones, I originally put them as
> app
> > tags since they are likely to be used not by the brokers itself (e.g.
> > auditor, etc).
> >
> >
> > >
> > > -Jay
> > >
> > > On Fri, Oct 10, 2014 at 11:21 PM, Joel Koshy <jj...@gmail.com>
> > wrote:
> > >
> > > > Thanks Guozhang! This is an excellent write-up and the approach
> nicely
> > > > consolidates a number of long-standing issues. It would be great if
> > > > everyone can review this carefully and give feedback.
> > > >
> > > > Also, wrt discussion in the past we have used a mix of wiki comments
> > > > and the mailing list. Personally, I think it is better to discuss on
> > > > the mailing list (for more visibility) and just post a bold link to
> > > > the (archived) mailing list thread on the wiki.
> > > >
> > > > Joel
> > > >
> > > > On Fri, Oct 10, 2014 at 05:33:52PM -0700, Guozhang Wang wrote:
> > > > > Hello all,
> > > > >
> > > > > I put some thoughts on enhancing our current message metadata
> format
> > to
> > > > > solve a bunch of existing issues:
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
> > > > >
> > > > > This wiki page is for kicking off some discussions about the
> > > feasibility
> > > > of
> > > > > adding more info into the message header, and if possible how we
> > would
> > > > add
> > > > > them.
> > > > >
> > > > > -- Guozhang
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Re: [DISCUSSION] Message Metadata

Posted by Joe Stein <jo...@stealth.ly>.
I think we could add schemaId(binary) to the MessageAndMetaData

With the schemaId you can reliably implement different downstream software
patterns on the messages. I wrote up more thoughts on this use case at
https://cwiki.apache.org/confluence/display/KAFKA/Schema+based+topics; it
should strive to encompass all implementation needs for producer, broker,
and consumer hooks.

So if the application and tagged fields are important, you can package them
into a specific Kafka topic plug-in and assign it to topic(s). The Kafka
server should be able to validate your expected formats (like
encoders/decoders, but in the broker, by topic, regardless of producer) for
the topics that have it enabled. We should have these plug-ins maintained in
the project under contrib.
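
A rough sketch of what such a per-topic plug-in could look like, purely as
an illustration: the registry, the accept() hook and the toy JSON validator
below are all hypothetical names, not an existing Kafka interface. Topics
without a plug-in stay plain bytes-in and bytes-out.

```python
import json
from typing import Callable, Dict

# A validator takes (schema_id, payload) and returns True if the payload is acceptable.
Validator = Callable[[bytes, bytes], bool]

# Hypothetical per-topic plug-in registry; topics without an entry are not validated.
topic_validators: Dict[str, Validator] = {}


def json_object_validator(schema_id: bytes, payload: bytes) -> bool:
    """Toy plug-in: accept only payloads that parse as a JSON object."""
    try:
        return isinstance(json.loads(payload.decode("utf-8")), dict)
    except ValueError:  # covers both bad UTF-8 and bad JSON
        return False


def accept(topic: str, schema_id: bytes, payload: bytes) -> bool:
    """Broker-side check: validate only on topics that enabled a plug-in."""
    validator = topic_validators.get(topic)
    return True if validator is None else validator(schema_id, payload)


# Enable the plug-in for one (made-up) topic.
topic_validators["orders"] = json_object_validator
```

A real plug-in would presumably look the schema up by schemaId rather than
ignore it as the toy validator does here.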

=- Joestein

On Mon, Oct 13, 2014 at 11:02 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Hi Jay,
>
> Thanks for the comments. Replied inline.
>
> Guozhang
>
> On Mon, Oct 13, 2014 at 11:11 AM, Jay Kreps <ja...@gmail.com> wrote:
>
> > I need to take more time to think about this. Here are a few off-the-cuff
> > remarks:
> >
> > - To date we have tried really, really hard to keep the data model for
> > message simple since after all you can always add whatever you like
> inside
> > the message body.
> >
> > - For system tags, why not just make these fields first class fields in
> > message? The purpose of a system tag is presumably that Why have a bunch
> of
> > key-value pairs versus first-class fields?
> >
>
> Yes, we can alternatively make system tags as first class fields in the
> message header to make the format / processing logic simpler.
>
> The main reasons I put them as systems tags are 1) when I think about these
> possible system tags, some of them are for all types of messages (e.g.
> timestamps), but some of them may be for a specific type of message
> (compressed, control message) and for those not all of them are necessarily
> required all the time, hence making them as compact tags may save us some
> space when not all of them are available; 2) with tags we do not need to
> bump up the protocol version every time we make a change to it, which
> includes keeping the logic to handle all versions on the broker until the
> old ones are officially discarded; instead, the broker can just ignore a
> tag if its id is not recognizable since the client is on a newer version,
> or use some default value / throw exception if a required tag is missing
> since the client is on an older version.
>
>
> >
> > - You don't necessarily need application-level tags explicitly
> represented
> > in the message format for efficiency. The application can define their
> own
> > header (e.g. their message could be a size delimited header followed by a
> > size delimited body). But actually if you use Avro you don't even need
> this
> > I don't think. Avro has the ability to just deserialize the "header"
> fields
> > in your message. Avro has a notion of reader and writer schemas. The
> writer
> > schema is whatever the message was written with. If the reader schema is
> > just the header, avro will skip any fields it doesn't need and just
> > deserialize the fields it does need. This is actually a much more usable
> > and flexible way to define a header since you get all the types avro
> allows
> > instead of just bytes.
> >
>
> I agree that we can use a reader schema to just read out the header without
> de-serializing the full message, and probably for compressed message we can
> add an Avro / etc header for the compressed wrapper message also, but that
> would enforce these applications (MM, auditor, clients) to be schema-aware,
> which would usually require the people who manage this data pipeline also
> manage the schemas, whereas ideally Kafka itself should just consider
> bytes-in and bytes-out (and maybe a little bit more, like timestamps). The
> purpose here is to not introduce an extra dependency while at the same time
> allow applications to not fully de-serialize / de-compress the message in
> order to do some simple processing based on metadata only.
>
>
> >
> > - We will need to think carefully about what to do with timestamps if we
> > end up including them. There are actually several timestamps
> >   - The time the producer created the message
> >   - The time the leader received the message
> >   - The time the current broker received the message
> > The producer timestamps won't be at all increasing. The leader timestamp
> > will be mostly increasing except when the clock changes or leadership
> > moves. This somewhat complicates the use of these timestamps, though. From
> > the point of view of the producer the only time that matters is the time
> > the message was created. However since the producer sets this it can be
> > arbitrarily bad (remember all the ntp issues and 1970 timestamps we would
> > get). Say that the heuristic was to use the timestamp of the first message
> > in a file for retention, the problem would be that the timestamps for the
> > segments need not even be sequential and a single bad producer could send
> > data with time in the distant past or future causing data to be deleted or
> > retained forever. Using the broker timestamp at write time is better,
> > though obvious that would be overwritten when data is mirrored between
> > clusters (the mirror would then have a different time--and if the mirroring
> > ever stopped that gap could be large). One approach would be to use the
> > client timestamp but have the broker overwrite it if it is too bad (e.g.
> > off by more than a minute, say).
> >
>
> We would need the reception timestamp (i.e. the third one) for log
> cleaning, and as for the first / second ones, I originally put them as app
> tags since they are likely to be used not by the brokers itself (e.g.
> auditor, etc).
>
>
> >
> > -Jay
> >
> > On Fri, Oct 10, 2014 at 11:21 PM, Joel Koshy <jj...@gmail.com>
> wrote:
> >
> > > Thanks Guozhang! This is an excellent write-up and the approach nicely
> > > consolidates a number of long-standing issues. It would be great if
> > > everyone can review this carefully and give feedback.
> > >
> > > Also, wrt discussion in the past we have used a mix of wiki comments
> > > and the mailing list. Personally, I think it is better to discuss on
> > > the mailing list (for more visibility) and just post a bold link to
> > > the (archived) mailing list thread on the wiki.
> > >
> > > Joel
> > >
> > > On Fri, Oct 10, 2014 at 05:33:52PM -0700, Guozhang Wang wrote:
> > > > Hello all,
> > > >
> > > > > I put some thoughts on enhancing our current message metadata format to
> > > > > solve a bunch of existing issues:
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
> > > > >
> > > > > This wiki page is for kicking off some discussions about the feasibility of
> > > > > adding more info into the message header, and if possible how we would add
> > > > > them.
> > > >
> > > > -- Guozhang
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: [DISCUSSION] Message Metadata

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Jay,

Thanks for the comments. Replied inline.

Guozhang

On Mon, Oct 13, 2014 at 11:11 AM, Jay Kreps <ja...@gmail.com> wrote:

> I need to take more time to think about this. Here are a few off-the-cuff
> remarks:
>
> - To date we have tried really, really hard to keep the data model for
> message simple since after all you can always add whatever you like inside
> the message body.
>
> - For system tags, why not just make these fields first class fields in
> message? The purpose of a system tag is presumably that Why have a bunch of
> key-value pairs versus first-class fields?
>

Yes, we can alternatively make system tags first-class fields in the
message header to make the format / processing logic simpler.

The main reasons I put them as system tags are 1) when I think about these
possible system tags, some of them apply to all types of messages (e.g.
timestamps), but some of them may apply only to a specific type of message
(compressed, control message), and those are not necessarily required all
the time, hence making them compact tags may save us some space when not
all of them are present; 2) with tags we do not need to bump up the
protocol version every time we make a change to it, which includes keeping
the logic to handle all versions on the broker until the old ones are
officially discarded; instead, the broker can just ignore a tag if its id
is not recognized because the client is on a newer version, or use some
default value / throw an exception if a required tag is missing because
the client is on an older version.
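
To make the tag-handling rule above concrete, here is a rough Python sketch.
The tag ids, the 2-byte id / 2-byte length layout, and the default codec value
are all assumptions made up for illustration; none of them come from the wiki
proposal.

```python
import struct

# Hypothetical tag ids, chosen only for this sketch.
TAG_TIMESTAMP = 1
TAG_CODEC = 2

KNOWN_TAGS = {TAG_TIMESTAMP, TAG_CODEC}
REQUIRED_TAGS = {TAG_TIMESTAMP}      # assumed: no sensible default exists
DEFAULTS = {TAG_CODEC: b"\x00"}      # assumed default: "no compression"


def encode_tags(tags):
    """Encode {tag_id: bytes} as repeated (id: uint16, length: uint16, value)."""
    out = bytearray()
    for tag_id, value in sorted(tags.items()):
        out += struct.pack(">HH", tag_id, len(value))
        out += value
    return bytes(out)


def decode_tags(buf):
    """Decode tags the way a reader on a fixed version might."""
    tags = {}
    pos = 0
    while pos < len(buf):
        tag_id, length = struct.unpack_from(">HH", buf, pos)
        pos += 4
        value = buf[pos:pos + length]
        pos += length
        if tag_id in KNOWN_TAGS:
            tags[tag_id] = value
        # Otherwise the writer is on a newer version: skip the unknown tag.
    for tag_id in sorted(KNOWN_TAGS):
        if tag_id in tags:
            continue
        # The writer is on an older version: fail or fall back to a default.
        if tag_id in REQUIRED_TAGS:
            raise ValueError("required tag %d is missing" % tag_id)
        tags[tag_id] = DEFAULTS[tag_id]
    return tags
```

Because each tag carries its own length, a reader can step over ids it does
not know, which is what lets new tags be added without a protocol version bump.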


>
> - You don't necessarily need application-level tags explicitly represented
> in the message format for efficiency. The application can define their own
> header (e.g. their message could be a size delimited header followed by a
> size delimited body). But actually if you use Avro you don't even need this
> I don't think. Avro has the ability to just deserialize the "header" fields
> in your message. Avro has a notion of reader and writer schemas. The writer
> schema is whatever the message was written with. If the reader schema is
> just the header, avro will skip any fields it doesn't need and just
> deserialize the fields it does need. This is actually a much more usable
> and flexible way to define a header since you get all the types avro allows
> instead of just bytes.
>

I agree that we can use a reader schema to just read out the header without
de-serializing the full message, and probably for compressed messages we can
also add an Avro / etc header to the compressed wrapper message, but that
would force these applications (MM, auditor, clients) to be schema-aware,
which would usually require the people who manage this data pipeline to also
manage the schemas, whereas ideally Kafka itself should just consider
bytes-in and bytes-out (and maybe a little bit more, like timestamps). The
purpose here is to avoid introducing an extra dependency while at the same
time allowing applications to do some simple processing based on metadata
only, without fully de-serializing / de-compressing the message.
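
For comparison, the application-defined "size delimited header followed by a
size delimited body" from the quoted text can be sketched without any schema
library. This is only an illustration: the 4-byte big-endian length prefixes
and the function names are assumptions, and zlib merely stands in for whatever
compression the body might use.

```python
import struct
import zlib


def pack_message(header, body):
    """Layout: header length (uint32), header, body length (uint32), body."""
    return (struct.pack(">I", len(header)) + header +
            struct.pack(">I", len(body)) + body)


def read_header(payload):
    """Return only the header bytes; the body is never inspected."""
    (header_len,) = struct.unpack_from(">I", payload, 0)
    return payload[4:4 + header_len]


def read_body(payload):
    """Skip over the header and return the raw body bytes."""
    (header_len,) = struct.unpack_from(">I", payload, 0)
    body_off = 4 + header_len
    (body_len,) = struct.unpack_from(">I", payload, body_off)
    return payload[body_off + 4:body_off + 4 + body_len]


# A tool that only needs metadata reads the header and leaves the
# compressed body untouched.
payload = pack_message(b"source=dc1", zlib.compress(b"large event body"))
metadata = read_header(payload)
```

The header here is still opaque bytes, so a tool reading it must agree with
the producers on its encoding, which is the dependency discussed above.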


>
> - We will need to think carefully about what to do with timestamps if we
> end up including them. There are actually several timestamps
>   - The time the producer created the message
>   - The time the leader received the message
>   - The time the current broker received the message
> The producer timestamps won't be at all increasing. The leader timestamp
> will be mostly increasing except when the clock changes or leadership
> moves. This somewhat complicates the use of these timestamps, though. From
> the point of view of the producer the only time that matters is the time
> the message was created. However since the producer sets this it can be
> arbitrarily bad (remember all the ntp issues and 1970 timestamps we would
> get). Say that the heuristic was to use the timestamp of the first message
> in a file for retention, the problem would be that the timestamps for the
> segments need not even be sequential and a single bad producer could send
> data with time in the distant past or future causing data to be deleted or
> retained forever. Using the broker timestamp at write time is better,
> though obvious that would be overwritten when data is mirrored between
> clusters (the mirror would then have a different time--and if the mirroring
> ever stopped that gap could be large). One approach would be to use the
> client timestamp but have the broker overwrite it if it is too bad (e.g.
> off by more than a minute, say).
>

We would need the reception timestamp (i.e. the third one) for log
cleaning, and as for the first / second ones, I originally put them as app
tags since they are likely to be used by components other than the brokers
themselves (e.g. auditor, etc).
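
A small Python sketch of why the choice of timestamp matters for retention,
using the "timestamp of the first message in a file" heuristic from the quoted
text. The 7-day retention value and the function name are assumptions for
illustration only, not how the broker's log cleaning is actually implemented.

```python
RETENTION_MS = 7 * 24 * 60 * 60 * 1000  # assumed 7-day retention


def segment_expired(first_message_ts_ms, now_ms, retention_ms=RETENTION_MS):
    """Judge a segment by the timestamp of its first message."""
    return now_ms - first_message_ts_ms > retention_ms


now_ms = 1413000000000  # some broker wall-clock time, in milliseconds

# Broker reception time: the segment was written an hour ago and is kept.
recent = segment_expired(now_ms - 3600000, now_ms)

# Producer timestamp from a broken clock ("1970"): the segment looks ancient.
from_1970 = segment_expired(0, now_ms)

# Producer timestamp far in the future: the segment never appears to expire.
from_future = segment_expired(now_ms + 10 * 365 * 24 * 3600 * 1000, now_ms)
```

With reception timestamps only the broker's own clock feeds this check, so a
single misbehaving producer cannot push a segment into either extreme.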


>
> -Jay
>
> On Fri, Oct 10, 2014 at 11:21 PM, Joel Koshy <jj...@gmail.com> wrote:
>
> > Thanks Guozhang! This is an excellent write-up and the approach nicely
> > consolidates a number of long-standing issues. It would be great if
> > everyone can review this carefully and give feedback.
> >
> > Also, wrt discussion in the past we have used a mix of wiki comments
> > and the mailing list. Personally, I think it is better to discuss on
> > the mailing list (for more visibility) and just post a bold link to
> > the (archived) mailing list thread on the wiki.
> >
> > Joel
> >
> > On Fri, Oct 10, 2014 at 05:33:52PM -0700, Guozhang Wang wrote:
> > > Hello all,
> > >
> > > I put some thoughts on enhancing our current message metadata format to
> > > solve a bunch of existing issues:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
> > >
> > > This wiki page is for kicking off some discussions about the feasibility of
> > > adding more info into the message header, and if possible how we would add
> > > them.
> > >
> > > -- Guozhang
> >
> >
>



-- 
-- Guozhang

Re: [DISCUSSION] Message Metadata

Posted by Jay Kreps <ja...@gmail.com>.
I need to take more time to think about this. Here are a few off-the-cuff
remarks:

- To date we have tried really, really hard to keep the data model for
messages simple since, after all, you can always add whatever you like inside
the message body.

- For system tags, why not just make these fields first class fields in
message? The purpose of a system tag is presumably that Why have a bunch of
key-value pairs versus first-class fields?

- You don't necessarily need application-level tags explicitly represented
in the message format for efficiency. The application can define its own
header (e.g. its message could be a size-delimited header followed by a
size-delimited body). But actually, if you use Avro, I don't think you even
need this. Avro has the ability to just deserialize the "header" fields
in your message. Avro has a notion of reader and writer schemas. The writer
schema is whatever the message was written with. If the reader schema is
just the header, Avro will skip any fields it doesn't need and just
deserialize the fields it does need. This is actually a much more usable
and flexible way to define a header since you get all the types Avro allows
instead of just bytes.

- We will need to think carefully about what to do with timestamps if we
end up including them. There are actually several timestamps:
  - The time the producer created the message
  - The time the leader received the message
  - The time the current broker received the message
The producer timestamps won't be increasing at all. The leader timestamp
will be mostly increasing except when the clock changes or leadership
moves. This somewhat complicates the use of these timestamps, though. From
the point of view of the producer the only time that matters is the time
the message was created. However, since the producer sets this, it can be
arbitrarily bad (remember all the ntp issues and 1970 timestamps we would
get). Say the heuristic was to use the timestamp of the first message in a
file for retention: the problem would be that the timestamps for the
segments need not even be sequential, and a single bad producer could send
data with a time in the distant past or future, causing data to be deleted
or retained forever. Using the broker timestamp at write time is better,
though obviously that would be overwritten when data is mirrored between
clusters (the mirror would then have a different time--and if the mirroring
ever stopped that gap could be large). One approach would be to use the
client timestamp but have the broker overwrite it if it is too bad (e.g.
off by more than a minute, say).
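
The last approach can be sketched in a few lines of Python. This is only an
illustration of the idea: the function name and the millisecond units are
assumptions, and the one-minute threshold is just the example figure above.

```python
MAX_SKEW_MS = 60000  # "off by more than a minute", used as the example bound


def effective_timestamp(client_ts_ms, broker_now_ms, max_skew_ms=MAX_SKEW_MS):
    """Keep the client timestamp unless it is too far from the broker clock."""
    if abs(client_ts_ms - broker_now_ms) > max_skew_ms:
        # Too far in the past or the future: overwrite with the broker time.
        return broker_now_ms
    return client_ts_ms


broker_now_ms = 1413000000000

# 30 seconds behind the broker: the client value is kept.
kept = effective_timestamp(broker_now_ms - 30000, broker_now_ms)

# A "1970" timestamp from a broken client clock: replaced by the broker time.
replaced = effective_timestamp(0, broker_now_ms)
```

Note that a mirrored message that sat in a stalled mirror for more than the
threshold would also be overwritten under this rule.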

-Jay

On Fri, Oct 10, 2014 at 11:21 PM, Joel Koshy <jj...@gmail.com> wrote:

> Thanks Guozhang! This is an excellent write-up and the approach nicely
> consolidates a number of long-standing issues. It would be great if
> everyone can review this carefully and give feedback.
>
> Also, wrt discussion in the past we have used a mix of wiki comments
> and the mailing list. Personally, I think it is better to discuss on
> the mailing list (for more visibility) and just post a bold link to
> the (archived) mailing list thread on the wiki.
>
> Joel
>
> On Fri, Oct 10, 2014 at 05:33:52PM -0700, Guozhang Wang wrote:
> > Hello all,
> >
> > I put some thoughts on enhancing our current message metadata format to
> > solve a bunch of existing issues:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
> >
> > This wiki page is for kicking off some discussions about the feasibility of
> > adding more info into the message header, and if possible how we would add
> > them.
> >
> > -- Guozhang
>
>

Re: [DISCUSSION] Message Metadata

Posted by Joel Koshy <jj...@gmail.com>.
Thanks Guozhang! This is an excellent write-up and the approach nicely
consolidates a number of long-standing issues. It would be great if
everyone can review this carefully and give feedback.

Also, wrt discussion: in the past we have used a mix of wiki comments
and the mailing list. Personally, I think it is better to discuss on
the mailing list (for more visibility) and just post a bold link to
the (archived) mailing list thread on the wiki.

Joel

On Fri, Oct 10, 2014 at 05:33:52PM -0700, Guozhang Wang wrote:
> Hello all,
> 
> I put some thoughts on enhancing our current message metadata format to
> solve a bunch of existing issues:
> 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
> 
> This wiki page is for kicking off some discussions about the feasibility of
> adding more info into the message header, and if possible how we would add
> them.
> 
> -- Guozhang