Posted to dev@kafka.apache.org by James Cheng <wu...@gmail.com> on 2021/07/07 04:12:55 UTC

Re: [DISCUSS] KIP-729 Custom validation of records on the broker prior to log append

One use case we would like is to require that producers send compressed messages. Would this KIP (or KIP-686) allow the broker to detect that? From looking at both KIPs, it doesn't look like either would help with my particular use case. Both of the KIPs operate at the record level.

Thanks,
-James
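Compression is a property of the whole record batch, not of individual records, which is why a record-level hook cannot see it. In the v2 (magic 2) batch format, the lowest three bits of the 16-bit attributes field at byte offset 21 hold the codec (0 none, 1 gzip, 2 snappy, 3 lz4, 4 zstd). The following is a minimal sketch of a batch-level check. It assumes the broker hands a plugin the raw batch bytes, which neither KIP proposes, and the class name is hypothetical:

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not a Kafka API: reads the compression codec from the
// attributes field of a magic v2 record batch header.
final class BatchCompressionCheck {
    // Offsets in the v2 header: baseOffset (8) + batchLength (4) + partitionLeaderEpoch (4)
    // puts magic at 16; magic (1) + crc (4) puts the int16 attributes at 21.
    static final int MAGIC_OFFSET = 16;
    static final int ATTRIBUTES_OFFSET = 21;
    // The lowest three bits of attributes encode the codec (0 = none).
    static final int COMPRESSION_MASK = 0x07;

    static int compressionCodec(ByteBuffer batch) {
        // Absolute reads, so the caller's buffer position is not modified.
        if (batch.get(batch.position() + MAGIC_OFFSET) != 2) {
            throw new IllegalArgumentException("only magic v2 batches are handled in this sketch");
        }
        short attributes = batch.getShort(batch.position() + ATTRIBUTES_OFFSET);
        return attributes & COMPRESSION_MASK;
    }

    static boolean isCompressed(ByteBuffer batch) {
        return compressionCodec(batch) != 0;
    }
}
```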

> On Jun 30, 2021, at 10:05 AM, Soumyajit Sahu <so...@gmail.com> wrote:
> 
> Hi Nikolay,
> Great to hear that. I'm ok with either one too.
> I had missed KIP-686. Thanks for bringing it up.
> 
> I have tried to keep this one simple, but hope it can cover all our
> enterprise needs.
> 
> Should we put this one to a vote?
> 
> Regards,
> Soumyajit
> 
> 
> On Wed, Jun 30, 2021, 8:50 AM Nikolay Izhikov <ni...@apache.org> wrote:
> 
>> Team, if we have support from committers for an API to check records on the
>> broker side, let’s choose one KIP to go with and move forward to a vote and
>> implementation.
>> 
>> I’m ready to drive the implementation of this API.
>> It seems very useful to me.
>> 
>>> On 30 June 2021, at 18:04, Nikolay Izhikov <NI...@gmail.com> wrote:
>>> 
>>> Hello.
>>> 
>>> I had a very similar proposal [1].
>>> So, yes, I think we should have a single implementation of this API in the product.
>>> 
>>> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-686%3A+API+to+ensure+Records+policy+on+the+broker
>>> 
>>>> On 30 June 2021, at 17:57, Christopher Shannon <christopher.l.shannon@gmail.com> wrote:
>>>> 
>>>> I would find this feature very useful as well: adding custom validation
>>>> to incoming records would help prevent bad data from reaching the topic.
>>>> 
>>>> On Wed, Apr 7, 2021 at 7:03 PM Soumyajit Sahu <soumyajit.sahu@gmail.com> wrote:
>>>> 
>>>>> Thanks Colin! Good call on the ApiRecordError. We could use
>>>>> InvalidRecordException instead, and have the broker convert it
>>>>> to ApiRecordError.
>>>>> Modified signature below.
>>>>> 
>>>>> interface BrokerRecordValidator {
>>>>>     /**
>>>>>      * Validate the record for a given topic-partition.
>>>>>      */
>>>>>     Optional<InvalidRecordException> validateRecord(TopicPartition topicPartition,
>>>>>             ByteBuffer key, ByteBuffer value, Header[] headers);
>>>>> }
>>>>> 
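The following is a minimal sketch of a plugin written against the modified signature. So that it compiles without Kafka on the classpath, Partition, RecordHeader and RecordValidationException are hypothetical stand-ins for TopicPartition, Header and InvalidRecordException. The "schema-id" header rule is an invented example policy:

```java
import java.nio.ByteBuffer;
import java.util.Optional;

// Hypothetical stand-ins for TopicPartition, Header and InvalidRecordException.
record Partition(String topic, int partition) {}
record RecordHeader(String key, byte[] value) {}

class RecordValidationException extends RuntimeException {
    RecordValidationException(String message) { super(message); }
}

// Same shape as the proposed BrokerRecordValidator, using the stand-ins above.
interface RecordValidator {
    Optional<RecordValidationException> validateRecord(
            Partition partition, ByteBuffer key, ByteBuffer value, RecordHeader[] headers);
}

// Example policy: a record needs a non-empty value and a "schema-id" header.
class RequireSchemaHeader implements RecordValidator {
    @Override
    public Optional<RecordValidationException> validateRecord(
            Partition partition, ByteBuffer key, ByteBuffer value, RecordHeader[] headers) {
        if (value == null || !value.hasRemaining()) {
            return Optional.of(new RecordValidationException(
                    "empty value rejected for " + partition.topic() + "-" + partition.partition()));
        }
        for (RecordHeader header : headers) {
            if ("schema-id".equals(header.key())) {
                return Optional.empty(); // valid: no error to report
            }
        }
        return Optional.of(new RecordValidationException("missing schema-id header"));
    }
}
```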
>>>>> On Tue, Apr 6, 2021 at 5:09 PM Colin McCabe <cm...@apache.org> wrote:
>>>>> 
>>>>>> Hi Soumyajit,
>>>>>> 
>>>>>> The difficult thing is deciding which fields to share and how to share
>>>>>> them. Key and value are probably the minimum we need to make this useful.
>>>>>> If we do choose to go with byte buffer, it is not necessary to also pass
>>>>>> the size, since ByteBuffer maintains that internally.
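Colin's point about sizes can be shown with plain java.nio: a buffer's readable length is remaining() (limit minus position), so a separate size argument would only duplicate it. The helper names below are hypothetical. Returning -1 for a null buffer is an assumption that mirrors how the record format encodes a null key or value:

```java
import java.nio.ByteBuffer;

final class BufferSizes {
    // remaining() == limit() - position(), so the buffer already carries its size.
    // -1 for null is this sketch's convention, echoing the record format's null length.
    static int size(ByteBuffer buf) {
        return buf == null ? -1 : buf.remaining();
    }

    // Reading through duplicate() leaves the caller's position and limit untouched,
    // which matters if the broker keeps using the buffer after the plugin returns.
    static byte firstByte(ByteBuffer buf) {
        return buf.duplicate().get();
    }
}
```

Passing buf.asReadOnlyBuffer() instead would also stop a plugin from writing into broker memory. duplicate() protects only the position and limit.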
>>>>>> 
>>>>>> ApiRecordError is also an internal class, so it can't be used in a public
>>>>>> API. I think most likely if we were going to do this, we would just catch
>>>>>> an exception and use the exception text as the validation error.
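The "catch an exception and use its text" approach could look roughly like the broker-side adapter below. ThrowingValidator and ValidatorAdapter are hypothetical names. They only show that no internal error type needs to appear in the plugin-facing API:

```java
import java.util.Optional;

// Hypothetical plugin shape for this sketch: a plugin rejects a record by throwing.
interface ThrowingValidator {
    void validate(String topic, byte[] value);
}

final class ValidatorAdapter {
    // Broker-side wrapper: runs the plugin and converts any RuntimeException it throws
    // into an error message, falling back to the class name when there is no message.
    static Optional<String> run(ThrowingValidator plugin, String topic, byte[] value) {
        try {
            plugin.validate(topic, value);
            return Optional.empty();
        } catch (RuntimeException e) {
            String message = e.getMessage();
            return Optional.of(message != null ? message : e.getClass().getName());
        }
    }
}
```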
>>>>>> 
>>>>>> best,
>>>>>> Colin
>>>>>> 
>>>>>> 
>>>>>> On Tue, Apr 6, 2021, at 15:57, Soumyajit Sahu wrote:
>>>>>>> Hi Tom,
>>>>>>> 
>>>>>>> Makes sense. Thanks for the explanation. I see what Colin meant
>>>>>>> earlier.
>>>>>>> 
>>>>>>> Would a different signature for the interface work? Example below, but
>>>>>>> please feel free to suggest alternatives if there are any.
>>>>>>> 
>>>>>>> If needed, deprecating this and introducing a new signature would be
>>>>>>> straightforward, as both (old and new) calls could be made serially in
>>>>>>> the LogValidator, allowing them to coexist for a transition period.
>>>>>>> 
>>>>>>> interface BrokerRecordValidator {
>>>>>>>     /**
>>>>>>>      * Validate the record for a given topic-partition.
>>>>>>>      */
>>>>>>>     Optional<ApiRecordError> validateRecord(TopicPartition topicPartition,
>>>>>>>             int keySize, ByteBuffer key, int valueSize, ByteBuffer value,
>>>>>>>             Header[] headers);
>>>>>>> }
>>>>>>> 
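The following is a rough sketch of the transition idea: the broker calls an old-style hook and a new-style hook in turn, and the first error wins. Both interfaces are simplified, hypothetical versions of the two signatures in this thread. They use String messages in place of ApiRecordError/InvalidRecordException and drop the topic-partition and headers:

```java
import java.nio.ByteBuffer;
import java.util.Optional;

// Hypothetical, simplified old and new hook shapes.
interface LegacyValidator {
    Optional<String> validate(int keySize, ByteBuffer key, int valueSize, ByteBuffer value);
}

interface CurrentValidator {
    Optional<String> validate(ByteBuffer key, ByteBuffer value);
}

final class TransitionalValidation {
    // Calls the legacy hook first, then the new one; a null hook is skipped.
    // Sizes for the legacy hook are derived from remaining(), -1 for a null buffer.
    static Optional<String> validate(LegacyValidator legacy, CurrentValidator current,
                                     ByteBuffer key, ByteBuffer value) {
        if (legacy != null) {
            int keySize = key == null ? -1 : key.remaining();
            int valueSize = value == null ? -1 : value.remaining();
            Optional<String> error = legacy.validate(keySize, key, valueSize, value);
            if (error.isPresent()) {
                return error; // first error wins
            }
        }
        return current == null ? Optional.empty() : current.validate(key, value);
    }
}
```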
>>>>>>> 
>>>>>>> On Tue, Apr 6, 2021 at 12:54 AM Tom Bentley <tb...@redhat.com> wrote:
>>>>>>> 
>>>>>>>> Hi Soumyajit,
>>>>>>>> 
>>>>>>>> Although that class does indeed have public access at the Java level, it
>>>>>>>> does so only because it needs to be used by internal Kafka code which lives
>>>>>>>> in other packages (there isn't any more restrictive access modifier which
>>>>>>>> would work). What the project considers public Java API is determined by
>>>>>>>> what's included in the published Javadocs:
>>>>>>>> https://kafka.apache.org/27/javadoc/index.html, which doesn't include the
>>>>>>>> org.apache.kafka.common.record package.
>>>>>>>> 
>>>>>>>> One of the problems with making these internal classes public is it ties
>>>>>>>> the project into supporting them as APIs, which can make changing them much
>>>>>>>> harder and in the long run that can slow, or even prevent, innovation in
>>>>>>>> the rest of Kafka.
>>>>>>>> 
>>>>>>>> Kind regards,
>>>>>>>> 
>>>>>>>> Tom
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Sun, Apr 4, 2021 at 7:31 PM Soumyajit Sahu <soumyajit.sahu@gmail.com> wrote:
>>>>>>>> 
>>>>>>>>> Hi Colin,
>>>>>>>>> I see that both the interface "Record" and the implementation
>>>>>>>>> "DefaultRecord" being used in LogValidator.java are public
>>>>>>>>> interfaces/classes.
>>>>>>>>> 
>>>>>>>>> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/Records.java
>>>>>>>>> and
>>>>>>>>> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
>>>>>>>>> 
>>>>>>>>> So, it should be ok to use them. Let me know what you think.
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> Soumyajit
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Fri, Apr 2, 2021 at 8:51 AM Colin McCabe <cm...@apache.org> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi Soumyajit,
>>>>>>>>>> 
>>>>>>>>>> I believe we've had discussions about proposals similar to this before,
>>>>>>>>>> although I'm having trouble finding one right now. The issue here is that
>>>>>>>>>> Record is a private class -- it is not part of any public API, and may
>>>>>>>>>> change at any time. So we can't expose it in public APIs.
>>>>>>>>>> 
>>>>>>>>>> best,
>>>>>>>>>> Colin
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Thu, Apr 1, 2021, at 14:18, Soumyajit Sahu wrote:
>>>>>>>>>>> Hello All,
>>>>>>>>>>> I would like to start a discussion on KIP-729.
>>>>>>>>>>> 
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-729%3A+Custom+validation+of+records+on+the+broker+prior+to+log+append
>>>>>>>>>>> 
>>>>>>>>>>> Thanks!
>>>>>>>>>>> Soumyajit


Re: [DISCUSS] KIP-729 Custom validation of records on the broker prior to log append

Posted by Nikolay Izhikov <ni...@apache.org>.
Dear Kafka committers,

Let’s have this API in Kafka!



Re: [DISCUSS] KIP-729 Custom validation of records on the broker prior to log append

Posted by Christopher Shannon <ch...@gmail.com>.
Revisiting this, as it has come up for my use case again. Specifically, I need
to be able to validate headers, including those of compressed messages. It looks
like LogValidator already decompresses messages in order to validate records,
but the headers are skipped when the records are loaded as partial records. So
as part of this change, I think there should be a way to read the headers for
validation even when the records are compressed.


Re: [DISCUSS] KIP-729 Custom validation of records on the broker prior to log append

Posted by Nikolay Izhikov <ni...@apache.org>.
Hello, James.

> One use case we would like is to require that producers are sending compressed messages.

I think that forcing producers to send compressed messages is out of the scope of this KIP.


> On 7 July 2021, at 08:48, Soumyajit Sahu <so...@gmail.com> wrote:
> 
> Interesting point. You are correct that at least KIP-729 cannot validate
> that.
> 
> We could propose a different KIP that enforces this in the upper layer.
> Personally, I would be hesitant to discard the data in that case; I would
> rather use metrics/logs to detect such producers and inform them about it.
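The following is a minimal sketch of the "detect and inform rather than discard" option. The batch is always accepted, and uncompressed batches are counted per client id so that operators can follow up with the producers. The class is hypothetical. In the v2 batch format a codec value of 0 means no compression. How the broker would obtain the codec and client id is left open here:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical observe-only policy: counts uncompressed batches per client id.
final class UncompressedBatchMonitor {
    private final Map<String, LongAdder> uncompressedByClient = new ConcurrentHashMap<>();

    // Always returns true ("accept the batch"); this sketch never rejects data.
    boolean onBatch(String clientId, int compressionCodec) {
        if (compressionCodec == 0) {
            uncompressedByClient.computeIfAbsent(clientId, id -> new LongAdder()).increment();
        }
        return true;
    }

    // Count that could back a metric or a periodic log line.
    long uncompressedCount(String clientId) {
        LongAdder counter = uncompressedByClient.get(clientId);
        return counter == null ? 0 : counter.sum();
    }
}
```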
> 
> 
> On Tue, Jul 6, 2021, 9:13 PM James Cheng <wu...@gmail.com> wrote:
> 
>> One use case we would like is to require that producers are sending
>> compressed messages. Would this KIP (or KIP-686) allow the broker to detect
>> that? From looking at both KIPs, it doesn't look it would help with my
>> particular use case. Both of the KIPs are at the Record-level.
>> 
>> Thanks,
>> -James
>> 
>>> On Jun 30, 2021, at 10:05 AM, Soumyajit Sahu <so...@gmail.com>
>> wrote:
>>> 
>>> Hi Nikolay,
>>> Great to hear that. I'm ok with either one too.
>>> I had missed noticing the KIP-686. Thanks for bringing it up.
>>> 
>>> I have tried to keep this one simple, but hope it can cover all our
>>> enterprise needs.
>>> 
>>> Should we put this one for vote?
>>> 
>>> Regards,
>>> Soumyajit
>>> 
>>> 
>>> On Wed, Jun 30, 2021, 8:50 AM Nikolay Izhikov <ni...@apache.org>
>> wrote:
>>> 
>>>> Team, If we have support from committers for API to check records on the
>>>> broker side let’s choose one KIP to go with and move forward to vote and
>>>> implementation?
>>>> I’m ready to drive implementation of this API.
>>>> 
>>>> I’m ready to drive the implementation of this API.
>>>> It seems very useful to me.
>>>> 
>>>>> 30 июня 2021 г., в 18:04, Nikolay Izhikov <NI...@gmail.com>
>>>> написал(а):
>>>>> 
>>>>> Hello.
>>>>> 
>>>>> I had a very similar proposal [1].
>>>>> So, yes, I think we should have one implementation of API in the
>> product.
>>>>> 
>>>>> [1]
>>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-686%3A+API+to+ensure+Records+policy+on+the+broker
>>>>> 
>>>>>> 30 июня 2021 г., в 17:57, Christopher Shannon <
>>>> christopher.l.shannon@gmail.com> написал(а):
>>>>>> 
>>>>>> I would find this feature very useful as well as adding custom
>>>> validation
>>>>>> to incoming records would be nice to prevent bad data from making it
>> to
>>>> the
>>>>>> topic.
>>>>>> 
>>>>>> On Wed, Apr 7, 2021 at 7:03 PM Soumyajit Sahu <
>> soumyajit.sahu@gmail.com
>>>>> 
>>>>>> wrote:
>>>>>> 
>>>>>>> Thanks Colin! Good call on the ApiRecordError. We could use
>>>>>>> InvalidRecordException instead, and have the broker convert it
>>>>>>> to ApiRecordError.
>>>>>>> Modified signature below.
>>>>>>> 
>>>>>>> interface BrokerRecordValidator {
>>>>>>> /**
>>>>>>> * Validate the record for a given topic-partition.
>>>>>>> */
>>>>>>> Optional<InvalidRecordException> validateRecord(TopicPartition
>>>>>>> topicPartition, ByteBuffer key, ByteBuffer value, Header[] headers);
>>>>>>> }
>>>>>>> 
>>>>>>> On Tue, Apr 6, 2021 at 5:09 PM Colin McCabe <cm...@apache.org>
>>>> wrote:
>>>>>>> 
>>>>>>>> Hi Soumyajit,
>>>>>>>> 
>>>>>>>> The difficult thing is deciding which fields to share and how to
>> share
>>>>>>>> them.  Key and value are probably the minimum we need to make this
>>>>>>> useful.
>>>>>>>> If we do choose to go with byte buffer, it is not necessary to also
>>>> pass
>>>>>>>> the size, since ByteBuffer maintains that internally.
>>>>>>>> 
>>>>>>>> ApiRecordError is also an internal class, so it can't be used in a
>>>> public
>>>>>>>> API.  I think most likely if we were going to do this, we would just
>>>>>>> catch
>>>>>>>> an exception and use the exception text as the validation error.
>>>>>>>> 
>>>>>>>> best,
>>>>>>>> Colin
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Tue, Apr 6, 2021, at 15:57, Soumyajit Sahu wrote:
>>>>>>>>> Hi Tom,
>>>>>>>>> 
>>>>>>>>> Makes sense. Thanks for the explanation. I get what Colin had meant
>>>>>>>> earlier.
>>>>>>>>> 
>>>>>>>>> Would a different signature for the interface work? Example below,
>>>> but
>>>>>>>>> please feel free to suggest alternatives if there are any
>>>> possibilities
>>>>>>>> of
>>>>>>>>> such.
>>>>>>>>> 
>>>>>>>>> If needed, then deprecating this and introducing a new signature
>>>> would
>>>>>>> be
>>>>>>>>> straight-forward as both (old and new) calls could be made serially
>>>> in
>>>>>>>> the
>>>>>>>>> LogValidator allowing a coexistence for a transition period.
>>>>>>>>> 
>>>>>>>>> interface BrokerRecordValidator {
>>>>>>>>> /**
>>>>>>>>>  * Validate the record for a given topic-partition.
>>>>>>>>>  */
>>>>>>>>> Optional<ApiRecordError> validateRecord(TopicPartition
>>>>>>>> topicPartition,
>>>>>>>>> int keySize, ByteBuffer key, int valueSize, ByteBuffer value,
>>>> Header[]
>>>>>>>>> headers);
>>>>>>>>> }
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Tue, Apr 6, 2021 at 12:54 AM Tom Bentley <tb...@redhat.com>
>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi Soumyajit,
>>>>>>>>>> 
>>>>>>>>>> Although that class does indeed have public access at the Java level, it
>>>>>>>>>> does so only because it needs to be used by internal Kafka code that lives
>>>>>>>>>> in other packages (there isn't a more restrictive access modifier that
>>>>>>>>>> would work). What the project considers public Java API is determined by
>>>>>>>>>> what's included in the published Javadocs:
>>>>>>>>>> https://kafka.apache.org/27/javadoc/index.html, which doesn't include the
>>>>>>>>>> org.apache.kafka.common.record package.
>>>>>>>>>> 
>>>>>>>>>> One of the problems with making these internal classes public is that it
>>>>>>>>>> ties the project into supporting them as APIs, which can make changing them
>>>>>>>>>> much harder; in the long run, that can slow or even prevent innovation in
>>>>>>>>>> the rest of Kafka.
>>>>>>>>>> 
>>>>>>>>>> Kind regards,
>>>>>>>>>> 
>>>>>>>>>> Tom
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Sun, Apr 4, 2021 at 7:31 PM Soumyajit Sahu <soumyajit.sahu@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Hi Colin,
>>>>>>>>>>> I see that both the interface "Record" and the implementation
>>>>>>>>>>> "DefaultRecord" being used in LogValidator.java are public
>>>>>>>>>>> interfaces/classes.
>>>>>>>>>>> 
>>>>>>>>>>> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/Records.java
>>>>>>>>>>> and
>>>>>>>>>>> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
>>>>>>>>>>> 
>>>>>>>>>>> So, it should be ok to use them. Let me know what you think.
>>>>>>>>>>> 
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Soumyajit
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Fri, Apr 2, 2021 at 8:51 AM Colin McCabe <cm...@apache.org> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Hi Soumyajit,
>>>>>>>>>>>> 
>>>>>>>>>>>> I believe we've had discussions about proposals similar to this before,
>>>>>>>>>>>> although I'm having trouble finding one right now.  The issue here is that
>>>>>>>>>>>> Record is a private class -- it is not part of any public API, and may
>>>>>>>>>>>> change at any time.  So we can't expose it in public APIs.
>>>>>>>>>>>> 
>>>>>>>>>>>> best,
>>>>>>>>>>>> Colin
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> On Thu, Apr 1, 2021, at 14:18, Soumyajit Sahu wrote:
>>>>>>>>>>>>> Hello All,
>>>>>>>>>>>>> I would like to start a discussion on KIP-729.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-729%3A+Custom+validation+of+records+on+the+broker+prior+to+log+append
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>> Soumyajit
>>>>>>>>>>>>> 


Re: [DISCUSS] KIP-729 Custom validation of records on the broker prior to log append

Posted by Soumyajit Sahu <so...@gmail.com>.
Interesting point. You are correct that KIP-729, at least, cannot validate
that.

We could propose a different KIP to enforce that at a higher layer.
Personally, I would be hesitant to discard the data in that case; I would
rather use metrics/logs to detect such producers and inform them about it.
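In Kafka's record format, compression is applied per record batch rather than per record, which is why a per-record hook such as the ones proposed in KIP-729 and KIP-686 cannot see it. The following is a minimal, self-contained Java sketch of the detect-and-report approach suggested above: it counts uncompressed batches per client and logs a warning, but never rejects data. `CompressionMonitorSketch`, `Batch`, and `Codec` are hypothetical stand-ins, not Kafka classes; a real broker-side version would hook into broker internals and report through Kafka's metrics rather than `java.util.logging`.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.logging.Logger;

public class CompressionMonitorSketch {

    /** Hypothetical codec enum, loosely mirroring Kafka's compression.type values. */
    enum Codec { NONE, GZIP, SNAPPY, LZ4, ZSTD }

    /** Hypothetical view of one produce batch: the codec applies to the whole batch. */
    static final class Batch {
        final String clientId;
        final String topic;
        final Codec codec;
        final int recordCount;

        Batch(String clientId, String topic, Codec codec, int recordCount) {
            this.clientId = clientId;
            this.topic = topic;
            this.codec = codec;
            this.recordCount = recordCount;
        }
    }

    private static final Logger LOG = Logger.getLogger(CompressionMonitorSketch.class.getName());

    // Per-client count of uncompressed batches, standing in for a broker metric.
    private final Map<String, LongAdder> uncompressedBatches = new ConcurrentHashMap<>();

    /** Observes a batch and always accepts it; uncompressed batches are only counted and logged. */
    boolean observe(Batch batch) {
        if (batch.codec == Codec.NONE) {
            uncompressedBatches.computeIfAbsent(batch.clientId, id -> new LongAdder()).increment();
            LOG.warning(() -> "Client " + batch.clientId + " sent an uncompressed batch of "
                    + batch.recordCount + " record(s) to topic " + batch.topic);
        }
        return true;
    }

    long uncompressedCount(String clientId) {
        LongAdder counter = uncompressedBatches.get(clientId);
        return counter == null ? 0L : counter.sum();
    }

    public static void main(String[] args) {
        CompressionMonitorSketch monitor = new CompressionMonitorSketch();
        List<Batch> batches = List.of(
                new Batch("app-a", "orders", Codec.ZSTD, 100),
                new Batch("app-b", "orders", Codec.NONE, 10),
                new Batch("app-b", "orders", Codec.NONE, 5));
        for (Batch batch : batches) {
            monitor.observe(batch);
        }
        System.out.println(monitor.uncompressedCount("app-a")); // 0
        System.out.println(monitor.uncompressedCount("app-b")); // 2
    }
}
```

Keeping `observe` non-rejecting matches the preference stated above: operators get per-client visibility and can contact the offending producers without any data being dropped.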


On Tue, Jul 6, 2021, 9:13 PM James Cheng <wu...@gmail.com> wrote:

> One use case we would like is to require that producers send
> compressed messages. Would this KIP (or KIP-686) allow the broker to detect
> that? From looking at both KIPs, it doesn't look like it would help with my
> particular use case. Both KIPs operate at the record level.
>
> Thanks,
> -James