Posted to dev@kafka.apache.org by Grant Henke <gh...@cloudera.com> on 2015/03/24 03:16:23 UTC

New Producer Questions/Feedback

I am reading over the new producer code in an effort to understand the
implementation more thoroughly and have some questions/feedback.

Currently, the append method of
org.apache.kafka.clients.producer.internals.RecordAccumulator accepts the
CompressionType on a per-record basis. It looks like the code only works
on a per-batch basis, because the CompressionType is only used when
creating a new RecordBatch. My understanding is that this should support
setting it per batch at most. I may have misread this, though. Is there a
case where setting it per record would make sense?

    public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value,
                                     CompressionType compression, Callback callback)
            throws InterruptedException;
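
Roughly, the pattern I am describing looks like this (a heavily
simplified, hypothetical sketch, not the actual implementation; dequeFor,
newBatch, and resultFor are stand-ins for the real internals):

    // Hypothetical, simplified sketch of the behavior described above;
    // not the actual RecordAccumulator code.
    public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value,
                                     CompressionType compression, Callback callback)
            throws InterruptedException {
        Deque<RecordBatch> dq = dequeFor(tp);  // stand-in: per-partition queue
        synchronized (dq) {
            RecordBatch last = dq.peekLast();
            // An existing batch never consults 'compression':
            if (last != null && last.tryAppend(key, value, callback) != null)
                return resultFor(last);         // stand-in
            // Only a brand-new batch ever consumes the compression type:
            RecordBatch batch = newBatch(tp, compression);  // stand-in
            batch.tryAppend(key, value, callback);
            dq.addLast(batch);
            return resultFor(batch);
        }
    }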

Why does the org.apache.kafka.common.serialization.Serializer interface
require a topic? Is there a use case where serialization would change
based on the topic?

   public byte[] serialize(String topic, T data);
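
For what it's worth, I can imagine a hypothetical topic-aware serializer,
e.g. one that picks a wire format or looks up a schema based on the topic
name. The class and topic name below are made up for illustration:

    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import org.apache.kafka.common.serialization.Serializer;

    // Hypothetical example of a topic-aware serializer: the wire format
    // depends on which topic the record is being sent to.
    public class PerTopicStringSerializer implements Serializer<String> {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) { }

        @Override
        public byte[] serialize(String topic, String data) {
            if (data == null)
                return null;
            // "legacy-events" is a made-up topic name for illustration.
            if ("legacy-events".equals(topic))
                return data.getBytes(StandardCharsets.ISO_8859_1);
            return data.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public void close() { }
    }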

Thank you,
Grant

-- 
Grant Henke
Solutions Consultant | Cloudera
ghenke@cloudera.com | 920-980-8979
twitter.com/ghenke <http://twitter.com/gchenke> | linkedin.com/in/granthenke

Re: New Producer Questions/Feedback

Posted by Grant Henke <gh...@cloudera.com>.
Here is the jira: https://issues.apache.org/jira/browse/KAFKA-2043

Thanks,
Grant

On Mon, Mar 23, 2015 at 11:53 PM, Jun Rao <ju...@confluent.io> wrote:

> RecordAccumulator is actually not part of the public api since it's
> internal. The public apis are only those in
> http://kafka.apache.org/082/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
>
> Thanks,
>
> Jun
>
> On Mon, Mar 23, 2015 at 9:23 PM, Grant Henke <gh...@cloudera.com> wrote:
>
> > Thanks for validating that. I was thinking of solving it in the same
> > fashion, though I was unsure whether there was (or would be) a use
> > case for having multiple CompressionTypes in the same
> > RecordAccumulator, since the API was originally created this way.
> >
> > I would be happy to file a jira and can take on making the change
> > too. Since RecordAccumulator is part of the public api, should the
> > KIP process be followed here as well?
> >
> > On Mon, Mar 23, 2015 at 10:58 PM, Jun Rao <ju...@confluent.io> wrote:
> >
> > > Hi, Grant,
> > >
> > > The append api indeed seems a bit weird. The compression type is a
> > > producer-level config. Instead of passing it in for each append, we
> > > probably should just pass it in once during the creation of the
> > > RecordAccumulator. Could you file a jira to track this?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Mon, Mar 23, 2015 at 7:16 PM, Grant Henke <gh...@cloudera.com>
> > > wrote:
> > >
> > > > I am reading over the new producer code in an effort to understand
> > > > the implementation more thoroughly and have some questions/feedback.
> > > >
> > > > Currently, the append method of
> > > > org.apache.kafka.clients.producer.internals.RecordAccumulator
> > > > accepts the CompressionType on a per-record basis. It looks like
> > > > the code only works on a per-batch basis, because the
> > > > CompressionType is only used when creating a new RecordBatch. My
> > > > understanding is that this should support setting it per batch at
> > > > most. I may have misread this, though. Is there a case where
> > > > setting it per record would make sense?
> > > >
> > > >     public RecordAppendResult append(TopicPartition tp, byte[] key,
> > > >                                      byte[] value,
> > > >                                      CompressionType compression,
> > > >                                      Callback callback)
> > > >             throws InterruptedException;
> > > >
> > > > Why does the org.apache.kafka.common.serialization.Serializer
> > > > interface require a topic? Is there a use case where serialization
> > > > would change based on the topic?
> > > >
> > > >    public byte[] serialize(String topic, T data);
> > > >
> > > > Thank you,
> > > > Grant
> > > >
> > > > --
> > > > Grant Henke
> > > > Solutions Consultant | Cloudera
> > > > ghenke@cloudera.com | 920-980-8979
> > > > twitter.com/ghenke <http://twitter.com/gchenke> |
> > > > linkedin.com/in/granthenke
> > >
> >
> >
> > --
> > Grant Henke
> > Solutions Consultant | Cloudera
> > ghenke@cloudera.com | 920-980-8979
> > twitter.com/ghenke <http://twitter.com/gchenke> |
> > linkedin.com/in/granthenke
>



-- 
Grant Henke
Solutions Consultant | Cloudera
ghenke@cloudera.com | 920-980-8979
twitter.com/ghenke <http://twitter.com/gchenke> | linkedin.com/in/granthenke

Re: New Producer Questions/Feedback

Posted by Jun Rao <ju...@confluent.io>.
RecordAccumulator is actually not part of the public api since it's
internal. The public apis are only those in
http://kafka.apache.org/082/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
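
As context: on the public side, compression is set once as a producer
config, not per record. A minimal sketch, assuming a broker at
localhost:9092 and a made-up topic name:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Minimal public-API example: compression is a producer-level config.
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // assumed address
    props.put("compression.type", "gzip");
    props.put("key.serializer",
              "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("value.serializer",
              "org.apache.kafka.common.serialization.ByteArraySerializer");

    KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
    // "my-topic" is a made-up topic name for illustration.
    producer.send(new ProducerRecord<>("my-topic", "hello".getBytes()));
    producer.close();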

Thanks,

Jun

On Mon, Mar 23, 2015 at 9:23 PM, Grant Henke <gh...@cloudera.com> wrote:

> Thanks for validating that. I was thinking of solving it in the same
> fashion, though I was unsure whether there was (or would be) a use case
> for having multiple CompressionTypes in the same RecordAccumulator,
> since the API was originally created this way.
>
> I would be happy to file a jira and can take on making the change too.
> Since RecordAccumulator is part of the public api, should the KIP
> process be followed here as well?
>
> On Mon, Mar 23, 2015 at 10:58 PM, Jun Rao <ju...@confluent.io> wrote:
>
> > Hi, Grant,
> >
> > The append api indeed seems a bit weird. The compression type is a
> > producer-level config. Instead of passing it in for each append, we
> > probably should just pass it in once during the creation of the
> > RecordAccumulator. Could you file a jira to track this?
> >
> > Thanks,
> >
> > Jun
> >
> > On Mon, Mar 23, 2015 at 7:16 PM, Grant Henke <gh...@cloudera.com>
> > wrote:
> >
> > > I am reading over the new producer code in an effort to understand
> > > the implementation more thoroughly and have some questions/feedback.
> > >
> > > Currently, the append method of
> > > org.apache.kafka.clients.producer.internals.RecordAccumulator
> > > accepts the CompressionType on a per-record basis. It looks like the
> > > code only works on a per-batch basis, because the CompressionType is
> > > only used when creating a new RecordBatch. My understanding is that
> > > this should support setting it per batch at most. I may have misread
> > > this, though. Is there a case where setting it per record would make
> > > sense?
> > >
> > >     public RecordAppendResult append(TopicPartition tp, byte[] key,
> > >                                      byte[] value,
> > >                                      CompressionType compression,
> > >                                      Callback callback)
> > >             throws InterruptedException;
> > >
> > > Why does the org.apache.kafka.common.serialization.Serializer
> > > interface require a topic? Is there a use case where serialization
> > > would change based on the topic?
> > >
> > >    public byte[] serialize(String topic, T data);
> > >
> > > Thank you,
> > > Grant
> > >
> > > --
> > > Grant Henke
> > > Solutions Consultant | Cloudera
> > > ghenke@cloudera.com | 920-980-8979
> > > twitter.com/ghenke <http://twitter.com/gchenke> |
> > > linkedin.com/in/granthenke
> >
>
>
> --
> Grant Henke
> Solutions Consultant | Cloudera
> ghenke@cloudera.com | 920-980-8979
> twitter.com/ghenke <http://twitter.com/gchenke> |
> linkedin.com/in/granthenke
>

Re: New Producer Questions/Feedback

Posted by Grant Henke <gh...@cloudera.com>.
Thanks for validating that. I was thinking of solving it in the same
fashion, though I was unsure whether there was (or would be) a use case
for having multiple CompressionTypes in the same RecordAccumulator, since
the API was originally created this way.

I would be happy to file a jira and can take on making the change too.
Since RecordAccumulator is part of the public api, should the KIP process
be followed here as well?

On Mon, Mar 23, 2015 at 10:58 PM, Jun Rao <ju...@confluent.io> wrote:

> Hi, Grant,
>
> The append api indeed seems a bit weird. The compression type is a
> producer-level config. Instead of passing it in for each append, we
> probably should just pass it in once during the creation of the
> RecordAccumulator. Could you file a jira to track this?
>
> Thanks,
>
> Jun
>
> On Mon, Mar 23, 2015 at 7:16 PM, Grant Henke <gh...@cloudera.com> wrote:
>
> > I am reading over the new producer code in an effort to understand the
> > implementation more thoroughly and have some questions/feedback.
> >
> > Currently, the append method of
> > org.apache.kafka.clients.producer.internals.RecordAccumulator accepts
> > the CompressionType on a per-record basis. It looks like the code only
> > works on a per-batch basis, because the CompressionType is only used
> > when creating a new RecordBatch. My understanding is that this should
> > support setting it per batch at most. I may have misread this, though.
> > Is there a case where setting it per record would make sense?
> >
> >     public RecordAppendResult append(TopicPartition tp, byte[] key,
> >                                      byte[] value,
> >                                      CompressionType compression,
> >                                      Callback callback)
> >             throws InterruptedException;
> >
> > Why does the org.apache.kafka.common.serialization.Serializer
> > interface require a topic? Is there a use case where serialization
> > would change based on the topic?
> >
> >    public byte[] serialize(String topic, T data);
> >
> > Thank you,
> > Grant
> >
> > --
> > Grant Henke
> > Solutions Consultant | Cloudera
> > ghenke@cloudera.com | 920-980-8979
> > twitter.com/ghenke <http://twitter.com/gchenke> |
> > linkedin.com/in/granthenke
>



-- 
Grant Henke
Solutions Consultant | Cloudera
ghenke@cloudera.com | 920-980-8979
twitter.com/ghenke <http://twitter.com/gchenke> | linkedin.com/in/granthenke

Re: New Producer Questions/Feedback

Posted by Jun Rao <ju...@confluent.io>.
Hi, Grant,

The append api indeed seems a bit weird. The compression type is a
producer-level config. Instead of passing it in for each append, we
probably should just pass it in once during the creation of the
RecordAccumulator. Could you file a jira to track this?
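
Concretely, a minimal sketch of the shape this change could take (class,
constructor, and method names below are illustrative only, not an actual
patch):

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.record.CompressionType;

    // Illustrative sketch only -- not the actual RecordAccumulator code.
    // The compression type is supplied once, at construction time, so
    // append() no longer needs a per-record CompressionType parameter.
    public final class AccumulatorSketch {
        private final CompressionType compression;

        public AccumulatorSketch(CompressionType compression) {
            this.compression = compression;
        }

        public void append(TopicPartition tp, byte[] key, byte[] value) {
            // Any new batch created here would use the accumulator-wide
            // 'compression' field rather than a per-call argument.
        }
    }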

Thanks,

Jun

On Mon, Mar 23, 2015 at 7:16 PM, Grant Henke <gh...@cloudera.com> wrote:

> I am reading over the new producer code in an effort to understand the
> implementation more thoroughly and have some questions/feedback.
>
> Currently, the append method of
> org.apache.kafka.clients.producer.internals.RecordAccumulator accepts
> the CompressionType on a per-record basis. It looks like the code only
> works on a per-batch basis, because the CompressionType is only used
> when creating a new RecordBatch. My understanding is that this should
> support setting it per batch at most. I may have misread this, though.
> Is there a case where setting it per record would make sense?
>
>     public RecordAppendResult append(TopicPartition tp, byte[] key,
>                                      byte[] value,
>                                      CompressionType compression,
>                                      Callback callback)
>             throws InterruptedException;
>
> Why does the org.apache.kafka.common.serialization.Serializer interface
> require a topic? Is there a use case where serialization would change
> based on the topic?
>
>    public byte[] serialize(String topic, T data);
>
> Thank you,
> Grant
>
> --
> Grant Henke
> Solutions Consultant | Cloudera
> ghenke@cloudera.com | 920-980-8979
> twitter.com/ghenke <http://twitter.com/gchenke> |
> linkedin.com/in/granthenke
>