You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Alexis Midon <al...@airbedandbreakfast.com> on 2014/08/28 01:40:14 UTC

message size limit

Hello,

my brokers are reporting that some received messages exceed the
`message.max.bytes` value.
I'd like to know what producers are at fault but It is pretty much
impossible:
- the brokers don't log the content of the rejected messages
- the log messages do not contain the IP of the producers
- on the consumer side, no exception is thrown (afaik it is because Ack-0
is used). The only kind of notification is to closed the connection.

[1] Do you have any suggestions to track down the guilty producers or find
out the message content?

Even though it makes total sense to have the limit defined and applied on
the brokers, I was thinking that this check could also be applied by the
producers. Some google results suggest that `message.max.bytes` might be
used by the producers but I can't find any trace of that behavior in the
code.

The closest thing I have is
https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
but it simply logs the message size and content and the log level is trace.

[2] could you please confirm if such a producer-side check exists?


thanks!

Alexis

Re: message size limit

Posted by Jun Rao <ju...@gmail.com>.
You just need to set batch.size to be less than 2MB. max.request.size in
the producer has to be less than socket.request.max.bytes in the broker.

Thanks,

Jun

On Wed, Sep 10, 2014 at 2:22 PM, Bhavesh Mistry <mi...@gmail.com>
wrote:

> Hi Jun,
>
> Thank for highlighting.  Please correct me my understanding.  If the max
> request size <= message.max.bytes then batch size will be optimally decided
> ( batch size will be determine by  either max.request.size limit or
> batch.size which ever is less).
>
> Here is configuration parameter I was referring to on new producer side and
> Broker config.  Effectively eliminating the data loss issue or large amount
> of messages (except for one the really exceed the limit).  That is what I
> had asked for in previous email.
>
> *New Producer Config:*
> max.request.size=2MB
> batch.size=1000
>
> *Broker Config:*
> message.max.bytes=2MB
>
> Thanks,
> Bhavesh
>
> On Wed, Sep 10, 2014 at 10:10 AM, Jun Rao <ju...@gmail.com> wrote:
>
> > Actually, with the new producer, you can configure the batch size in
> bytes.
> > If you set the batch size to be smaller than the max message size,
> messages
> > exceeding the max message limit will be in its own batch. Then, only one
> > message will be rejected by the broker.
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Sep 9, 2014 at 5:51 PM, Bhavesh Mistry <
> mistry.p.bhavesh@gmail.com
> > >
> > wrote:
> >
> > > Hi Jun,
> > >
> > > Is there any plug-ability that Developer can customize batching logic
> or
> > > inject custom code for this ? Shall I file Jira for this issues.
> > >
> > > Thanks,
> > >
> > > Bhavesh
> > >
> > > On Tue, Sep 9, 2014 at 3:52 PM, Jun Rao <ju...@gmail.com> wrote:
> > >
> > > > No, the new producer doesn't address that problem.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Tue, Sep 9, 2014 at 12:59 PM, Bhavesh Mistry <
> > > > mistry.p.bhavesh@gmail.com>
> > > > wrote:
> > > >
> > > > > HI Jun,
> > > > >
> > > > > Thanks for clarification.  Follow up questions, does new producer
> > solve
> > > > the
> > > > > issues highlight.  In event of compression and async mode in new
> > > > producer,
> > > > > will it break down messages to this UPPER limit and submit or new
> > > > producer
> > > > > strictly honor batch size.  I am just asking if compression batch
> > size
> > > > > message reaches this configured limit, does batch broken down to
> sub
> > > > batch
> > > > > that is within limit.  I would like to minimize the data loss due
> to
> > > this
> > > > > limit.
> > > > >
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Bhavesh
> > > > >
> > > > > On Mon, Sep 8, 2014 at 9:17 PM, Jun Rao <ju...@gmail.com> wrote:
> > > > >
> > > > > > Are you using compression in the producer? If so,
> message.max.bytes
> > > > > applies
> > > > > > to the compressed size of a batch of messages. Otherwise,
> > > > > message.max.bytes
> > > > > > applies to the size of each individual message.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Wed, Sep 3, 2014 at 3:25 PM, Bhavesh Mistry <
> > > > > mistry.p.bhavesh@gmail.com
> > > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > I am referring to wiki
> > > http://kafka.apache.org/08/configuration.html
> > > > > and
> > > > > > > following parameter control max batch message bytes as far as I
> > > know.
> > > > > > > Kafka Community, please correct me if I am wrong.  I do not
> want
> > to
> > > > > > create
> > > > > > > confusion for Kafka User Community here.   Also, if you
> increase
> > > this
> > > > > > limit
> > > > > > > than you have to set the corresponding limit increase on
> consumer
> > > > side
> > > > > as
> > > > > > > well (fetch.message.max.bytes).
> > > > > > >
> > > > > > > Since we are using batch async mode, our messages are getting
> > drop
> > > > > > sometime
> > > > > > > if the entire batch bytes  exceed this limit so I was asking
> > Kafka
> > > > > > > Developers if any optimal way to determine the batch size based
> > on
> > > > this
> > > > > > > limit to minimize the data loss. Because, entire batch is
> > rejected
> > > by
> > > > > > > brokers.
> > > > > > >
> > > > > > > message.max.bytes 1000000 The maximum size of a message that
> the
> > > > server
> > > > > > can
> > > > > > > receive. It is important that this property be in sync with the
> > > > maximum
> > > > > > > fetch size your consumers use or else an unruly producer will
> be
> > > able
> > > > > to
> > > > > > > publish messages too large for consumers to consume.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Bhavesh
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Sep 3, 2014 at 2:59 PM, Alexis Midon <
> > > > > > > alexis.midon@airbedandbreakfast.com> wrote:
> > > > > > >
> > > > > > > > Hi Bhavesh
> > > > > > > >
> > > > > > > > can you explain what limit you're referring to?
> > > > > > > > I'm asking because `message.max.bytes` is applied per message
> > not
> > > > per
> > > > > > > > batch.
> > > > > > > > is there another limit I should be aware of?
> > > > > > > >
> > > > > > > > thanks
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Sep 3, 2014 at 2:07 PM, Bhavesh Mistry <
> > > > > > > mistry.p.bhavesh@gmail.com
> > > > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Jun,
> > > > > > > > >
> > > > > > > > > We have similar problem.  We have variable length of
> > messages.
> > > > So
> > > > > > when
> > > > > > > > we
> > > > > > > > > have fixed size of Batch sometime the batch exceed the
> limit
> > > set
> > > > on
> > > > > > the
> > > > > > > > > brokers (2MB).
> > > > > > > > >
> > > > > > > > > So can Producer have some extra logic to determine the
> > optimal
> > > > > batch
> > > > > > > size
> > > > > > > > > by looking at configured message.max.bytes  value.
> > > > > > > > >
> > > > > > > > > During the metadata update, Producer will get this value
> from
> > > the
> > > > > > > Broker
> > > > > > > > > for each topic and Producer will check if current batch
> size
> > > > reach
> > > > > > this
> > > > > > > > > limit than break batch into smaller chunk such way that It
> > > would
> > > > > not
> > > > > > > > exceed
> > > > > > > > > limit (unless single message exceed the limit). Basically
> try
> > > to
> > > > > > avoid
> > > > > > > > data
> > > > > > > > > loss as much as possible.
> > > > > > > > >
> > > > > > > > > Please let me know what is your opinion on this...
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Bhavesh
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, Sep 3, 2014 at 6:21 AM, Alexis Midon <
> > > > > > > > > alexis.midon@airbedandbreakfast.com> wrote:
> > > > > > > > >
> > > > > > > > > > Thanks Jun.
> > > > > > > > > >
> > > > > > > > > > I'll create a jira and try to provide a patch. I think
> this
> > > is
> > > > > > pretty
> > > > > > > > > > serious.
> > > > > > > > > >
> > > > > > > > > > On Friday, August 29, 2014, Jun Rao <ju...@gmail.com>
> > > wrote:
> > > > > > > > > >
> > > > > > > > > > > The goal of batching is mostly to reduce the # RPC
> calls
> > to
> > > > the
> > > > > > > > broker.
> > > > > > > > > > If
> > > > > > > > > > > compression is enabled, a larger batch typically
> implies
> > > > better
> > > > > > > > > > compression
> > > > > > > > > > > ratio.
> > > > > > > > > > >
> > > > > > > > > > > The reason that we have to fail the whole batch is that
> > the
> > > > > error
> > > > > > > > code
> > > > > > > > > in
> > > > > > > > > > > the produce response is per partition, instead of per
> > > > message.
> > > > > > > > > > >
> > > > > > > > > > > Retrying individual messages on MessageSizeTooLarge
> seems
> > > > > > > reasonable.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > >
> > > > > > > > > > > Jun
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <
> > > > > > > > > > > alexis.midon@airbedandbreakfast.com <javascript:;>>
> > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Could you explain the goals of batches? I was
> assuming
> > > this
> > > > > was
> > > > > > > > > simply
> > > > > > > > > > a
> > > > > > > > > > > > performance optimization, but this behavior makes me
> > > think
> > > > > I'm
> > > > > > > > > missing
> > > > > > > > > > > > something.
> > > > > > > > > > > > is a batch more than a list of *independent*
> messages?
> > > > > > > > > > > >
> > > > > > > > > > > > Why would you reject the whole batch? One invalid
> > message
> > > > > > causes
> > > > > > > > the
> > > > > > > > > > loss
> > > > > > > > > > > > of batch.num.messages-1 messages :(
> > > > > > > > > > > > It seems pretty critical to me.
> > > > > > > > > > > >
> > > > > > > > > > > > If ack=0, the producer will never know about it.
> > > > > > > > > > > > If ack !=0, the producer will retry the whole batch.
> If
> > > the
> > > > > > issue
> > > > > > > > was
> > > > > > > > > > > > related to data corruption (etc), retries might work.
> > But
> > > > in
> > > > > > the
> > > > > > > > case
> > > > > > > > > > of
> > > > > > > > > > > > "big message", the batch will always be rejected and
> > the
> > > > > > producer
> > > > > > > > > will
> > > > > > > > > > > give
> > > > > > > > > > > > up.
> > > > > > > > > > > >
> > > > > > > > > > > > If the messages are indeed considered independent, I
> > > think
> > > > > this
> > > > > > > is
> > > > > > > > a
> > > > > > > > > > > pretty
> > > > > > > > > > > > serious issue.
> > > > > > > > > > > >
> > > > > > > > > > > > I see 2 possible fix approaches:
> > > > > > > > > > > > - the broker could reject only the invalid messages
> > > > > > > > > > > > - the broker could reject the whole batch (like
> today)
> > > but
> > > > > the
> > > > > > > > > producer
> > > > > > > > > > > (if
> > > > > > > > > > > > ack!=0) could retry messages one at a time on
> exception
> > > > like
> > > > > > > > > > > > "MessageSizeTooLarge".
> > > > > > > > > > > >
> > > > > > > > > > > > opinions?
> > > > > > > > > > > >
> > > > > > > > > > > > Alexis
> > > > > > > > > > > >
> > > > > > > > > > > > ```
> > > > > > > > > > > > [2014-08-29 16:00:35,170] WARN Produce request with
> > > > > correlation
> > > > > > > id
> > > > > > > > 46
> > > > > > > > > > > > failed due to [test,1]:
> > > > > > kafka.common.MessageSizeTooLargeException
> > > > > > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > > > > [2014-08-29 16:00:35,284] WARN Produce request with
> > > > > correlation
> > > > > > > id
> > > > > > > > 51
> > > > > > > > > > > > failed due to [test,0]:
> > > > > > kafka.common.MessageSizeTooLargeException
> > > > > > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > > > > [2014-08-29 16:00:35,392] WARN Produce request with
> > > > > correlation
> > > > > > > id
> > > > > > > > 56
> > > > > > > > > > > > failed due to [test,0]:
> > > > > > kafka.common.MessageSizeTooLargeException
> > > > > > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > > > > [2014-08-29 16:00:35,499] WARN Produce request with
> > > > > correlation
> > > > > > > id
> > > > > > > > 61
> > > > > > > > > > > > failed due to [test,1]:
> > > > > > kafka.common.MessageSizeTooLargeException
> > > > > > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > > > > [2014-08-29 16:00:35,603] ERROR Failed to send
> requests
> > > for
> > > > > > > topics
> > > > > > > > > test
> > > > > > > > > > > > with correlation ids in [43,62]
> > > > > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > > > > [2014-08-29 16:00:35,603] ERROR Error in handling
> batch
> > > of
> > > > 3
> > > > > > > events
> > > > > > > > > > > > (kafka.producer.async.ProducerSendThread)
> > > > > > > > > > > > kafka.common.FailedToSendMessageException: Failed to
> > send
> > > > > > > messages
> > > > > > > > > > after
> > > > > > > > > > > 3
> > > > > > > > > > > > tries.
> > > > > > > > > > > > at
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > > > > > > > > > > >  at
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> > > > > > > > > > > > at
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> > > > > > > > > > > >  at
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> > > > > > > > > > > > ```
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao <
> > > junrao@gmail.com
> > > > > > > > > > > <javascript:;>> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > That's right. If one message in a batch exceeds the
> > > size
> > > > > > limit,
> > > > > > > > the
> > > > > > > > > > > whole
> > > > > > > > > > > > > batch is rejected.
> > > > > > > > > > > > >
> > > > > > > > > > > > > When determining message.max.bytes, the most
> > important
> > > > > thing
> > > > > > to
> > > > > > > > > > > consider
> > > > > > > > > > > > is
> > > > > > > > > > > > > probably memory since currently we need to allocate
> > > > memory
> > > > > > for
> > > > > > > a
> > > > > > > > > full
> > > > > > > > > > > > > message in the broker and the producer and the
> > consumer
> > > > > > client.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Jun
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon <
> > > > > > > > > > > > > alexis.midon@airbedandbreakfast.com
> <javascript:;>>
> > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > am I miss reading this loop:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > it seems like all messages from `validMessages`
> > > (which
> > > > is
> > > > > > > > > > > > > > ByteBufferMessageSet) are NOT appended if one of
> > the
> > > > > > message
> > > > > > > > size
> > > > > > > > > > > > exceeds
> > > > > > > > > > > > > > the limit.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I hope I'm missing something.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon <
> > > > > > > > > > > > > > alexis.midon@airbedandbreakfast.com
> > <javascript:;>>
> > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi Jun,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > thanks for you answer.
> > > > > > > > > > > > > > > Unfortunately the size won't help much, I'd
> like
> > to
> > > > see
> > > > > > the
> > > > > > > > > > actual
> > > > > > > > > > > > > > message
> > > > > > > > > > > > > > > data.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > By the way what are the things to consider when
> > > > > deciding
> > > > > > on
> > > > > > > > > > > > > > > `message.max.bytes` value?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao <
> > > > > > junrao@gmail.com
> > > > > > > > > > > <javascript:;>> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >> The message size check is currently only done
> on
> > > the
> > > > > > > broker.
> > > > > > > > > If
> > > > > > > > > > > you
> > > > > > > > > > > > > > enable
> > > > > > > > > > > > > > >> trace level logging in RequestChannel, you
> will
> > > see
> > > > > the
> > > > > > > > > produce
> > > > > > > > > > > > > request,
> > > > > > > > > > > > > > >> which includes the size of each partition.
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> Thanks,
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> Jun
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon
> <
> > > > > > > > > > > > > > >> alexis.midon@airbedandbreakfast.com
> > > <javascript:;>>
> > > > > > > wrote:
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> > Hello,
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > my brokers are reporting that some received
> > > > messages
> > > > > > > > exceed
> > > > > > > > > > the
> > > > > > > > > > > > > > >> > `message.max.bytes` value.
> > > > > > > > > > > > > > >> > I'd like to know what producers are at fault
> > but
> > > > It
> > > > > is
> > > > > > > > > pretty
> > > > > > > > > > > much
> > > > > > > > > > > > > > >> > impossible:
> > > > > > > > > > > > > > >> > - the brokers don't log the content of the
> > > > rejected
> > > > > > > > messages
> > > > > > > > > > > > > > >> > - the log messages do not contain the IP of
> > the
> > > > > > > producers
> > > > > > > > > > > > > > >> > - on the consumer side, no exception is
> thrown
> > > > > (afaik
> > > > > > it
> > > > > > > > is
> > > > > > > > > > > > because
> > > > > > > > > > > > > > >> Ack-0
> > > > > > > > > > > > > > >> > is used). The only kind of notification is
> to
> > > > closed
> > > > > > the
> > > > > > > > > > > > connection.
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > [1] Do you have any suggestions to track
> down
> > > the
> > > > > > guilty
> > > > > > > > > > > producers
> > > > > > > > > > > > > or
> > > > > > > > > > > > > > >> find
> > > > > > > > > > > > > > >> > out the message content?
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > Even though it makes total sense to have the
> > > limit
> > > > > > > defined
> > > > > > > > > and
> > > > > > > > > > > > > applied
> > > > > > > > > > > > > > >> on
> > > > > > > > > > > > > > >> > the brokers, I was thinking that this check
> > > could
> > > > > also
> > > > > > > be
> > > > > > > > > > > applied
> > > > > > > > > > > > by
> > > > > > > > > > > > > > the
> > > > > > > > > > > > > > >> > producers. Some google results suggest that
> > > > > > > > > > `message.max.bytes`
> > > > > > > > > > > > > might
> > > > > > > > > > > > > > be
> > > > > > > > > > > > > > >> > used by the producers but I can't find any
> > trace
> > > > of
> > > > > > that
> > > > > > > > > > > behavior
> > > > > > > > > > > > in
> > > > > > > > > > > > > > the
> > > > > > > > > > > > > > >> > code.
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > The closest thing I have is
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
> > > > > > > > > > > > > > >> > but it simply logs the message size and
> > content
> > > > and
> > > > > > the
> > > > > > > > log
> > > > > > > > > > > level
> > > > > > > > > > > > is
> > > > > > > > > > > > > > >> trace.
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > [2] could you please confirm if such a
> > > > producer-side
> > > > > > > check
> > > > > > > > > > > exists?
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > thanks!
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > Alexis
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: message size limit

Posted by Bhavesh Mistry <mi...@gmail.com>.
Hi Jun,

Thank for highlighting.  Please correct me my understanding.  If the max
request size <= message.max.bytes then batch size will be optimally decided
( batch size will be determine by  either max.request.size limit or
batch.size which ever is less).

Here is configuration parameter I was referring to on new producer side and
Broker config.  Effectively eliminating the data loss issue or large amount
of messages (except for one the really exceed the limit).  That is what I
had asked for in previous email.

*New Producer Config:*
max.request.size=2MB
batch.size=1000

*Broker Config:*
message.max.bytes=2MB

Thanks,
Bhavesh

On Wed, Sep 10, 2014 at 10:10 AM, Jun Rao <ju...@gmail.com> wrote:

> Actually, with the new producer, you can configure the batch size in bytes.
> If you set the batch size to be smaller than the max message size, messages
> exceeding the max message limit will be in its own batch. Then, only one
> message will be rejected by the broker.
>
> Thanks,
>
> Jun
>
> On Tue, Sep 9, 2014 at 5:51 PM, Bhavesh Mistry <mistry.p.bhavesh@gmail.com
> >
> wrote:
>
> > Hi Jun,
> >
> > Is there any plug-ability that Developer can customize batching logic or
> > inject custom code for this ? Shall I file Jira for this issues.
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Tue, Sep 9, 2014 at 3:52 PM, Jun Rao <ju...@gmail.com> wrote:
> >
> > > No, the new producer doesn't address that problem.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Tue, Sep 9, 2014 at 12:59 PM, Bhavesh Mistry <
> > > mistry.p.bhavesh@gmail.com>
> > > wrote:
> > >
> > > > HI Jun,
> > > >
> > > > Thanks for clarification.  Follow up questions, does new producer
> solve
> > > the
> > > > issues highlight.  In event of compression and async mode in new
> > > producer,
> > > > will it break down messages to this UPPER limit and submit or new
> > > producer
> > > > strictly honor batch size.  I am just asking if compression batch
> size
> > > > message reaches this configured limit, does batch broken down to sub
> > > batch
> > > > that is within limit.  I would like to minimize the data loss due to
> > this
> > > > limit.
> > > >
> > > >
> > > > Thanks,
> > > >
> > > > Bhavesh
> > > >
> > > > On Mon, Sep 8, 2014 at 9:17 PM, Jun Rao <ju...@gmail.com> wrote:
> > > >
> > > > > Are you using compression in the producer? If so, message.max.bytes
> > > > applies
> > > > > to the compressed size of a batch of messages. Otherwise,
> > > > message.max.bytes
> > > > > applies to the size of each individual message.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Wed, Sep 3, 2014 at 3:25 PM, Bhavesh Mistry <
> > > > mistry.p.bhavesh@gmail.com
> > > > > >
> > > > > wrote:
> > > > >
> > > > > > I am referring to wiki
> > http://kafka.apache.org/08/configuration.html
> > > > and
> > > > > > following parameter control max batch message bytes as far as I
> > know.
> > > > > > Kafka Community, please correct me if I am wrong.  I do not want
> to
> > > > > create
> > > > > > confusion for Kafka User Community here.   Also, if you increase
> > this
> > > > > limit
> > > > > > than you have to set the corresponding limit increase on consumer
> > > side
> > > > as
> > > > > > well (fetch.message.max.bytes).
> > > > > >
> > > > > > Since we are using batch async mode, our messages are getting
> drop
> > > > > sometime
> > > > > > if the entire batch bytes  exceed this limit so I was asking
> Kafka
> > > > > > Developers if any optimal way to determine the batch size based
> on
> > > this
> > > > > > limit to minimize the data loss. Because, entire batch is
> rejected
> > by
> > > > > > brokers.
> > > > > >
> > > > > > message.max.bytes 1000000 The maximum size of a message that the
> > > server
> > > > > can
> > > > > > receive. It is important that this property be in sync with the
> > > maximum
> > > > > > fetch size your consumers use or else an unruly producer will be
> > able
> > > > to
> > > > > > publish messages too large for consumers to consume.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Bhavesh
> > > > > >
> > > > > >
> > > > > > On Wed, Sep 3, 2014 at 2:59 PM, Alexis Midon <
> > > > > > alexis.midon@airbedandbreakfast.com> wrote:
> > > > > >
> > > > > > > Hi Bhavesh
> > > > > > >
> > > > > > > can you explain what limit you're referring to?
> > > > > > > I'm asking because `message.max.bytes` is applied per message
> not
> > > per
> > > > > > > batch.
> > > > > > > is there another limit I should be aware of?
> > > > > > >
> > > > > > > thanks
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Sep 3, 2014 at 2:07 PM, Bhavesh Mistry <
> > > > > > mistry.p.bhavesh@gmail.com
> > > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Jun,
> > > > > > > >
> > > > > > > > We have similar problem.  We have variable length of
> messages.
> > > So
> > > > > when
> > > > > > > we
> > > > > > > > have fixed size of Batch sometime the batch exceed the limit
> > set
> > > on
> > > > > the
> > > > > > > > brokers (2MB).
> > > > > > > >
> > > > > > > > So can Producer have some extra logic to determine the
> optimal
> > > > batch
> > > > > > size
> > > > > > > > by looking at configured message.max.bytes  value.
> > > > > > > >
> > > > > > > > During the metadata update, Producer will get this value from
> > the
> > > > > > Broker
> > > > > > > > for each topic and Producer will check if current batch size
> > > reach
> > > > > this
> > > > > > > > limit than break batch into smaller chunk such way that It
> > would
> > > > not
> > > > > > > exceed
> > > > > > > > limit (unless single message exceed the limit). Basically try
> > to
> > > > > avoid
> > > > > > > data
> > > > > > > > loss as much as possible.
> > > > > > > >
> > > > > > > > Please let me know what is your opinion on this...
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Bhavesh
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Sep 3, 2014 at 6:21 AM, Alexis Midon <
> > > > > > > > alexis.midon@airbedandbreakfast.com> wrote:
> > > > > > > >
> > > > > > > > > Thanks Jun.
> > > > > > > > >
> > > > > > > > > I'll create a jira and try to provide a patch. I think this
> > is
> > > > > pretty
> > > > > > > > > serious.
> > > > > > > > >
> > > > > > > > > On Friday, August 29, 2014, Jun Rao <ju...@gmail.com>
> > wrote:
> > > > > > > > >
> > > > > > > > > > The goal of batching is mostly to reduce the # RPC calls
> to
> > > the
> > > > > > > broker.
> > > > > > > > > If
> > > > > > > > > > compression is enabled, a larger batch typically implies
> > > better
> > > > > > > > > compression
> > > > > > > > > > ratio.
> > > > > > > > > >
> > > > > > > > > > The reason that we have to fail the whole batch is that
> the
> > > > error
> > > > > > > code
> > > > > > > > in
> > > > > > > > > > the produce response is per partition, instead of per
> > > message.
> > > > > > > > > >
> > > > > > > > > > Retrying individual messages on MessageSizeTooLarge seems
> > > > > > reasonable.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Jun
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <
> > > > > > > > > > alexis.midon@airbedandbreakfast.com <javascript:;>>
> wrote:
> > > > > > > > > >
> > > > > > > > > > > Could you explain the goals of batches? I was assuming
> > this
> > > > was
> > > > > > > > simply
> > > > > > > > > a
> > > > > > > > > > > performance optimization, but this behavior makes me
> > think
> > > > I'm
> > > > > > > > missing
> > > > > > > > > > > something.
> > > > > > > > > > > is a batch more than a list of *independent* messages?
> > > > > > > > > > >
> > > > > > > > > > > Why would you reject the whole batch? One invalid
> message
> > > > > causes
> > > > > > > the
> > > > > > > > > loss
> > > > > > > > > > > of batch.num.messages-1 messages :(
> > > > > > > > > > > It seems pretty critical to me.
> > > > > > > > > > >
> > > > > > > > > > > If ack=0, the producer will never know about it.
> > > > > > > > > > > If ack !=0, the producer will retry the whole batch. If
> > the
> > > > > issue
> > > > > > > was
> > > > > > > > > > > related to data corruption (etc), retries might work.
> But
> > > in
> > > > > the
> > > > > > > case
> > > > > > > > > of
> > > > > > > > > > > "big message", the batch will always be rejected and
> the
> > > > > producer
> > > > > > > > will
> > > > > > > > > > give
> > > > > > > > > > > up.
> > > > > > > > > > >
> > > > > > > > > > > If the messages are indeed considered independent, I
> > think
> > > > this
> > > > > > is
> > > > > > > a
> > > > > > > > > > pretty
> > > > > > > > > > > serious issue.
> > > > > > > > > > >
> > > > > > > > > > > I see 2 possible fix approaches:
> > > > > > > > > > > - the broker could reject only the invalid messages
> > > > > > > > > > > - the broker could reject the whole batch (like today)
> > but
> > > > the
> > > > > > > > producer
> > > > > > > > > > (if
> > > > > > > > > > > ack!=0) could retry messages one at a time on exception
> > > like
> > > > > > > > > > > "MessageSizeTooLarge".
> > > > > > > > > > >
> > > > > > > > > > > opinions?
> > > > > > > > > > >
> > > > > > > > > > > Alexis
> > > > > > > > > > >
> > > > > > > > > > > ```
> > > > > > > > > > > [2014-08-29 16:00:35,170] WARN Produce request with
> > > > correlation
> > > > > > id
> > > > > > > 46
> > > > > > > > > > > failed due to [test,1]:
> > > > > kafka.common.MessageSizeTooLargeException
> > > > > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > > > [2014-08-29 16:00:35,284] WARN Produce request with
> > > > correlation
> > > > > > id
> > > > > > > 51
> > > > > > > > > > > failed due to [test,0]:
> > > > > kafka.common.MessageSizeTooLargeException
> > > > > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > > > [2014-08-29 16:00:35,392] WARN Produce request with
> > > > correlation
> > > > > > id
> > > > > > > 56
> > > > > > > > > > > failed due to [test,0]:
> > > > > kafka.common.MessageSizeTooLargeException
> > > > > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > > > [2014-08-29 16:00:35,499] WARN Produce request with
> > > > correlation
> > > > > > id
> > > > > > > 61
> > > > > > > > > > > failed due to [test,1]:
> > > > > kafka.common.MessageSizeTooLargeException
> > > > > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > > > [2014-08-29 16:00:35,603] ERROR Failed to send requests
> > for
> > > > > > topics
> > > > > > > > test
> > > > > > > > > > > with correlation ids in [43,62]
> > > > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > > > [2014-08-29 16:00:35,603] ERROR Error in handling batch
> > of
> > > 3
> > > > > > events
> > > > > > > > > > > (kafka.producer.async.ProducerSendThread)
> > > > > > > > > > > kafka.common.FailedToSendMessageException: Failed to
> send
> > > > > > messages
> > > > > > > > > after
> > > > > > > > > > 3
> > > > > > > > > > > tries.
> > > > > > > > > > > at
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > > > > > > > > > >  at
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> > > > > > > > > > > at
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> > > > > > > > > > >  at
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> > > > > > > > > > > ```
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao <
> > junrao@gmail.com
> > > > > > > > > > <javascript:;>> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > That's right. If one message in a batch exceeds the
> > size
> > > > > limit,
> > > > > > > the
> > > > > > > > > > whole
> > > > > > > > > > > > batch is rejected.
> > > > > > > > > > > >
> > > > > > > > > > > > When determining message.max.bytes, the most
> important
> > > > thing
> > > > > to
> > > > > > > > > > consider
> > > > > > > > > > > is
> > > > > > > > > > > > probably memory since currently we need to allocate
> > > memory
> > > > > for
> > > > > > a
> > > > > > > > full
> > > > > > > > > > > > message in the broker and the producer and the
> consumer
> > > > > client.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > >
> > > > > > > > > > > > Jun
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon <
> > > > > > > > > > > > alexis.midon@airbedandbreakfast.com <javascript:;>>
> > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > am I miss reading this loop:
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269
> > > > > > > > > > > > >
> > > > > > > > > > > > > it seems like all messages from `validMessages`
> > (which
> > > is
> > > > > > > > > > > > > ByteBufferMessageSet) are NOT appended if one of
> the
> > > > > message
> > > > > > > size
> > > > > > > > > > > exceeds
> > > > > > > > > > > > > the limit.
> > > > > > > > > > > > >
> > > > > > > > > > > > > I hope I'm missing something.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon <
> > > > > > > > > > > > > alexis.midon@airbedandbreakfast.com
> <javascript:;>>
> > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Jun,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > thanks for you answer.
> > > > > > > > > > > > > > Unfortunately the size won't help much, I'd like
> to
> > > see
> > > > > the
> > > > > > > > > actual
> > > > > > > > > > > > > message
> > > > > > > > > > > > > > data.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > By the way what are the things to consider when
> > > > deciding
> > > > > on
> > > > > > > > > > > > > > `message.max.bytes` value?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao <
> > > > > junrao@gmail.com
> > > > > > > > > > <javascript:;>> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >> The message size check is currently only done on
> > the
> > > > > > broker.
> > > > > > > > If
> > > > > > > > > > you
> > > > > > > > > > > > > enable
> > > > > > > > > > > > > >> trace level logging in RequestChannel, you will
> > see
> > > > the
> > > > > > > > produce
> > > > > > > > > > > > request,
> > > > > > > > > > > > > >> which includes the size of each partition.
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Thanks,
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Jun
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon <
> > > > > > > > > > > > > >> alexis.midon@airbedandbreakfast.com
> > <javascript:;>>
> > > > > > wrote:
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> > Hello,
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > my brokers are reporting that some received
> > > messages
> > > > > > > exceed
> > > > > > > > > the
> > > > > > > > > > > > > >> > `message.max.bytes` value.
> > > > > > > > > > > > > >> > I'd like to know what producers are at fault
> but
> > > It
> > > > is
> > > > > > > > pretty
> > > > > > > > > > much
> > > > > > > > > > > > > >> > impossible:
> > > > > > > > > > > > > >> > - the brokers don't log the content of the
> > > rejected
> > > > > > > messages
> > > > > > > > > > > > > >> > - the log messages do not contain the IP of
> the
> > > > > > producers
> > > > > > > > > > > > > >> > - on the consumer side, no exception is thrown
> > > > (afaik
> > > > > it
> > > > > > > is
> > > > > > > > > > > because
> > > > > > > > > > > > > >> Ack-0
> > > > > > > > > > > > > >> > is used). The only kind of notification is to
> > > closed
> > > > > the
> > > > > > > > > > > connection.
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > [1] Do you have any suggestions to track down
> > the
> > > > > guilty
> > > > > > > > > > producers
> > > > > > > > > > > > or
> > > > > > > > > > > > > >> find
> > > > > > > > > > > > > >> > out the message content?
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > Even though it makes total sense to have the
> > limit
> > > > > > defined
> > > > > > > > and
> > > > > > > > > > > > applied
> > > > > > > > > > > > > >> on
> > > > > > > > > > > > > >> > the brokers, I was thinking that this check
> > could
> > > > also
> > > > > > be
> > > > > > > > > > applied
> > > > > > > > > > > by
> > > > > > > > > > > > > the
> > > > > > > > > > > > > >> > producers. Some google results suggest that
> > > > > > > > > `message.max.bytes`
> > > > > > > > > > > > might
> > > > > > > > > > > > > be
> > > > > > > > > > > > > >> > used by the producers but I can't find any
> trace
> > > of
> > > > > that
> > > > > > > > > > behavior
> > > > > > > > > > > in
> > > > > > > > > > > > > the
> > > > > > > > > > > > > >> > code.
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > The closest thing I have is
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >>
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
> > > > > > > > > > > > > >> > but it simply logs the message size and
> content
> > > and
> > > > > the
> > > > > > > log
> > > > > > > > > > level
> > > > > > > > > > > is
> > > > > > > > > > > > > >> trace.
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > [2] could you please confirm if such a
> > > producer-side
> > > > > > check
> > > > > > > > > > exists?
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > thanks!
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >> > Alexis
> > > > > > > > > > > > > >> >
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: message size limit

Posted by Jun Rao <ju...@gmail.com>.
Actually, with the new producer, you can configure the batch size in bytes.
If you set the batch size to be smaller than the max message size, messages
exceeding the max message limit will be in its own batch. Then, only one
message will be rejected by the broker.

Thanks,

Jun

On Tue, Sep 9, 2014 at 5:51 PM, Bhavesh Mistry <mi...@gmail.com>
wrote:

> Hi Jun,
>
> Is there any plug-ability that Developer can customize batching logic or
> inject custom code for this ? Shall I file Jira for this issues.
>
> Thanks,
>
> Bhavesh
>
> On Tue, Sep 9, 2014 at 3:52 PM, Jun Rao <ju...@gmail.com> wrote:
>
> > No, the new producer doesn't address that problem.
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Sep 9, 2014 at 12:59 PM, Bhavesh Mistry <
> > mistry.p.bhavesh@gmail.com>
> > wrote:
> >
> > > HI Jun,
> > >
> > > Thanks for clarification.  Follow up questions, does new producer solve
> > the
> > > issues highlight.  In event of compression and async mode in new
> > producer,
> > > will it break down messages to this UPPER limit and submit or new
> > producer
> > > strictly honor batch size.  I am just asking if compression batch size
> > > message reaches this configured limit, does batch broken down to sub
> > batch
> > > that is within limit.  I would like to minimize the data loss due to
> this
> > > limit.
> > >
> > >
> > > Thanks,
> > >
> > > Bhavesh
> > >
> > > On Mon, Sep 8, 2014 at 9:17 PM, Jun Rao <ju...@gmail.com> wrote:
> > >
> > > > Are you using compression in the producer? If so, message.max.bytes
> > > applies
> > > > to the compressed size of a batch of messages. Otherwise,
> > > message.max.bytes
> > > > applies to the size of each individual message.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Wed, Sep 3, 2014 at 3:25 PM, Bhavesh Mistry <
> > > mistry.p.bhavesh@gmail.com
> > > > >
> > > > wrote:
> > > >
> > > > > I am referring to wiki
> http://kafka.apache.org/08/configuration.html
> > > and
> > > > > following parameter control max batch message bytes as far as I
> know.
> > > > > Kafka Community, please correct me if I am wrong.  I do not want to
> > > > create
> > > > > confusion for Kafka User Community here.   Also, if you increase
> this
> > > > limit
> > > > > than you have to set the corresponding limit increase on consumer
> > side
> > > as
> > > > > well (fetch.message.max.bytes).
> > > > >
> > > > > Since we are using batch async mode, our messages are getting drop
> > > > sometime
> > > > > if the entire batch bytes  exceed this limit so I was asking Kafka
> > > > > Developers if any optimal way to determine the batch size based on
> > this
> > > > > limit to minimize the data loss. Because, entire batch is rejected
> by
> > > > > brokers.
> > > > >
> > > > > message.max.bytes 1000000 The maximum size of a message that the
> > server
> > > > can
> > > > > receive. It is important that this property be in sync with the
> > maximum
> > > > > fetch size your consumers use or else an unruly producer will be
> able
> > > to
> > > > > publish messages too large for consumers to consume.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Bhavesh
> > > > >
> > > > >
> > > > > On Wed, Sep 3, 2014 at 2:59 PM, Alexis Midon <
> > > > > alexis.midon@airbedandbreakfast.com> wrote:
> > > > >
> > > > > > Hi Bhavesh
> > > > > >
> > > > > > can you explain what limit you're referring to?
> > > > > > I'm asking because `message.max.bytes` is applied per message not
> > per
> > > > > > batch.
> > > > > > is there another limit I should be aware of?
> > > > > >
> > > > > > thanks
> > > > > >
> > > > > >
> > > > > > On Wed, Sep 3, 2014 at 2:07 PM, Bhavesh Mistry <
> > > > > mistry.p.bhavesh@gmail.com
> > > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Jun,
> > > > > > >
> > > > > > > We have similar problem.  We have variable length of messages.
> > So
> > > > when
> > > > > > we
> > > > > > > have fixed size of Batch sometime the batch exceed the limit
> set
> > on
> > > > the
> > > > > > > brokers (2MB).
> > > > > > >
> > > > > > > So can Producer have some extra logic to determine the optimal
> > > batch
> > > > > size
> > > > > > > by looking at configured message.max.bytes  value.
> > > > > > >
> > > > > > > During the metadata update, Producer will get this value from
> the
> > > > > Broker
> > > > > > > for each topic and Producer will check if current batch size
> > reach
> > > > this
> > > > > > > limit than break batch into smaller chunk such way that It
> would
> > > not
> > > > > > exceed
> > > > > > > limit (unless single message exceed the limit). Basically try
> to
> > > > avoid
> > > > > > data
> > > > > > > loss as much as possible.
> > > > > > >
> > > > > > > Please let me know what is your opinion on this...
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Bhavesh
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Sep 3, 2014 at 6:21 AM, Alexis Midon <
> > > > > > > alexis.midon@airbedandbreakfast.com> wrote:
> > > > > > >
> > > > > > > > Thanks Jun.
> > > > > > > >
> > > > > > > > I'll create a jira and try to provide a patch. I think this
> is
> > > > pretty
> > > > > > > > serious.
> > > > > > > >
> > > > > > > > On Friday, August 29, 2014, Jun Rao <ju...@gmail.com>
> wrote:
> > > > > > > >
> > > > > > > > > The goal of batching is mostly to reduce the # RPC calls to
> > the
> > > > > > broker.
> > > > > > > > If
> > > > > > > > > compression is enabled, a larger batch typically implies
> > better
> > > > > > > > compression
> > > > > > > > > ratio.
> > > > > > > > >
> > > > > > > > > The reason that we have to fail the whole batch is that the
> > > error
> > > > > > code
> > > > > > > in
> > > > > > > > > the produce response is per partition, instead of per
> > message.
> > > > > > > > >
> > > > > > > > > Retrying individual messages on MessageSizeTooLarge seems
> > > > > reasonable.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Jun
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <
> > > > > > > > > alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> > > > > > > > >
> > > > > > > > > > Could you explain the goals of batches? I was assuming
> this
> > > was
> > > > > > > simply
> > > > > > > > a
> > > > > > > > > > performance optimization, but this behavior makes me
> think
> > > I'm
> > > > > > > missing
> > > > > > > > > > something.
> > > > > > > > > > is a batch more than a list of *independent* messages?
> > > > > > > > > >
> > > > > > > > > > Why would you reject the whole batch? One invalid message
> > > > causes
> > > > > > the
> > > > > > > > loss
> > > > > > > > > > of batch.num.messages-1 messages :(
> > > > > > > > > > It seems pretty critical to me.
> > > > > > > > > >
> > > > > > > > > > If ack=0, the producer will never know about it.
> > > > > > > > > > If ack !=0, the producer will retry the whole batch. If
> the
> > > > issue
> > > > > > was
> > > > > > > > > > related to data corruption (etc), retries might work. But
> > in
> > > > the
> > > > > > case
> > > > > > > > of
> > > > > > > > > > "big message", the batch will always be rejected and the
> > > > producer
> > > > > > > will
> > > > > > > > > give
> > > > > > > > > > up.
> > > > > > > > > >
> > > > > > > > > > If the messages are indeed considered independent, I
> think
> > > this
> > > > > is
> > > > > > a
> > > > > > > > > pretty
> > > > > > > > > > serious issue.
> > > > > > > > > >
> > > > > > > > > > I see 2 possible fix approaches:
> > > > > > > > > > - the broker could reject only the invalid messages
> > > > > > > > > > - the broker could reject the whole batch (like today)
> but
> > > the
> > > > > > > producer
> > > > > > > > > (if
> > > > > > > > > > ack!=0) could retry messages one at a time on exception
> > like
> > > > > > > > > > "MessageSizeTooLarge".
> > > > > > > > > >
> > > > > > > > > > opinions?
> > > > > > > > > >
> > > > > > > > > > Alexis
> > > > > > > > > >
> > > > > > > > > > ```
> > > > > > > > > > [2014-08-29 16:00:35,170] WARN Produce request with
> > > correlation
> > > > > id
> > > > > > 46
> > > > > > > > > > failed due to [test,1]:
> > > > kafka.common.MessageSizeTooLargeException
> > > > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > > [2014-08-29 16:00:35,284] WARN Produce request with
> > > correlation
> > > > > id
> > > > > > 51
> > > > > > > > > > failed due to [test,0]:
> > > > kafka.common.MessageSizeTooLargeException
> > > > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > > [2014-08-29 16:00:35,392] WARN Produce request with
> > > correlation
> > > > > id
> > > > > > 56
> > > > > > > > > > failed due to [test,0]:
> > > > kafka.common.MessageSizeTooLargeException
> > > > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > > [2014-08-29 16:00:35,499] WARN Produce request with
> > > correlation
> > > > > id
> > > > > > 61
> > > > > > > > > > failed due to [test,1]:
> > > > kafka.common.MessageSizeTooLargeException
> > > > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > > [2014-08-29 16:00:35,603] ERROR Failed to send requests
> for
> > > > > topics
> > > > > > > test
> > > > > > > > > > with correlation ids in [43,62]
> > > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > > [2014-08-29 16:00:35,603] ERROR Error in handling batch
> of
> > 3
> > > > > events
> > > > > > > > > > (kafka.producer.async.ProducerSendThread)
> > > > > > > > > > kafka.common.FailedToSendMessageException: Failed to send
> > > > > messages
> > > > > > > > after
> > > > > > > > > 3
> > > > > > > > > > tries.
> > > > > > > > > > at
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > > > > > > > > >  at
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> > > > > > > > > > at
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> > > > > > > > > >  at
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> > > > > > > > > > ```
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao <
> junrao@gmail.com
> > > > > > > > > <javascript:;>> wrote:
> > > > > > > > > >
> > > > > > > > > > > That's right. If one message in a batch exceeds the
> size
> > > > limit,
> > > > > > the
> > > > > > > > > whole
> > > > > > > > > > > batch is rejected.
> > > > > > > > > > >
> > > > > > > > > > > When determining message.max.bytes, the most important
> > > thing
> > > > to
> > > > > > > > > consider
> > > > > > > > > > is
> > > > > > > > > > > probably memory since currently we need to allocate
> > memory
> > > > for
> > > > > a
> > > > > > > full
> > > > > > > > > > > message in the broker and the producer and the consumer
> > > > client.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > >
> > > > > > > > > > > Jun
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon <
> > > > > > > > > > > alexis.midon@airbedandbreakfast.com <javascript:;>>
> > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > am I miss reading this loop:
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269
> > > > > > > > > > > >
> > > > > > > > > > > > it seems like all messages from `validMessages`
> (which
> > is
> > > > > > > > > > > > ByteBufferMessageSet) are NOT appended if one of the
> > > > message
> > > > > > size
> > > > > > > > > > exceeds
> > > > > > > > > > > > the limit.
> > > > > > > > > > > >
> > > > > > > > > > > > I hope I'm missing something.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon <
> > > > > > > > > > > > alexis.midon@airbedandbreakfast.com <javascript:;>>
> > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Jun,
> > > > > > > > > > > > >
> > > > > > > > > > > > > thanks for you answer.
> > > > > > > > > > > > > Unfortunately the size won't help much, I'd like to
> > see
> > > > the
> > > > > > > > actual
> > > > > > > > > > > > message
> > > > > > > > > > > > > data.
> > > > > > > > > > > > >
> > > > > > > > > > > > > By the way what are the things to consider when
> > > deciding
> > > > on
> > > > > > > > > > > > > `message.max.bytes` value?
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao <
> > > > junrao@gmail.com
> > > > > > > > > <javascript:;>> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > >> The message size check is currently only done on
> the
> > > > > broker.
> > > > > > > If
> > > > > > > > > you
> > > > > > > > > > > > enable
> > > > > > > > > > > > >> trace level logging in RequestChannel, you will
> see
> > > the
> > > > > > > produce
> > > > > > > > > > > request,
> > > > > > > > > > > > >> which includes the size of each partition.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> Thanks,
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> Jun
> > > > > > > > > > > > >>
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon <
> > > > > > > > > > > > >> alexis.midon@airbedandbreakfast.com
> <javascript:;>>
> > > > > wrote:
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> > Hello,
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > my brokers are reporting that some received
> > messages
> > > > > > exceed
> > > > > > > > the
> > > > > > > > > > > > >> > `message.max.bytes` value.
> > > > > > > > > > > > >> > I'd like to know what producers are at fault but
> > It
> > > is
> > > > > > > pretty
> > > > > > > > > much
> > > > > > > > > > > > >> > impossible:
> > > > > > > > > > > > >> > - the brokers don't log the content of the
> > rejected
> > > > > > messages
> > > > > > > > > > > > >> > - the log messages do not contain the IP of the
> > > > > producers
> > > > > > > > > > > > >> > - on the consumer side, no exception is thrown
> > > (afaik
> > > > it
> > > > > > is
> > > > > > > > > > because
> > > > > > > > > > > > >> Ack-0
> > > > > > > > > > > > >> > is used). The only kind of notification is to
> > closed
> > > > the
> > > > > > > > > > connection.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > [1] Do you have any suggestions to track down
> the
> > > > guilty
> > > > > > > > > producers
> > > > > > > > > > > or
> > > > > > > > > > > > >> find
> > > > > > > > > > > > >> > out the message content?
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Even though it makes total sense to have the
> limit
> > > > > defined
> > > > > > > and
> > > > > > > > > > > applied
> > > > > > > > > > > > >> on
> > > > > > > > > > > > >> > the brokers, I was thinking that this check
> could
> > > also
> > > > > be
> > > > > > > > > applied
> > > > > > > > > > by
> > > > > > > > > > > > the
> > > > > > > > > > > > >> > producers. Some google results suggest that
> > > > > > > > `message.max.bytes`
> > > > > > > > > > > might
> > > > > > > > > > > > be
> > > > > > > > > > > > >> > used by the producers but I can't find any trace
> > of
> > > > that
> > > > > > > > > behavior
> > > > > > > > > > in
> > > > > > > > > > > > the
> > > > > > > > > > > > >> > code.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > The closest thing I have is
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >>
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
> > > > > > > > > > > > >> > but it simply logs the message size and content
> > and
> > > > the
> > > > > > log
> > > > > > > > > level
> > > > > > > > > > is
> > > > > > > > > > > > >> trace.
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > [2] could you please confirm if such a
> > producer-side
> > > > > check
> > > > > > > > > exists?
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > thanks!
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > Alexis
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >>
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: message size limit

Posted by Bhavesh Mistry <mi...@gmail.com>.
Hi Jun,

Is there any plug-ability that Developer can customize batching logic or
inject custom code for this ? Shall I file Jira for this issues.

Thanks,

Bhavesh

On Tue, Sep 9, 2014 at 3:52 PM, Jun Rao <ju...@gmail.com> wrote:

> No, the new producer doesn't address that problem.
>
> Thanks,
>
> Jun
>
> On Tue, Sep 9, 2014 at 12:59 PM, Bhavesh Mistry <
> mistry.p.bhavesh@gmail.com>
> wrote:
>
> > HI Jun,
> >
> > Thanks for clarification.  Follow up questions, does new producer solve
> the
> > issues highlight.  In event of compression and async mode in new
> producer,
> > will it break down messages to this UPPER limit and submit or new
> producer
> > strictly honor batch size.  I am just asking if compression batch size
> > message reaches this configured limit, does batch broken down to sub
> batch
> > that is within limit.  I would like to minimize the data loss due to this
> > limit.
> >
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Mon, Sep 8, 2014 at 9:17 PM, Jun Rao <ju...@gmail.com> wrote:
> >
> > > Are you using compression in the producer? If so, message.max.bytes
> > applies
> > > to the compressed size of a batch of messages. Otherwise,
> > message.max.bytes
> > > applies to the size of each individual message.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Wed, Sep 3, 2014 at 3:25 PM, Bhavesh Mistry <
> > mistry.p.bhavesh@gmail.com
> > > >
> > > wrote:
> > >
> > > > I am referring to wiki http://kafka.apache.org/08/configuration.html
> > and
> > > > following parameter control max batch message bytes as far as I know.
> > > > Kafka Community, please correct me if I am wrong.  I do not want to
> > > create
> > > > confusion for Kafka User Community here.   Also, if you increase this
> > > limit
> > > > than you have to set the corresponding limit increase on consumer
> side
> > as
> > > > well (fetch.message.max.bytes).
> > > >
> > > > Since we are using batch async mode, our messages are getting drop
> > > sometime
> > > > if the entire batch bytes  exceed this limit so I was asking Kafka
> > > > Developers if any optimal way to determine the batch size based on
> this
> > > > limit to minimize the data loss. Because, entire batch is rejected by
> > > > brokers.
> > > >
> > > > message.max.bytes 1000000 The maximum size of a message that the
> server
> > > can
> > > > receive. It is important that this property be in sync with the
> maximum
> > > > fetch size your consumers use or else an unruly producer will be able
> > to
> > > > publish messages too large for consumers to consume.
> > > >
> > > > Thanks,
> > > >
> > > > Bhavesh
> > > >
> > > >
> > > > On Wed, Sep 3, 2014 at 2:59 PM, Alexis Midon <
> > > > alexis.midon@airbedandbreakfast.com> wrote:
> > > >
> > > > > Hi Bhavesh
> > > > >
> > > > > can you explain what limit you're referring to?
> > > > > I'm asking because `message.max.bytes` is applied per message not
> per
> > > > > batch.
> > > > > is there another limit I should be aware of?
> > > > >
> > > > > thanks
> > > > >
> > > > >
> > > > > On Wed, Sep 3, 2014 at 2:07 PM, Bhavesh Mistry <
> > > > mistry.p.bhavesh@gmail.com
> > > > > >
> > > > > wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > We have similar problem.  We have variable length of messages.
> So
> > > when
> > > > > we
> > > > > > have fixed size of Batch sometime the batch exceed the limit set
> on
> > > the
> > > > > > brokers (2MB).
> > > > > >
> > > > > > So can Producer have some extra logic to determine the optimal
> > batch
> > > > size
> > > > > > by looking at configured message.max.bytes  value.
> > > > > >
> > > > > > During the metadata update, Producer will get this value from the
> > > > Broker
> > > > > > for each topic and Producer will check if current batch size
> reach
> > > this
> > > > > > limit than break batch into smaller chunk such way that It would
> > not
> > > > > exceed
> > > > > > limit (unless single message exceed the limit). Basically try to
> > > avoid
> > > > > data
> > > > > > loss as much as possible.
> > > > > >
> > > > > > Please let me know what is your opinion on this...
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Bhavesh
> > > > > >
> > > > > >
> > > > > > On Wed, Sep 3, 2014 at 6:21 AM, Alexis Midon <
> > > > > > alexis.midon@airbedandbreakfast.com> wrote:
> > > > > >
> > > > > > > Thanks Jun.
> > > > > > >
> > > > > > > I'll create a jira and try to provide a patch. I think this is
> > > pretty
> > > > > > > serious.
> > > > > > >
> > > > > > > On Friday, August 29, 2014, Jun Rao <ju...@gmail.com> wrote:
> > > > > > >
> > > > > > > > The goal of batching is mostly to reduce the # RPC calls to
> the
> > > > > broker.
> > > > > > > If
> > > > > > > > compression is enabled, a larger batch typically implies
> better
> > > > > > > compression
> > > > > > > > ratio.
> > > > > > > >
> > > > > > > > The reason that we have to fail the whole batch is that the
> > error
> > > > > code
> > > > > > in
> > > > > > > > the produce response is per partition, instead of per
> message.
> > > > > > > >
> > > > > > > > Retrying individual messages on MessageSizeTooLarge seems
> > > > reasonable.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <
> > > > > > > > alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> > > > > > > >
> > > > > > > > > Could you explain the goals of batches? I was assuming this
> > was
> > > > > > simply
> > > > > > > a
> > > > > > > > > performance optimization, but this behavior makes me think
> > I'm
> > > > > > missing
> > > > > > > > > something.
> > > > > > > > > is a batch more than a list of *independent* messages?
> > > > > > > > >
> > > > > > > > > Why would you reject the whole batch? One invalid message
> > > causes
> > > > > the
> > > > > > > loss
> > > > > > > > > of batch.num.messages-1 messages :(
> > > > > > > > > It seems pretty critical to me.
> > > > > > > > >
> > > > > > > > > If ack=0, the producer will never know about it.
> > > > > > > > > If ack !=0, the producer will retry the whole batch. If the
> > > issue
> > > > > was
> > > > > > > > > related to data corruption (etc), retries might work. But
> in
> > > the
> > > > > case
> > > > > > > of
> > > > > > > > > "big message", the batch will always be rejected and the
> > > producer
> > > > > > will
> > > > > > > > give
> > > > > > > > > up.
> > > > > > > > >
> > > > > > > > > If the messages are indeed considered independent, I think
> > this
> > > > is
> > > > > a
> > > > > > > > pretty
> > > > > > > > > serious issue.
> > > > > > > > >
> > > > > > > > > I see 2 possible fix approaches:
> > > > > > > > > - the broker could reject only the invalid messages
> > > > > > > > > - the broker could reject the whole batch (like today) but
> > the
> > > > > > producer
> > > > > > > > (if
> > > > > > > > > ack!=0) could retry messages one at a time on exception
> like
> > > > > > > > > "MessageSizeTooLarge".
> > > > > > > > >
> > > > > > > > > opinions?
> > > > > > > > >
> > > > > > > > > Alexis
> > > > > > > > >
> > > > > > > > > ```
> > > > > > > > > [2014-08-29 16:00:35,170] WARN Produce request with
> > correlation
> > > > id
> > > > > 46
> > > > > > > > > failed due to [test,1]:
> > > kafka.common.MessageSizeTooLargeException
> > > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > [2014-08-29 16:00:35,284] WARN Produce request with
> > correlation
> > > > id
> > > > > 51
> > > > > > > > > failed due to [test,0]:
> > > kafka.common.MessageSizeTooLargeException
> > > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > [2014-08-29 16:00:35,392] WARN Produce request with
> > correlation
> > > > id
> > > > > 56
> > > > > > > > > failed due to [test,0]:
> > > kafka.common.MessageSizeTooLargeException
> > > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > [2014-08-29 16:00:35,499] WARN Produce request with
> > correlation
> > > > id
> > > > > 61
> > > > > > > > > failed due to [test,1]:
> > > kafka.common.MessageSizeTooLargeException
> > > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > [2014-08-29 16:00:35,603] ERROR Failed to send requests for
> > > > topics
> > > > > > test
> > > > > > > > > with correlation ids in [43,62]
> > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > [2014-08-29 16:00:35,603] ERROR Error in handling batch of
> 3
> > > > events
> > > > > > > > > (kafka.producer.async.ProducerSendThread)
> > > > > > > > > kafka.common.FailedToSendMessageException: Failed to send
> > > > messages
> > > > > > > after
> > > > > > > > 3
> > > > > > > > > tries.
> > > > > > > > > at
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > > > > > > > >  at
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> > > > > > > > > at
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> > > > > > > > >  at
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> > > > > > > > > ```
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao <junrao@gmail.com
> > > > > > > > <javascript:;>> wrote:
> > > > > > > > >
> > > > > > > > > > That's right. If one message in a batch exceeds the size
> > > limit,
> > > > > the
> > > > > > > > whole
> > > > > > > > > > batch is rejected.
> > > > > > > > > >
> > > > > > > > > > When determining message.max.bytes, the most important
> > thing
> > > to
> > > > > > > > consider
> > > > > > > > > is
> > > > > > > > > > probably memory since currently we need to allocate
> memory
> > > for
> > > > a
> > > > > > full
> > > > > > > > > > message in the broker and the producer and the consumer
> > > client.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Jun
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon <
> > > > > > > > > > alexis.midon@airbedandbreakfast.com <javascript:;>>
> wrote:
> > > > > > > > > >
> > > > > > > > > > > am I miss reading this loop:
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269
> > > > > > > > > > >
> > > > > > > > > > > it seems like all messages from `validMessages` (which
> is
> > > > > > > > > > > ByteBufferMessageSet) are NOT appended if one of the
> > > message
> > > > > size
> > > > > > > > > exceeds
> > > > > > > > > > > the limit.
> > > > > > > > > > >
> > > > > > > > > > > I hope I'm missing something.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon <
> > > > > > > > > > > alexis.midon@airbedandbreakfast.com <javascript:;>>
> > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Jun,
> > > > > > > > > > > >
> > > > > > > > > > > > thanks for you answer.
> > > > > > > > > > > > Unfortunately the size won't help much, I'd like to
> see
> > > the
> > > > > > > actual
> > > > > > > > > > > message
> > > > > > > > > > > > data.
> > > > > > > > > > > >
> > > > > > > > > > > > By the way what are the things to consider when
> > deciding
> > > on
> > > > > > > > > > > > `message.max.bytes` value?
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao <
> > > junrao@gmail.com
> > > > > > > > <javascript:;>> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > >> The message size check is currently only done on the
> > > > broker.
> > > > > > If
> > > > > > > > you
> > > > > > > > > > > enable
> > > > > > > > > > > >> trace level logging in RequestChannel, you will see
> > the
> > > > > > produce
> > > > > > > > > > request,
> > > > > > > > > > > >> which includes the size of each partition.
> > > > > > > > > > > >>
> > > > > > > > > > > >> Thanks,
> > > > > > > > > > > >>
> > > > > > > > > > > >> Jun
> > > > > > > > > > > >>
> > > > > > > > > > > >>
> > > > > > > > > > > >> On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon <
> > > > > > > > > > > >> alexis.midon@airbedandbreakfast.com <javascript:;>>
> > > > wrote:
> > > > > > > > > > > >>
> > > > > > > > > > > >> > Hello,
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > my brokers are reporting that some received
> messages
> > > > > exceed
> > > > > > > the
> > > > > > > > > > > >> > `message.max.bytes` value.
> > > > > > > > > > > >> > I'd like to know what producers are at fault but
> It
> > is
> > > > > > pretty
> > > > > > > > much
> > > > > > > > > > > >> > impossible:
> > > > > > > > > > > >> > - the brokers don't log the content of the
> rejected
> > > > > messages
> > > > > > > > > > > >> > - the log messages do not contain the IP of the
> > > > producers
> > > > > > > > > > > >> > - on the consumer side, no exception is thrown
> > (afaik
> > > it
> > > > > is
> > > > > > > > > because
> > > > > > > > > > > >> Ack-0
> > > > > > > > > > > >> > is used). The only kind of notification is to
> closed
> > > the
> > > > > > > > > connection.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > [1] Do you have any suggestions to track down the
> > > guilty
> > > > > > > > producers
> > > > > > > > > > or
> > > > > > > > > > > >> find
> > > > > > > > > > > >> > out the message content?
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Even though it makes total sense to have the limit
> > > > defined
> > > > > > and
> > > > > > > > > > applied
> > > > > > > > > > > >> on
> > > > > > > > > > > >> > the brokers, I was thinking that this check could
> > also
> > > > be
> > > > > > > > applied
> > > > > > > > > by
> > > > > > > > > > > the
> > > > > > > > > > > >> > producers. Some google results suggest that
> > > > > > > `message.max.bytes`
> > > > > > > > > > might
> > > > > > > > > > > be
> > > > > > > > > > > >> > used by the producers but I can't find any trace
> of
> > > that
> > > > > > > > behavior
> > > > > > > > > in
> > > > > > > > > > > the
> > > > > > > > > > > >> > code.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > The closest thing I have is
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
> > > > > > > > > > > >> > but it simply logs the message size and content
> and
> > > the
> > > > > log
> > > > > > > > level
> > > > > > > > > is
> > > > > > > > > > > >> trace.
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > [2] could you please confirm if such a
> producer-side
> > > > check
> > > > > > > > exists?
> > > > > > > > > > > >> >
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > thanks!
> > > > > > > > > > > >> >
> > > > > > > > > > > >> > Alexis
> > > > > > > > > > > >> >
> > > > > > > > > > > >>
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: message size limit

Posted by Jun Rao <ju...@gmail.com>.
No, the new producer doesn't address that problem.

Thanks,

Jun

On Tue, Sep 9, 2014 at 12:59 PM, Bhavesh Mistry <mi...@gmail.com>
wrote:

> HI Jun,
>
> Thanks for clarification.  Follow up questions, does new producer solve the
> issues highlight.  In event of compression and async mode in new producer,
> will it break down messages to this UPPER limit and submit or new producer
> strictly honor batch size.  I am just asking if compression batch size
> message reaches this configured limit, does batch broken down to sub batch
> that is within limit.  I would like to minimize the data loss due to this
> limit.
>
>
> Thanks,
>
> Bhavesh
>
> On Mon, Sep 8, 2014 at 9:17 PM, Jun Rao <ju...@gmail.com> wrote:
>
> > Are you using compression in the producer? If so, message.max.bytes
> applies
> > to the compressed size of a batch of messages. Otherwise,
> message.max.bytes
> > applies to the size of each individual message.
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Sep 3, 2014 at 3:25 PM, Bhavesh Mistry <
> mistry.p.bhavesh@gmail.com
> > >
> > wrote:
> >
> > > I am referring to wiki http://kafka.apache.org/08/configuration.html
> and
> > > following parameter control max batch message bytes as far as I know.
> > > Kafka Community, please correct me if I am wrong.  I do not want to
> > create
> > > confusion for Kafka User Community here.   Also, if you increase this
> > limit
> > > than you have to set the corresponding limit increase on consumer side
> as
> > > well (fetch.message.max.bytes).
> > >
> > > Since we are using batch async mode, our messages are getting drop
> > sometime
> > > if the entire batch bytes  exceed this limit so I was asking Kafka
> > > Developers if any optimal way to determine the batch size based on this
> > > limit to minimize the data loss. Because, entire batch is rejected by
> > > brokers.
> > >
> > > message.max.bytes 1000000 The maximum size of a message that the server
> > can
> > > receive. It is important that this property be in sync with the maximum
> > > fetch size your consumers use or else an unruly producer will be able
> to
> > > publish messages too large for consumers to consume.
> > >
> > > Thanks,
> > >
> > > Bhavesh
> > >
> > >
> > > On Wed, Sep 3, 2014 at 2:59 PM, Alexis Midon <
> > > alexis.midon@airbedandbreakfast.com> wrote:
> > >
> > > > Hi Bhavesh
> > > >
> > > > can you explain what limit you're referring to?
> > > > I'm asking because `message.max.bytes` is applied per message not per
> > > > batch.
> > > > is there another limit I should be aware of?
> > > >
> > > > thanks
> > > >
> > > >
> > > > On Wed, Sep 3, 2014 at 2:07 PM, Bhavesh Mistry <
> > > mistry.p.bhavesh@gmail.com
> > > > >
> > > > wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > We have similar problem.  We have variable length of messages.  So
> > when
> > > > we
> > > > > have fixed size of Batch sometime the batch exceed the limit set on
> > the
> > > > > brokers (2MB).
> > > > >
> > > > > So can Producer have some extra logic to determine the optimal
> batch
> > > size
> > > > > by looking at configured message.max.bytes  value.
> > > > >
> > > > > During the metadata update, Producer will get this value from the
> > > Broker
> > > > > for each topic and Producer will check if current batch size reach
> > this
> > > > > limit than break batch into smaller chunk such way that It would
> not
> > > > exceed
> > > > > limit (unless single message exceed the limit). Basically try to
> > avoid
> > > > data
> > > > > loss as much as possible.
> > > > >
> > > > > Please let me know what is your opinion on this...
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Bhavesh
> > > > >
> > > > >
> > > > > On Wed, Sep 3, 2014 at 6:21 AM, Alexis Midon <
> > > > > alexis.midon@airbedandbreakfast.com> wrote:
> > > > >
> > > > > > Thanks Jun.
> > > > > >
> > > > > > I'll create a jira and try to provide a patch. I think this is
> > pretty
> > > > > > serious.
> > > > > >
> > > > > > On Friday, August 29, 2014, Jun Rao <ju...@gmail.com> wrote:
> > > > > >
> > > > > > > The goal of batching is mostly to reduce the # RPC calls to the
> > > > broker.
> > > > > > If
> > > > > > > compression is enabled, a larger batch typically implies better
> > > > > > compression
> > > > > > > ratio.
> > > > > > >
> > > > > > > The reason that we have to fail the whole batch is that the
> error
> > > > code
> > > > > in
> > > > > > > the produce response is per partition, instead of per message.
> > > > > > >
> > > > > > > Retrying individual messages on MessageSizeTooLarge seems
> > > reasonable.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <
> > > > > > > alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> > > > > > >
> > > > > > > > Could you explain the goals of batches? I was assuming this
> was
> > > > > simply
> > > > > > a
> > > > > > > > performance optimization, but this behavior makes me think
> I'm
> > > > > missing
> > > > > > > > something.
> > > > > > > > is a batch more than a list of *independent* messages?
> > > > > > > >
> > > > > > > > Why would you reject the whole batch? One invalid message
> > causes
> > > > the
> > > > > > loss
> > > > > > > > of batch.num.messages-1 messages :(
> > > > > > > > It seems pretty critical to me.
> > > > > > > >
> > > > > > > > If ack=0, the producer will never know about it.
> > > > > > > > If ack !=0, the producer will retry the whole batch. If the
> > issue
> > > > was
> > > > > > > > related to data corruption (etc), retries might work. But in
> > the
> > > > case
> > > > > > of
> > > > > > > > "big message", the batch will always be rejected and the
> > producer
> > > > > will
> > > > > > > give
> > > > > > > > up.
> > > > > > > >
> > > > > > > > If the messages are indeed considered independent, I think
> this
> > > is
> > > > a
> > > > > > > pretty
> > > > > > > > serious issue.
> > > > > > > >
> > > > > > > > I see 2 possible fix approaches:
> > > > > > > > - the broker could reject only the invalid messages
> > > > > > > > - the broker could reject the whole batch (like today) but
> the
> > > > > producer
> > > > > > > (if
> > > > > > > > ack!=0) could retry messages one at a time on exception like
> > > > > > > > "MessageSizeTooLarge".
> > > > > > > >
> > > > > > > > opinions?
> > > > > > > >
> > > > > > > > Alexis
> > > > > > > >
> > > > > > > > ```
> > > > > > > > [2014-08-29 16:00:35,170] WARN Produce request with
> correlation
> > > id
> > > > 46
> > > > > > > > failed due to [test,1]:
> > kafka.common.MessageSizeTooLargeException
> > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > [2014-08-29 16:00:35,284] WARN Produce request with
> correlation
> > > id
> > > > 51
> > > > > > > > failed due to [test,0]:
> > kafka.common.MessageSizeTooLargeException
> > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > [2014-08-29 16:00:35,392] WARN Produce request with
> correlation
> > > id
> > > > 56
> > > > > > > > failed due to [test,0]:
> > kafka.common.MessageSizeTooLargeException
> > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > [2014-08-29 16:00:35,499] WARN Produce request with
> correlation
> > > id
> > > > 61
> > > > > > > > failed due to [test,1]:
> > kafka.common.MessageSizeTooLargeException
> > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > [2014-08-29 16:00:35,603] ERROR Failed to send requests for
> > > topics
> > > > > test
> > > > > > > > with correlation ids in [43,62]
> > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > [2014-08-29 16:00:35,603] ERROR Error in handling batch of 3
> > > events
> > > > > > > > (kafka.producer.async.ProducerSendThread)
> > > > > > > > kafka.common.FailedToSendMessageException: Failed to send
> > > messages
> > > > > > after
> > > > > > > 3
> > > > > > > > tries.
> > > > > > > > at
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > > > > > > >  at
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> > > > > > > > at
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> > > > > > > >  at
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> > > > > > > > ```
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao <junrao@gmail.com
> > > > > > > <javascript:;>> wrote:
> > > > > > > >
> > > > > > > > > That's right. If one message in a batch exceeds the size
> > limit,
> > > > the
> > > > > > > whole
> > > > > > > > > batch is rejected.
> > > > > > > > >
> > > > > > > > > When determining message.max.bytes, the most important
> thing
> > to
> > > > > > > consider
> > > > > > > > is
> > > > > > > > > probably memory since currently we need to allocate memory
> > for
> > > a
> > > > > full
> > > > > > > > > message in the broker and the producer and the consumer
> > client.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Jun
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon <
> > > > > > > > > alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> > > > > > > > >
> > > > > > > > > > am I miss reading this loop:
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269
> > > > > > > > > >
> > > > > > > > > > it seems like all messages from `validMessages` (which is
> > > > > > > > > > ByteBufferMessageSet) are NOT appended if one of the
> > message
> > > > size
> > > > > > > > exceeds
> > > > > > > > > > the limit.
> > > > > > > > > >
> > > > > > > > > > I hope I'm missing something.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon <
> > > > > > > > > > alexis.midon@airbedandbreakfast.com <javascript:;>>
> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Jun,
> > > > > > > > > > >
> > > > > > > > > > > thanks for you answer.
> > > > > > > > > > > Unfortunately the size won't help much, I'd like to see
> > the
> > > > > > actual
> > > > > > > > > > message
> > > > > > > > > > > data.
> > > > > > > > > > >
> > > > > > > > > > > By the way what are the things to consider when
> deciding
> > on
> > > > > > > > > > > `message.max.bytes` value?
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao <
> > junrao@gmail.com
> > > > > > > <javascript:;>> wrote:
> > > > > > > > > > >
> > > > > > > > > > >> The message size check is currently only done on the
> > > broker.
> > > > > If
> > > > > > > you
> > > > > > > > > > enable
> > > > > > > > > > >> trace level logging in RequestChannel, you will see
> the
> > > > > produce
> > > > > > > > > request,
> > > > > > > > > > >> which includes the size of each partition.
> > > > > > > > > > >>
> > > > > > > > > > >> Thanks,
> > > > > > > > > > >>
> > > > > > > > > > >> Jun
> > > > > > > > > > >>
> > > > > > > > > > >>
> > > > > > > > > > >> On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon <
> > > > > > > > > > >> alexis.midon@airbedandbreakfast.com <javascript:;>>
> > > wrote:
> > > > > > > > > > >>
> > > > > > > > > > >> > Hello,
> > > > > > > > > > >> >
> > > > > > > > > > >> > my brokers are reporting that some received messages
> > > > exceed
> > > > > > the
> > > > > > > > > > >> > `message.max.bytes` value.
> > > > > > > > > > >> > I'd like to know what producers are at fault but It
> is
> > > > > pretty
> > > > > > > much
> > > > > > > > > > >> > impossible:
> > > > > > > > > > >> > - the brokers don't log the content of the rejected
> > > > messages
> > > > > > > > > > >> > - the log messages do not contain the IP of the
> > > producers
> > > > > > > > > > >> > - on the consumer side, no exception is thrown
> (afaik
> > it
> > > > is
> > > > > > > > because
> > > > > > > > > > >> Ack-0
> > > > > > > > > > >> > is used). The only kind of notification is to closed
> > the
> > > > > > > > connection.
> > > > > > > > > > >> >
> > > > > > > > > > >> > [1] Do you have any suggestions to track down the
> > guilty
> > > > > > > producers
> > > > > > > > > or
> > > > > > > > > > >> find
> > > > > > > > > > >> > out the message content?
> > > > > > > > > > >> >
> > > > > > > > > > >> > Even though it makes total sense to have the limit
> > > defined
> > > > > and
> > > > > > > > > applied
> > > > > > > > > > >> on
> > > > > > > > > > >> > the brokers, I was thinking that this check could
> also
> > > be
> > > > > > > applied
> > > > > > > > by
> > > > > > > > > > the
> > > > > > > > > > >> > producers. Some google results suggest that
> > > > > > `message.max.bytes`
> > > > > > > > > might
> > > > > > > > > > be
> > > > > > > > > > >> > used by the producers but I can't find any trace of
> > that
> > > > > > > behavior
> > > > > > > > in
> > > > > > > > > > the
> > > > > > > > > > >> > code.
> > > > > > > > > > >> >
> > > > > > > > > > >> > The closest thing I have is
> > > > > > > > > > >> >
> > > > > > > > > > >> >
> > > > > > > > > > >>
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
> > > > > > > > > > >> > but it simply logs the message size and content and
> > the
> > > > log
> > > > > > > level
> > > > > > > > is
> > > > > > > > > > >> trace.
> > > > > > > > > > >> >
> > > > > > > > > > >> > [2] could you please confirm if such a producer-side
> > > check
> > > > > > > exists?
> > > > > > > > > > >> >
> > > > > > > > > > >> >
> > > > > > > > > > >> > thanks!
> > > > > > > > > > >> >
> > > > > > > > > > >> > Alexis
> > > > > > > > > > >> >
> > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: message size limit

Posted by Bhavesh Mistry <mi...@gmail.com>.
HI Jun,

Thanks for clarification.  Follow up questions, does new producer solve the
issues highlight.  In event of compression and async mode in new producer,
will it break down messages to this UPPER limit and submit or new producer
strictly honor batch size.  I am just asking if compression batch size
message reaches this configured limit, does batch broken down to sub batch
that is within limit.  I would like to minimize the data loss due to this
limit.


Thanks,

Bhavesh

On Mon, Sep 8, 2014 at 9:17 PM, Jun Rao <ju...@gmail.com> wrote:

> Are you using compression in the producer? If so, message.max.bytes applies
> to the compressed size of a batch of messages. Otherwise, message.max.bytes
> applies to the size of each individual message.
>
> Thanks,
>
> Jun
>
> On Wed, Sep 3, 2014 at 3:25 PM, Bhavesh Mistry <mistry.p.bhavesh@gmail.com
> >
> wrote:
>
> > I am referring to wiki http://kafka.apache.org/08/configuration.html and
> > following parameter control max batch message bytes as far as I know.
> > Kafka Community, please correct me if I am wrong.  I do not want to
> create
> > confusion for Kafka User Community here.   Also, if you increase this
> limit
> > than you have to set the corresponding limit increase on consumer side as
> > well (fetch.message.max.bytes).
> >
> > Since we are using batch async mode, our messages are getting drop
> sometime
> > if the entire batch bytes  exceed this limit so I was asking Kafka
> > Developers if any optimal way to determine the batch size based on this
> > limit to minimize the data loss. Because, entire batch is rejected by
> > brokers.
> >
> > message.max.bytes 1000000 The maximum size of a message that the server
> can
> > receive. It is important that this property be in sync with the maximum
> > fetch size your consumers use or else an unruly producer will be able to
> > publish messages too large for consumers to consume.
> >
> > Thanks,
> >
> > Bhavesh
> >
> >
> > On Wed, Sep 3, 2014 at 2:59 PM, Alexis Midon <
> > alexis.midon@airbedandbreakfast.com> wrote:
> >
> > > Hi Bhavesh
> > >
> > > can you explain what limit you're referring to?
> > > I'm asking because `message.max.bytes` is applied per message not per
> > > batch.
> > > is there another limit I should be aware of?
> > >
> > > thanks
> > >
> > >
> > > On Wed, Sep 3, 2014 at 2:07 PM, Bhavesh Mistry <
> > mistry.p.bhavesh@gmail.com
> > > >
> > > wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > We have similar problem.  We have variable length of messages.  So
> when
> > > we
> > > > have fixed size of Batch sometime the batch exceed the limit set on
> the
> > > > brokers (2MB).
> > > >
> > > > So can Producer have some extra logic to determine the optimal batch
> > size
> > > > by looking at configured message.max.bytes  value.
> > > >
> > > > During the metadata update, Producer will get this value from the
> > Broker
> > > > for each topic and Producer will check if current batch size reach
> this
> > > > limit than break batch into smaller chunk such way that It would not
> > > exceed
> > > > limit (unless single message exceed the limit). Basically try to
> avoid
> > > data
> > > > loss as much as possible.
> > > >
> > > > Please let me know what is your opinion on this...
> > > >
> > > > Thanks,
> > > >
> > > > Bhavesh
> > > >
> > > >
> > > > On Wed, Sep 3, 2014 at 6:21 AM, Alexis Midon <
> > > > alexis.midon@airbedandbreakfast.com> wrote:
> > > >
> > > > > Thanks Jun.
> > > > >
> > > > > I'll create a jira and try to provide a patch. I think this is
> pretty
> > > > > serious.
> > > > >
> > > > > On Friday, August 29, 2014, Jun Rao <ju...@gmail.com> wrote:
> > > > >
> > > > > > The goal of batching is mostly to reduce the # RPC calls to the
> > > broker.
> > > > > If
> > > > > > compression is enabled, a larger batch typically implies better
> > > > > compression
> > > > > > ratio.
> > > > > >
> > > > > > The reason that we have to fail the whole batch is that the error
> > > code
> > > > in
> > > > > > the produce response is per partition, instead of per message.
> > > > > >
> > > > > > Retrying individual messages on MessageSizeTooLarge seems
> > reasonable.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > >
> > > > > > On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <
> > > > > > alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> > > > > >
> > > > > > > Could you explain the goals of batches? I was assuming this was
> > > > simply
> > > > > a
> > > > > > > performance optimization, but this behavior makes me think I'm
> > > > missing
> > > > > > > something.
> > > > > > > is a batch more than a list of *independent* messages?
> > > > > > >
> > > > > > > Why would you reject the whole batch? One invalid message
> causes
> > > the
> > > > > loss
> > > > > > > of batch.num.messages-1 messages :(
> > > > > > > It seems pretty critical to me.
> > > > > > >
> > > > > > > If ack=0, the producer will never know about it.
> > > > > > > If ack !=0, the producer will retry the whole batch. If the
> issue
> > > was
> > > > > > > related to data corruption (etc), retries might work. But in
> the
> > > case
> > > > > of
> > > > > > > "big message", the batch will always be rejected and the
> producer
> > > > will
> > > > > > give
> > > > > > > up.
> > > > > > >
> > > > > > > If the messages are indeed considered independent, I think this
> > is
> > > a
> > > > > > pretty
> > > > > > > serious issue.
> > > > > > >
> > > > > > > I see 2 possible fix approaches:
> > > > > > > - the broker could reject only the invalid messages
> > > > > > > - the broker could reject the whole batch (like today) but the
> > > > producer
> > > > > > (if
> > > > > > > ack!=0) could retry messages one at a time on exception like
> > > > > > > "MessageSizeTooLarge".
> > > > > > >
> > > > > > > opinions?
> > > > > > >
> > > > > > > Alexis
> > > > > > >
> > > > > > > ```
> > > > > > > [2014-08-29 16:00:35,170] WARN Produce request with correlation
> > id
> > > 46
> > > > > > > failed due to [test,1]:
> kafka.common.MessageSizeTooLargeException
> > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > [2014-08-29 16:00:35,284] WARN Produce request with correlation
> > id
> > > 51
> > > > > > > failed due to [test,0]:
> kafka.common.MessageSizeTooLargeException
> > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > [2014-08-29 16:00:35,392] WARN Produce request with correlation
> > id
> > > 56
> > > > > > > failed due to [test,0]:
> kafka.common.MessageSizeTooLargeException
> > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > [2014-08-29 16:00:35,499] WARN Produce request with correlation
> > id
> > > 61
> > > > > > > failed due to [test,1]:
> kafka.common.MessageSizeTooLargeException
> > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > [2014-08-29 16:00:35,603] ERROR Failed to send requests for
> > topics
> > > > test
> > > > > > > with correlation ids in [43,62]
> > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > [2014-08-29 16:00:35,603] ERROR Error in handling batch of 3
> > events
> > > > > > > (kafka.producer.async.ProducerSendThread)
> > > > > > > kafka.common.FailedToSendMessageException: Failed to send
> > messages
> > > > > after
> > > > > > 3
> > > > > > > tries.
> > > > > > > at
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > > > > > >  at
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> > > > > > > at
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> > > > > > >  at
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> > > > > > > ```
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao <junrao@gmail.com
> > > > > > <javascript:;>> wrote:
> > > > > > >
> > > > > > > > That's right. If one message in a batch exceeds the size
> limit,
> > > the
> > > > > > whole
> > > > > > > > batch is rejected.
> > > > > > > >
> > > > > > > > When determining message.max.bytes, the most important thing
> to
> > > > > > consider
> > > > > > > is
> > > > > > > > probably memory since currently we need to allocate memory
> for
> > a
> > > > full
> > > > > > > > message in the broker and the producer and the consumer
> client.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon <
> > > > > > > > alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> > > > > > > >
> > > > > > > > > am I miss reading this loop:
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269
> > > > > > > > >
> > > > > > > > > it seems like all messages from `validMessages` (which is
> > > > > > > > > ByteBufferMessageSet) are NOT appended if one of the
> message
> > > size
> > > > > > > exceeds
> > > > > > > > > the limit.
> > > > > > > > >
> > > > > > > > > I hope I'm missing something.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon <
> > > > > > > > > alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Jun,
> > > > > > > > > >
> > > > > > > > > > thanks for you answer.
> > > > > > > > > > Unfortunately the size won't help much, I'd like to see
> the
> > > > > actual
> > > > > > > > > message
> > > > > > > > > > data.
> > > > > > > > > >
> > > > > > > > > > By the way what are the things to consider when deciding
> on
> > > > > > > > > > `message.max.bytes` value?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao <
> junrao@gmail.com
> > > > > > <javascript:;>> wrote:
> > > > > > > > > >
> > > > > > > > > >> The message size check is currently only done on the
> > broker.
> > > > If
> > > > > > you
> > > > > > > > > enable
> > > > > > > > > >> trace level logging in RequestChannel, you will see the
> > > > produce
> > > > > > > > request,
> > > > > > > > > >> which includes the size of each partition.
> > > > > > > > > >>
> > > > > > > > > >> Thanks,
> > > > > > > > > >>
> > > > > > > > > >> Jun
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >> On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon <
> > > > > > > > > >> alexis.midon@airbedandbreakfast.com <javascript:;>>
> > wrote:
> > > > > > > > > >>
> > > > > > > > > >> > Hello,
> > > > > > > > > >> >
> > > > > > > > > >> > my brokers are reporting that some received messages
> > > exceed
> > > > > the
> > > > > > > > > >> > `message.max.bytes` value.
> > > > > > > > > >> > I'd like to know what producers are at fault but It is
> > > > pretty
> > > > > > much
> > > > > > > > > >> > impossible:
> > > > > > > > > >> > - the brokers don't log the content of the rejected
> > > messages
> > > > > > > > > >> > - the log messages do not contain the IP of the
> > producers
> > > > > > > > > >> > - on the consumer side, no exception is thrown (afaik
> it
> > > is
> > > > > > > because
> > > > > > > > > >> Ack-0
> > > > > > > > > >> > is used). The only kind of notification is to closed
> the
> > > > > > > connection.
> > > > > > > > > >> >
> > > > > > > > > >> > [1] Do you have any suggestions to track down the
> guilty
> > > > > > producers
> > > > > > > > or
> > > > > > > > > >> find
> > > > > > > > > >> > out the message content?
> > > > > > > > > >> >
> > > > > > > > > >> > Even though it makes total sense to have the limit
> > defined
> > > > and
> > > > > > > > applied
> > > > > > > > > >> on
> > > > > > > > > >> > the brokers, I was thinking that this check could also
> > be
> > > > > > applied
> > > > > > > by
> > > > > > > > > the
> > > > > > > > > >> > producers. Some google results suggest that
> > > > > `message.max.bytes`
> > > > > > > > might
> > > > > > > > > be
> > > > > > > > > >> > used by the producers but I can't find any trace of
> that
> > > > > > behavior
> > > > > > > in
> > > > > > > > > the
> > > > > > > > > >> > code.
> > > > > > > > > >> >
> > > > > > > > > >> > The closest thing I have is
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
> > > > > > > > > >> > but it simply logs the message size and content and
> the
> > > log
> > > > > > level
> > > > > > > is
> > > > > > > > > >> trace.
> > > > > > > > > >> >
> > > > > > > > > >> > [2] could you please confirm if such a producer-side
> > check
> > > > > > exists?
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > thanks!
> > > > > > > > > >> >
> > > > > > > > > >> > Alexis
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: message size limit

Posted by Jun Rao <ju...@gmail.com>.
Are you using compression in the producer? If so, message.max.bytes applies
to the compressed size of a batch of messages. Otherwise, message.max.bytes
applies to the size of each individual message.

Thanks,

Jun

On Wed, Sep 3, 2014 at 3:25 PM, Bhavesh Mistry <mi...@gmail.com>
wrote:

> I am referring to wiki http://kafka.apache.org/08/configuration.html and
> following parameter control max batch message bytes as far as I know.
> Kafka Community, please correct me if I am wrong.  I do not want to create
> confusion for Kafka User Community here.   Also, if you increase this limit
> than you have to set the corresponding limit increase on consumer side as
> well (fetch.message.max.bytes).
>
> Since we are using batch async mode, our messages are getting drop sometime
> if the entire batch bytes  exceed this limit so I was asking Kafka
> Developers if any optimal way to determine the batch size based on this
> limit to minimize the data loss. Because, entire batch is rejected by
> brokers.
>
> message.max.bytes 1000000 The maximum size of a message that the server can
> receive. It is important that this property be in sync with the maximum
> fetch size your consumers use or else an unruly producer will be able to
> publish messages too large for consumers to consume.
>
> Thanks,
>
> Bhavesh
>
>
> On Wed, Sep 3, 2014 at 2:59 PM, Alexis Midon <
> alexis.midon@airbedandbreakfast.com> wrote:
>
> > Hi Bhavesh
> >
> > can you explain what limit you're referring to?
> > I'm asking because `message.max.bytes` is applied per message not per
> > batch.
> > is there another limit I should be aware of?
> >
> > thanks
> >
> >
> > On Wed, Sep 3, 2014 at 2:07 PM, Bhavesh Mistry <
> mistry.p.bhavesh@gmail.com
> > >
> > wrote:
> >
> > > Hi Jun,
> > >
> > > We have similar problem.  We have variable length of messages.  So when
> > we
> > > have fixed size of Batch sometime the batch exceed the limit set on the
> > > brokers (2MB).
> > >
> > > So can Producer have some extra logic to determine the optimal batch
> size
> > > by looking at configured message.max.bytes  value.
> > >
> > > During the metadata update, Producer will get this value from the
> Broker
> > > for each topic and Producer will check if current batch size reach this
> > > limit than break batch into smaller chunk such way that It would not
> > exceed
> > > limit (unless single message exceed the limit). Basically try to avoid
> > data
> > > loss as much as possible.
> > >
> > > Please let me know what is your opinion on this...
> > >
> > > Thanks,
> > >
> > > Bhavesh
> > >
> > >
> > > On Wed, Sep 3, 2014 at 6:21 AM, Alexis Midon <
> > > alexis.midon@airbedandbreakfast.com> wrote:
> > >
> > > > Thanks Jun.
> > > >
> > > > I'll create a jira and try to provide a patch. I think this is pretty
> > > > serious.
> > > >
> > > > On Friday, August 29, 2014, Jun Rao <ju...@gmail.com> wrote:
> > > >
> > > > > The goal of batching is mostly to reduce the # RPC calls to the
> > broker.
> > > > If
> > > > > compression is enabled, a larger batch typically implies better
> > > > compression
> > > > > ratio.
> > > > >
> > > > > The reason that we have to fail the whole batch is that the error
> > code
> > > in
> > > > > the produce response is per partition, instead of per message.
> > > > >
> > > > > Retrying individual messages on MessageSizeTooLarge seems
> reasonable.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <
> > > > > alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> > > > >
> > > > > > Could you explain the goals of batches? I was assuming this was
> > > simply
> > > > a
> > > > > > performance optimization, but this behavior makes me think I'm
> > > missing
> > > > > > something.
> > > > > > is a batch more than a list of *independent* messages?
> > > > > >
> > > > > > Why would you reject the whole batch? One invalid message causes
> > the
> > > > loss
> > > > > > of batch.num.messages-1 messages :(
> > > > > > It seems pretty critical to me.
> > > > > >
> > > > > > If ack=0, the producer will never know about it.
> > > > > > If ack !=0, the producer will retry the whole batch. If the issue
> > was
> > > > > > related to data corruption (etc), retries might work. But in the
> > case
> > > > of
> > > > > > "big message", the batch will always be rejected and the producer
> > > will
> > > > > give
> > > > > > up.
> > > > > >
> > > > > > If the messages are indeed considered independent, I think this
> is
> > a
> > > > > pretty
> > > > > > serious issue.
> > > > > >
> > > > > > I see 2 possible fix approaches:
> > > > > > - the broker could reject only the invalid messages
> > > > > > - the broker could reject the whole batch (like today) but the
> > > producer
> > > > > (if
> > > > > > ack!=0) could retry messages one at a time on exception like
> > > > > > "MessageSizeTooLarge".
> > > > > >
> > > > > > opinions?
> > > > > >
> > > > > > Alexis
> > > > > >
> > > > > > ```
> > > > > > [2014-08-29 16:00:35,170] WARN Produce request with correlation
> id
> > 46
> > > > > > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > [2014-08-29 16:00:35,284] WARN Produce request with correlation
> id
> > 51
> > > > > > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > [2014-08-29 16:00:35,392] WARN Produce request with correlation
> id
> > 56
> > > > > > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > [2014-08-29 16:00:35,499] WARN Produce request with correlation
> id
> > 61
> > > > > > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > [2014-08-29 16:00:35,603] ERROR Failed to send requests for
> topics
> > > test
> > > > > > with correlation ids in [43,62]
> > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > [2014-08-29 16:00:35,603] ERROR Error in handling batch of 3
> events
> > > > > > (kafka.producer.async.ProducerSendThread)
> > > > > > kafka.common.FailedToSendMessageException: Failed to send
> messages
> > > > after
> > > > > 3
> > > > > > tries.
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > > > > >  at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> > > > > >  at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> > > > > > ```
> > > > > >
> > > > > >
> > > > > > On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao <junrao@gmail.com
> > > > > <javascript:;>> wrote:
> > > > > >
> > > > > > > That's right. If one message in a batch exceeds the size limit,
> > the
> > > > > whole
> > > > > > > batch is rejected.
> > > > > > >
> > > > > > > When determining message.max.bytes, the most important thing to
> > > > > consider
> > > > > > is
> > > > > > > probably memory since currently we need to allocate memory for
> a
> > > full
> > > > > > > message in the broker and the producer and the consumer client.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon <
> > > > > > > alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> > > > > > >
> > > > > > > > am I miss reading this loop:
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269
> > > > > > > >
> > > > > > > > it seems like all messages from `validMessages` (which is
> > > > > > > > ByteBufferMessageSet) are NOT appended if one of the message
> > size
> > > > > > exceeds
> > > > > > > > the limit.
> > > > > > > >
> > > > > > > > I hope I'm missing something.
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon <
> > > > > > > > alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> > > > > > > >
> > > > > > > > > Hi Jun,
> > > > > > > > >
> > > > > > > > > thanks for you answer.
> > > > > > > > > Unfortunately the size won't help much, I'd like to see the
> > > > actual
> > > > > > > > message
> > > > > > > > > data.
> > > > > > > > >
> > > > > > > > > By the way what are the things to consider when deciding on
> > > > > > > > > `message.max.bytes` value?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao <junrao@gmail.com
> > > > > <javascript:;>> wrote:
> > > > > > > > >
> > > > > > > > >> The message size check is currently only done on the
> broker.
> > > If
> > > > > you
> > > > > > > > enable
> > > > > > > > >> trace level logging in RequestChannel, you will see the
> > > produce
> > > > > > > request,
> > > > > > > > >> which includes the size of each partition.
> > > > > > > > >>
> > > > > > > > >> Thanks,
> > > > > > > > >>
> > > > > > > > >> Jun
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon <
> > > > > > > > >> alexis.midon@airbedandbreakfast.com <javascript:;>>
> wrote:
> > > > > > > > >>
> > > > > > > > >> > Hello,
> > > > > > > > >> >
> > > > > > > > >> > my brokers are reporting that some received messages
> > exceed
> > > > the
> > > > > > > > >> > `message.max.bytes` value.
> > > > > > > > >> > I'd like to know what producers are at fault but It is
> > > pretty
> > > > > much
> > > > > > > > >> > impossible:
> > > > > > > > >> > - the brokers don't log the content of the rejected
> > messages
> > > > > > > > >> > - the log messages do not contain the IP of the
> producers
> > > > > > > > >> > - on the consumer side, no exception is thrown (afaik it
> > is
> > > > > > because
> > > > > > > > >> Ack-0
> > > > > > > > >> > is used). The only kind of notification is to closed the
> > > > > > connection.
> > > > > > > > >> >
> > > > > > > > >> > [1] Do you have any suggestions to track down the guilty
> > > > > producers
> > > > > > > or
> > > > > > > > >> find
> > > > > > > > >> > out the message content?
> > > > > > > > >> >
> > > > > > > > >> > Even though it makes total sense to have the limit
> defined
> > > and
> > > > > > > applied
> > > > > > > > >> on
> > > > > > > > >> > the brokers, I was thinking that this check could also
> be
> > > > > applied
> > > > > > by
> > > > > > > > the
> > > > > > > > >> > producers. Some google results suggest that
> > > > `message.max.bytes`
> > > > > > > might
> > > > > > > > be
> > > > > > > > >> > used by the producers but I can't find any trace of that
> > > > > behavior
> > > > > > in
> > > > > > > > the
> > > > > > > > >> > code.
> > > > > > > > >> >
> > > > > > > > >> > The closest thing I have is
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
> > > > > > > > >> > but it simply logs the message size and content and the
> > log
> > > > > level
> > > > > > is
> > > > > > > > >> trace.
> > > > > > > > >> >
> > > > > > > > >> > [2] could you please confirm if such a producer-side
> check
> > > > > exists?
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > thanks!
> > > > > > > > >> >
> > > > > > > > >> > Alexis
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: message size limit

Posted by Alexis Midon <al...@airbedandbreakfast.com>.
are you referring to socket.request.max.bytes ?
it looks like it could indeed limit the size of a batch accepted by a
broker.
So, you're right, batch.num.messages * message.max.bytes must be smaller
than socket.request.max.bytes.

It looks like this case has been addressed in the new producer.
See max.request.size and batch.size at
https://kafka.apache.org/documentation.html#newproducerconfigs

Alexis


On Wed, Sep 3, 2014 at 3:25 PM, Bhavesh Mistry <mi...@gmail.com>
wrote:

> I am referring to wiki http://kafka.apache.org/08/configuration.html and
> following parameter control max batch message bytes as far as I know.
> Kafka Community, please correct me if I am wrong.  I do not want to create
> confusion for Kafka User Community here.   Also, if you increase this limit
> than you have to set the corresponding limit increase on consumer side as
> well (fetch.message.max.bytes).
>
> Since we are using batch async mode, our messages are getting drop sometime
> if the entire batch bytes  exceed this limit so I was asking Kafka
> Developers if any optimal way to determine the batch size based on this
> limit to minimize the data loss. Because, entire batch is rejected by
> brokers.
>
> message.max.bytes 1000000 The maximum size of a message that the server can
> receive. It is important that this property be in sync with the maximum
> fetch size your consumers use or else an unruly producer will be able to
> publish messages too large for consumers to consume.
>
> Thanks,
>
> Bhavesh
>
>
> On Wed, Sep 3, 2014 at 2:59 PM, Alexis Midon <
> alexis.midon@airbedandbreakfast.com> wrote:
>
> > Hi Bhavesh
> >
> > can you explain what limit you're referring to?
> > I'm asking because `message.max.bytes` is applied per message not per
> > batch.
> > is there another limit I should be aware of?
> >
> > thanks
> >
> >
> > On Wed, Sep 3, 2014 at 2:07 PM, Bhavesh Mistry <
> mistry.p.bhavesh@gmail.com
> > >
> > wrote:
> >
> > > Hi Jun,
> > >
> > > We have similar problem.  We have variable length of messages.  So when
> > we
> > > have fixed size of Batch sometime the batch exceed the limit set on the
> > > brokers (2MB).
> > >
> > > So can Producer have some extra logic to determine the optimal batch
> size
> > > by looking at configured message.max.bytes  value.
> > >
> > > During the metadata update, Producer will get this value from the
> Broker
> > > for each topic and Producer will check if current batch size reach this
> > > limit than break batch into smaller chunk such way that It would not
> > exceed
> > > limit (unless single message exceed the limit). Basically try to avoid
> > data
> > > loss as much as possible.
> > >
> > > Please let me know what is your opinion on this...
> > >
> > > Thanks,
> > >
> > > Bhavesh
> > >
> > >
> > > On Wed, Sep 3, 2014 at 6:21 AM, Alexis Midon <
> > > alexis.midon@airbedandbreakfast.com> wrote:
> > >
> > > > Thanks Jun.
> > > >
> > > > I'll create a jira and try to provide a patch. I think this is pretty
> > > > serious.
> > > >
> > > > On Friday, August 29, 2014, Jun Rao <ju...@gmail.com> wrote:
> > > >
> > > > > The goal of batching is mostly to reduce the # RPC calls to the
> > broker.
> > > > If
> > > > > compression is enabled, a larger batch typically implies better
> > > > compression
> > > > > ratio.
> > > > >
> > > > > The reason that we have to fail the whole batch is that the error
> > code
> > > in
> > > > > the produce response is per partition, instead of per message.
> > > > >
> > > > > Retrying individual messages on MessageSizeTooLarge seems
> reasonable.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <
> > > > > alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> > > > >
> > > > > > Could you explain the goals of batches? I was assuming this was
> > > simply
> > > > a
> > > > > > performance optimization, but this behavior makes me think I'm
> > > missing
> > > > > > something.
> > > > > > is a batch more than a list of *independent* messages?
> > > > > >
> > > > > > Why would you reject the whole batch? One invalid message causes
> > the
> > > > loss
> > > > > > of batch.num.messages-1 messages :(
> > > > > > It seems pretty critical to me.
> > > > > >
> > > > > > If ack=0, the producer will never know about it.
> > > > > > If ack !=0, the producer will retry the whole batch. If the issue
> > was
> > > > > > related to data corruption (etc), retries might work. But in the
> > case
> > > > of
> > > > > > "big message", the batch will always be rejected and the producer
> > > will
> > > > > give
> > > > > > up.
> > > > > >
> > > > > > If the messages are indeed considered independent, I think this
> is
> > a
> > > > > pretty
> > > > > > serious issue.
> > > > > >
> > > > > > I see 2 possible fix approaches:
> > > > > > - the broker could reject only the invalid messages
> > > > > > - the broker could reject the whole batch (like today) but the
> > > producer
> > > > > (if
> > > > > > ack!=0) could retry messages one at a time on exception like
> > > > > > "MessageSizeTooLarge".
> > > > > >
> > > > > > opinions?
> > > > > >
> > > > > > Alexis
> > > > > >
> > > > > > ```
> > > > > > [2014-08-29 16:00:35,170] WARN Produce request with correlation
> id
> > 46
> > > > > > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > [2014-08-29 16:00:35,284] WARN Produce request with correlation
> id
> > 51
> > > > > > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > [2014-08-29 16:00:35,392] WARN Produce request with correlation
> id
> > 56
> > > > > > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > [2014-08-29 16:00:35,499] WARN Produce request with correlation
> id
> > 61
> > > > > > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > [2014-08-29 16:00:35,603] ERROR Failed to send requests for
> topics
> > > test
> > > > > > with correlation ids in [43,62]
> > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > [2014-08-29 16:00:35,603] ERROR Error in handling batch of 3
> events
> > > > > > (kafka.producer.async.ProducerSendThread)
> > > > > > kafka.common.FailedToSendMessageException: Failed to send
> messages
> > > > after
> > > > > 3
> > > > > > tries.
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > > > > >  at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> > > > > >  at
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> > > > > > ```
> > > > > >
> > > > > >
> > > > > > On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao <junrao@gmail.com
> > > > > <javascript:;>> wrote:
> > > > > >
> > > > > > > That's right. If one message in a batch exceeds the size limit,
> > the
> > > > > whole
> > > > > > > batch is rejected.
> > > > > > >
> > > > > > > When determining message.max.bytes, the most important thing to
> > > > > consider
> > > > > > is
> > > > > > > probably memory since currently we need to allocate memory for
> a
> > > full
> > > > > > > message in the broker and the producer and the consumer client.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon <
> > > > > > > alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> > > > > > >
> > > > > > > > am I miss reading this loop:
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269
> > > > > > > >
> > > > > > > > it seems like all messages from `validMessages` (which is
> > > > > > > > ByteBufferMessageSet) are NOT appended if one of the message
> > size
> > > > > > exceeds
> > > > > > > > the limit.
> > > > > > > >
> > > > > > > > I hope I'm missing something.
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon <
> > > > > > > > alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> > > > > > > >
> > > > > > > > > Hi Jun,
> > > > > > > > >
> > > > > > > > > thanks for you answer.
> > > > > > > > > Unfortunately the size won't help much, I'd like to see the
> > > > actual
> > > > > > > > message
> > > > > > > > > data.
> > > > > > > > >
> > > > > > > > > By the way what are the things to consider when deciding on
> > > > > > > > > `message.max.bytes` value?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao <junrao@gmail.com
> > > > > <javascript:;>> wrote:
> > > > > > > > >
> > > > > > > > >> The message size check is currently only done on the
> broker.
> > > If
> > > > > you
> > > > > > > > enable
> > > > > > > > >> trace level logging in RequestChannel, you will see the
> > > produce
> > > > > > > request,
> > > > > > > > >> which includes the size of each partition.
> > > > > > > > >>
> > > > > > > > >> Thanks,
> > > > > > > > >>
> > > > > > > > >> Jun
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon <
> > > > > > > > >> alexis.midon@airbedandbreakfast.com <javascript:;>>
> wrote:
> > > > > > > > >>
> > > > > > > > >> > Hello,
> > > > > > > > >> >
> > > > > > > > >> > my brokers are reporting that some received messages
> > exceed
> > > > the
> > > > > > > > >> > `message.max.bytes` value.
> > > > > > > > >> > I'd like to know what producers are at fault but It is
> > > pretty
> > > > > much
> > > > > > > > >> > impossible:
> > > > > > > > >> > - the brokers don't log the content of the rejected
> > messages
> > > > > > > > >> > - the log messages do not contain the IP of the
> producers
> > > > > > > > >> > - on the consumer side, no exception is thrown (afaik it
> > is
> > > > > > because
> > > > > > > > >> Ack-0
> > > > > > > > >> > is used). The only kind of notification is to closed the
> > > > > > connection.
> > > > > > > > >> >
> > > > > > > > >> > [1] Do you have any suggestions to track down the guilty
> > > > > producers
> > > > > > > or
> > > > > > > > >> find
> > > > > > > > >> > out the message content?
> > > > > > > > >> >
> > > > > > > > >> > Even though it makes total sense to have the limit
> defined
> > > and
> > > > > > > applied
> > > > > > > > >> on
> > > > > > > > >> > the brokers, I was thinking that this check could also
> be
> > > > > applied
> > > > > > by
> > > > > > > > the
> > > > > > > > >> > producers. Some google results suggest that
> > > > `message.max.bytes`
> > > > > > > might
> > > > > > > > be
> > > > > > > > >> > used by the producers but I can't find any trace of that
> > > > > behavior
> > > > > > in
> > > > > > > > the
> > > > > > > > >> > code.
> > > > > > > > >> >
> > > > > > > > >> > The closest thing I have is
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
> > > > > > > > >> > but it simply logs the message size and content and the
> > log
> > > > > level
> > > > > > is
> > > > > > > > >> trace.
> > > > > > > > >> >
> > > > > > > > >> > [2] could you please confirm if such a producer-side
> check
> > > > > exists?
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > thanks!
> > > > > > > > >> >
> > > > > > > > >> > Alexis
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: message size limit

Posted by Bhavesh Mistry <mi...@gmail.com>.
I am referring to wiki http://kafka.apache.org/08/configuration.html and
following parameter control max batch message bytes as far as I know.
Kafka Community, please correct me if I am wrong.  I do not want to create
confusion for Kafka User Community here.   Also, if you increase this limit
than you have to set the corresponding limit increase on consumer side as
well (fetch.message.max.bytes).

Since we are using batch async mode, our messages are getting drop sometime
if the entire batch bytes  exceed this limit so I was asking Kafka
Developers if any optimal way to determine the batch size based on this
limit to minimize the data loss. Because, entire batch is rejected by
brokers.

message.max.bytes 1000000 The maximum size of a message that the server can
receive. It is important that this property be in sync with the maximum
fetch size your consumers use or else an unruly producer will be able to
publish messages too large for consumers to consume.

Thanks,

Bhavesh


On Wed, Sep 3, 2014 at 2:59 PM, Alexis Midon <
alexis.midon@airbedandbreakfast.com> wrote:

> Hi Bhavesh
>
> can you explain what limit you're referring to?
> I'm asking because `message.max.bytes` is applied per message not per
> batch.
> is there another limit I should be aware of?
>
> thanks
>
>
> On Wed, Sep 3, 2014 at 2:07 PM, Bhavesh Mistry <mistry.p.bhavesh@gmail.com
> >
> wrote:
>
> > Hi Jun,
> >
> > We have similar problem.  We have variable length of messages.  So when
> we
> > have fixed size of Batch sometime the batch exceed the limit set on the
> > brokers (2MB).
> >
> > So can Producer have some extra logic to determine the optimal batch size
> > by looking at configured message.max.bytes  value.
> >
> > During the metadata update, Producer will get this value from the Broker
> > for each topic and Producer will check if current batch size reach this
> > limit than break batch into smaller chunk such way that It would not
> exceed
> > limit (unless single message exceed the limit). Basically try to avoid
> data
> > loss as much as possible.
> >
> > Please let me know what is your opinion on this...
> >
> > Thanks,
> >
> > Bhavesh
> >
> >
> > On Wed, Sep 3, 2014 at 6:21 AM, Alexis Midon <
> > alexis.midon@airbedandbreakfast.com> wrote:
> >
> > > Thanks Jun.
> > >
> > > I'll create a jira and try to provide a patch. I think this is pretty
> > > serious.
> > >
> > > On Friday, August 29, 2014, Jun Rao <ju...@gmail.com> wrote:
> > >
> > > > The goal of batching is mostly to reduce the # RPC calls to the
> broker.
> > > If
> > > > compression is enabled, a larger batch typically implies better
> > > compression
> > > > ratio.
> > > >
> > > > The reason that we have to fail the whole batch is that the error
> code
> > in
> > > > the produce response is per partition, instead of per message.
> > > >
> > > > Retrying individual messages on MessageSizeTooLarge seems reasonable.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <
> > > > alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> > > >
> > > > > Could you explain the goals of batches? I was assuming this was
> > simply
> > > a
> > > > > performance optimization, but this behavior makes me think I'm
> > missing
> > > > > something.
> > > > > is a batch more than a list of *independent* messages?
> > > > >
> > > > > Why would you reject the whole batch? One invalid message causes
> the
> > > loss
> > > > > of batch.num.messages-1 messages :(
> > > > > It seems pretty critical to me.
> > > > >
> > > > > If ack=0, the producer will never know about it.
> > > > > If ack !=0, the producer will retry the whole batch. If the issue
> was
> > > > > related to data corruption (etc), retries might work. But in the
> case
> > > of
> > > > > "big message", the batch will always be rejected and the producer
> > will
> > > > give
> > > > > up.
> > > > >
> > > > > If the messages are indeed considered independent, I think this is
> a
> > > > pretty
> > > > > serious issue.
> > > > >
> > > > > I see 2 possible fix approaches:
> > > > > - the broker could reject only the invalid messages
> > > > > - the broker could reject the whole batch (like today) but the
> > producer
> > > > (if
> > > > > ack!=0) could retry messages one at a time on exception like
> > > > > "MessageSizeTooLarge".
> > > > >
> > > > > opinions?
> > > > >
> > > > > Alexis
> > > > >
> > > > > ```
> > > > > [2014-08-29 16:00:35,170] WARN Produce request with correlation id
> 46
> > > > > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > [2014-08-29 16:00:35,284] WARN Produce request with correlation id
> 51
> > > > > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > [2014-08-29 16:00:35,392] WARN Produce request with correlation id
> 56
> > > > > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > [2014-08-29 16:00:35,499] WARN Produce request with correlation id
> 61
> > > > > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > [2014-08-29 16:00:35,603] ERROR Failed to send requests for topics
> > test
> > > > > with correlation ids in [43,62]
> > > > (kafka.producer.async.DefaultEventHandler)
> > > > > [2014-08-29 16:00:35,603] ERROR Error in handling batch of 3 events
> > > > > (kafka.producer.async.ProducerSendThread)
> > > > > kafka.common.FailedToSendMessageException: Failed to send messages
> > > after
> > > > 3
> > > > > tries.
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > > > >  at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> > > > >  at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> > > > > ```
> > > > >
> > > > >
> > > > > On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao <junrao@gmail.com
> > > > <javascript:;>> wrote:
> > > > >
> > > > > > That's right. If one message in a batch exceeds the size limit,
> the
> > > > whole
> > > > > > batch is rejected.
> > > > > >
> > > > > > When determining message.max.bytes, the most important thing to
> > > > consider
> > > > > is
> > > > > > probably memory since currently we need to allocate memory for a
> > full
> > > > > > message in the broker and the producer and the consumer client.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > >
> > > > > > On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon <
> > > > > > alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> > > > > >
> > > > > > > am I miss reading this loop:
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269
> > > > > > >
> > > > > > > it seems like all messages from `validMessages` (which is
> > > > > > > ByteBufferMessageSet) are NOT appended if one of the message
> size
> > > > > exceeds
> > > > > > > the limit.
> > > > > > >
> > > > > > > I hope I'm missing something.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon <
> > > > > > > alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> > > > > > >
> > > > > > > > Hi Jun,
> > > > > > > >
> > > > > > > > thanks for you answer.
> > > > > > > > Unfortunately the size won't help much, I'd like to see the
> > > actual
> > > > > > > message
> > > > > > > > data.
> > > > > > > >
> > > > > > > > By the way what are the things to consider when deciding on
> > > > > > > > `message.max.bytes` value?
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao <junrao@gmail.com
> > > > <javascript:;>> wrote:
> > > > > > > >
> > > > > > > >> The message size check is currently only done on the broker.
> > If
> > > > you
> > > > > > > enable
> > > > > > > >> trace level logging in RequestChannel, you will see the
> > produce
> > > > > > request,
> > > > > > > >> which includes the size of each partition.
> > > > > > > >>
> > > > > > > >> Thanks,
> > > > > > > >>
> > > > > > > >> Jun
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon <
> > > > > > > >> alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> > > > > > > >>
> > > > > > > >> > Hello,
> > > > > > > >> >
> > > > > > > >> > my brokers are reporting that some received messages
> exceed
> > > the
> > > > > > > >> > `message.max.bytes` value.
> > > > > > > >> > I'd like to know what producers are at fault but It is
> > pretty
> > > > much
> > > > > > > >> > impossible:
> > > > > > > >> > - the brokers don't log the content of the rejected
> messages
> > > > > > > >> > - the log messages do not contain the IP of the producers
> > > > > > > >> > - on the consumer side, no exception is thrown (afaik it
> is
> > > > > because
> > > > > > > >> Ack-0
> > > > > > > >> > is used). The only kind of notification is to closed the
> > > > > connection.
> > > > > > > >> >
> > > > > > > >> > [1] Do you have any suggestions to track down the guilty
> > > > producers
> > > > > > or
> > > > > > > >> find
> > > > > > > >> > out the message content?
> > > > > > > >> >
> > > > > > > >> > Even though it makes total sense to have the limit defined
> > and
> > > > > > applied
> > > > > > > >> on
> > > > > > > >> > the brokers, I was thinking that this check could also be
> > > > applied
> > > > > by
> > > > > > > the
> > > > > > > >> > producers. Some google results suggest that
> > > `message.max.bytes`
> > > > > > might
> > > > > > > be
> > > > > > > >> > used by the producers but I can't find any trace of that
> > > > behavior
> > > > > in
> > > > > > > the
> > > > > > > >> > code.
> > > > > > > >> >
> > > > > > > >> > The closest thing I have is
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
> > > > > > > >> > but it simply logs the message size and content and the
> log
> > > > level
> > > > > is
> > > > > > > >> trace.
> > > > > > > >> >
> > > > > > > >> > [2] could you please confirm if such a producer-side check
> > > > exists?
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> > thanks!
> > > > > > > >> >
> > > > > > > >> > Alexis
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: message size limit

Posted by Alexis Midon <al...@airbedandbreakfast.com>.
Hi Bhavesh

can you explain what limit you're referring to?
I'm asking because `message.max.bytes` is applied per message not per batch.
is there another limit I should be aware of?

thanks


On Wed, Sep 3, 2014 at 2:07 PM, Bhavesh Mistry <mi...@gmail.com>
wrote:

> Hi Jun,
>
> We have similar problem.  We have variable length of messages.  So when we
> have fixed size of Batch sometime the batch exceed the limit set on the
> brokers (2MB).
>
> So can Producer have some extra logic to determine the optimal batch size
> by looking at configured message.max.bytes  value.
>
> During the metadata update, Producer will get this value from the Broker
> for each topic and Producer will check if current batch size reach this
> limit than break batch into smaller chunk such way that It would not exceed
> limit (unless single message exceed the limit). Basically try to avoid data
> loss as much as possible.
>
> Please let me know what is your opinion on this...
>
> Thanks,
>
> Bhavesh
>
>
> On Wed, Sep 3, 2014 at 6:21 AM, Alexis Midon <
> alexis.midon@airbedandbreakfast.com> wrote:
>
> > Thanks Jun.
> >
> > I'll create a jira and try to provide a patch. I think this is pretty
> > serious.
> >
> > On Friday, August 29, 2014, Jun Rao <ju...@gmail.com> wrote:
> >
> > > The goal of batching is mostly to reduce the # RPC calls to the broker.
> > If
> > > compression is enabled, a larger batch typically implies better
> > compression
> > > ratio.
> > >
> > > The reason that we have to fail the whole batch is that the error code
> in
> > > the produce response is per partition, instead of per message.
> > >
> > > Retrying individual messages on MessageSizeTooLarge seems reasonable.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <
> > > alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> > >
> > > > Could you explain the goals of batches? I was assuming this was
> simply
> > a
> > > > performance optimization, but this behavior makes me think I'm
> missing
> > > > something.
> > > > is a batch more than a list of *independent* messages?
> > > >
> > > > Why would you reject the whole batch? One invalid message causes the
> > loss
> > > > of batch.num.messages-1 messages :(
> > > > It seems pretty critical to me.
> > > >
> > > > If ack=0, the producer will never know about it.
> > > > If ack !=0, the producer will retry the whole batch. If the issue was
> > > > related to data corruption (etc), retries might work. But in the case
> > of
> > > > "big message", the batch will always be rejected and the producer
> will
> > > give
> > > > up.
> > > >
> > > > If the messages are indeed considered independent, I think this is a
> > > pretty
> > > > serious issue.
> > > >
> > > > I see 2 possible fix approaches:
> > > > - the broker could reject only the invalid messages
> > > > - the broker could reject the whole batch (like today) but the
> producer
> > > (if
> > > > ack!=0) could retry messages one at a time on exception like
> > > > "MessageSizeTooLarge".
> > > >
> > > > opinions?
> > > >
> > > > Alexis
> > > >
> > > > ```
> > > > [2014-08-29 16:00:35,170] WARN Produce request with correlation id 46
> > > > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > > > (kafka.producer.async.DefaultEventHandler)
> > > > [2014-08-29 16:00:35,284] WARN Produce request with correlation id 51
> > > > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > > > (kafka.producer.async.DefaultEventHandler)
> > > > [2014-08-29 16:00:35,392] WARN Produce request with correlation id 56
> > > > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > > > (kafka.producer.async.DefaultEventHandler)
> > > > [2014-08-29 16:00:35,499] WARN Produce request with correlation id 61
> > > > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > > > (kafka.producer.async.DefaultEventHandler)
> > > > [2014-08-29 16:00:35,603] ERROR Failed to send requests for topics
> test
> > > > with correlation ids in [43,62]
> > > (kafka.producer.async.DefaultEventHandler)
> > > > [2014-08-29 16:00:35,603] ERROR Error in handling batch of 3 events
> > > > (kafka.producer.async.ProducerSendThread)
> > > > kafka.common.FailedToSendMessageException: Failed to send messages
> > after
> > > 3
> > > > tries.
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > > >  at
> > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> > > >  at
> > > >
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> > > > ```
> > > >
> > > >
> > > > On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao <junrao@gmail.com
> > > <javascript:;>> wrote:
> > > >
> > > > > That's right. If one message in a batch exceeds the size limit, the
> > > whole
> > > > > batch is rejected.
> > > > >
> > > > > When determining message.max.bytes, the most important thing to
> > > consider
> > > > is
> > > > > probably memory since currently we need to allocate memory for a
> full
> > > > > message in the broker and the producer and the consumer client.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon <
> > > > > alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> > > > >
> > > > > > am I miss reading this loop:
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269
> > > > > >
> > > > > > it seems like all messages from `validMessages` (which is
> > > > > > ByteBufferMessageSet) are NOT appended if one of the message size
> > > > exceeds
> > > > > > the limit.
> > > > > >
> > > > > > I hope I'm missing something.
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon <
> > > > > > alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> > > > > >
> > > > > > > Hi Jun,
> > > > > > >
> > > > > > > thanks for you answer.
> > > > > > > Unfortunately the size won't help much, I'd like to see the
> > actual
> > > > > > message
> > > > > > > data.
> > > > > > >
> > > > > > > By the way what are the things to consider when deciding on
> > > > > > > `message.max.bytes` value?
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao <junrao@gmail.com
> > > <javascript:;>> wrote:
> > > > > > >
> > > > > > >> The message size check is currently only done on the broker.
> If
> > > you
> > > > > > enable
> > > > > > >> trace level logging in RequestChannel, you will see the
> produce
> > > > > request,
> > > > > > >> which includes the size of each partition.
> > > > > > >>
> > > > > > >> Thanks,
> > > > > > >>
> > > > > > >> Jun
> > > > > > >>
> > > > > > >>
> > > > > > >> On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon <
> > > > > > >> alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> > > > > > >>
> > > > > > >> > Hello,
> > > > > > >> >
> > > > > > >> > my brokers are reporting that some received messages exceed
> > the
> > > > > > >> > `message.max.bytes` value.
> > > > > > >> > I'd like to know what producers are at fault but It is
> pretty
> > > much
> > > > > > >> > impossible:
> > > > > > >> > - the brokers don't log the content of the rejected messages
> > > > > > >> > - the log messages do not contain the IP of the producers
> > > > > > >> > - on the consumer side, no exception is thrown (afaik it is
> > > > because
> > > > > > >> Ack-0
> > > > > > >> > is used). The only kind of notification is to closed the
> > > > connection.
> > > > > > >> >
> > > > > > >> > [1] Do you have any suggestions to track down the guilty
> > > producers
> > > > > or
> > > > > > >> find
> > > > > > >> > out the message content?
> > > > > > >> >
> > > > > > >> > Even though it makes total sense to have the limit defined
> and
> > > > > applied
> > > > > > >> on
> > > > > > >> > the brokers, I was thinking that this check could also be
> > > applied
> > > > by
> > > > > > the
> > > > > > >> > producers. Some google results suggest that
> > `message.max.bytes`
> > > > > might
> > > > > > be
> > > > > > >> > used by the producers but I can't find any trace of that
> > > behavior
> > > > in
> > > > > > the
> > > > > > >> > code.
> > > > > > >> >
> > > > > > >> > The closest thing I have is
> > > > > > >> >
> > > > > > >> >
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
> > > > > > >> > but it simply logs the message size and content and the log
> > > level
> > > > is
> > > > > > >> trace.
> > > > > > >> >
> > > > > > >> > [2] could you please confirm if such a producer-side check
> > > exists?
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > thanks!
> > > > > > >> >
> > > > > > >> > Alexis
> > > > > > >> >
> > > > > > >>
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: message size limit

Posted by Bhavesh Mistry <mi...@gmail.com>.
Hi Jun,

We have similar problem.  We have variable length of messages.  So when we
have fixed size of Batch sometime the batch exceed the limit set on the
brokers (2MB).

So can Producer have some extra logic to determine the optimal batch size
by looking at configured message.max.bytes  value.

During the metadata update, Producer will get this value from the Broker
for each topic and Producer will check if current batch size reach this
limit than break batch into smaller chunk such way that It would not exceed
limit (unless single message exceed the limit). Basically try to avoid data
loss as much as possible.

Please let me know what is your opinion on this...

Thanks,

Bhavesh


On Wed, Sep 3, 2014 at 6:21 AM, Alexis Midon <
alexis.midon@airbedandbreakfast.com> wrote:

> Thanks Jun.
>
> I'll create a jira and try to provide a patch. I think this is pretty
> serious.
>
> On Friday, August 29, 2014, Jun Rao <ju...@gmail.com> wrote:
>
> > The goal of batching is mostly to reduce the # RPC calls to the broker.
> If
> > compression is enabled, a larger batch typically implies better
> compression
> > ratio.
> >
> > The reason that we have to fail the whole batch is that the error code in
> > the produce response is per partition, instead of per message.
> >
> > Retrying individual messages on MessageSizeTooLarge seems reasonable.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <
> > alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> >
> > > Could you explain the goals of batches? I was assuming this was simply
> a
> > > performance optimization, but this behavior makes me think I'm missing
> > > something.
> > > is a batch more than a list of *independent* messages?
> > >
> > > Why would you reject the whole batch? One invalid message causes the
> loss
> > > of batch.num.messages-1 messages :(
> > > It seems pretty critical to me.
> > >
> > > If ack=0, the producer will never know about it.
> > > If ack !=0, the producer will retry the whole batch. If the issue was
> > > related to data corruption (etc), retries might work. But in the case
> of
> > > "big message", the batch will always be rejected and the producer will
> > give
> > > up.
> > >
> > > If the messages are indeed considered independent, I think this is a
> > pretty
> > > serious issue.
> > >
> > > I see 2 possible fix approaches:
> > > - the broker could reject only the invalid messages
> > > - the broker could reject the whole batch (like today) but the producer
> > (if
> > > ack!=0) could retry messages one at a time on exception like
> > > "MessageSizeTooLarge".
> > >
> > > opinions?
> > >
> > > Alexis
> > >
> > > ```
> > > [2014-08-29 16:00:35,170] WARN Produce request with correlation id 46
> > > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > > (kafka.producer.async.DefaultEventHandler)
> > > [2014-08-29 16:00:35,284] WARN Produce request with correlation id 51
> > > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > > (kafka.producer.async.DefaultEventHandler)
> > > [2014-08-29 16:00:35,392] WARN Produce request with correlation id 56
> > > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > > (kafka.producer.async.DefaultEventHandler)
> > > [2014-08-29 16:00:35,499] WARN Produce request with correlation id 61
> > > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > > (kafka.producer.async.DefaultEventHandler)
> > > [2014-08-29 16:00:35,603] ERROR Failed to send requests for topics test
> > > with correlation ids in [43,62]
> > (kafka.producer.async.DefaultEventHandler)
> > > [2014-08-29 16:00:35,603] ERROR Error in handling batch of 3 events
> > > (kafka.producer.async.ProducerSendThread)
> > > kafka.common.FailedToSendMessageException: Failed to send messages
> after
> > 3
> > > tries.
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > >  at
> > >
> > >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> > > at
> > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> > >  at
> > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> > > ```
> > >
> > >
> > > On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao <junrao@gmail.com
> > <javascript:;>> wrote:
> > >
> > > > That's right. If one message in a batch exceeds the size limit, the
> > whole
> > > > batch is rejected.
> > > >
> > > > When determining message.max.bytes, the most important thing to
> > consider
> > > is
> > > > probably memory since currently we need to allocate memory for a full
> > > > message in the broker and the producer and the consumer client.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon <
> > > > alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> > > >
> > > > > am I miss reading this loop:
> > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269
> > > > >
> > > > > it seems like all messages from `validMessages` (which is
> > > > > ByteBufferMessageSet) are NOT appended if one of the message size
> > > exceeds
> > > > > the limit.
> > > > >
> > > > > I hope I'm missing something.
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon <
> > > > > alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > thanks for you answer.
> > > > > > Unfortunately the size won't help much, I'd like to see the
> actual
> > > > > message
> > > > > > data.
> > > > > >
> > > > > > By the way what are the things to consider when deciding on
> > > > > > `message.max.bytes` value?
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao <junrao@gmail.com
> > <javascript:;>> wrote:
> > > > > >
> > > > > >> The message size check is currently only done on the broker. If
> > you
> > > > > enable
> > > > > >> trace level logging in RequestChannel, you will see the produce
> > > > request,
> > > > > >> which includes the size of each partition.
> > > > > >>
> > > > > >> Thanks,
> > > > > >>
> > > > > >> Jun
> > > > > >>
> > > > > >>
> > > > > >> On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon <
> > > > > >> alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> > > > > >>
> > > > > >> > Hello,
> > > > > >> >
> > > > > >> > my brokers are reporting that some received messages exceed
> the
> > > > > >> > `message.max.bytes` value.
> > > > > >> > I'd like to know what producers are at fault but It is pretty
> > much
> > > > > >> > impossible:
> > > > > >> > - the brokers don't log the content of the rejected messages
> > > > > >> > - the log messages do not contain the IP of the producers
> > > > > >> > - on the consumer side, no exception is thrown (afaik it is
> > > because
> > > > > >> Ack-0
> > > > > >> > is used). The only kind of notification is to closed the
> > > connection.
> > > > > >> >
> > > > > >> > [1] Do you have any suggestions to track down the guilty
> > producers
> > > > or
> > > > > >> find
> > > > > >> > out the message content?
> > > > > >> >
> > > > > >> > Even though it makes total sense to have the limit defined and
> > > > applied
> > > > > >> on
> > > > > >> > the brokers, I was thinking that this check could also be
> > applied
> > > by
> > > > > the
> > > > > >> > producers. Some google results suggest that
> `message.max.bytes`
> > > > might
> > > > > be
> > > > > >> > used by the producers but I can't find any trace of that
> > behavior
> > > in
> > > > > the
> > > > > >> > code.
> > > > > >> >
> > > > > >> > The closest thing I have is
> > > > > >> >
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
> > > > > >> > but it simply logs the message size and content and the log
> > level
> > > is
> > > > > >> trace.
> > > > > >> >
> > > > > >> > [2] could you please confirm if such a producer-side check
> > exists?
> > > > > >> >
> > > > > >> >
> > > > > >> > thanks!
> > > > > >> >
> > > > > >> > Alexis
> > > > > >> >
> > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: message size limit

Posted by Alexis Midon <al...@airbedandbreakfast.com>.
Thanks Jun.

I'll create a jira and try to provide a patch. I think this is pretty
serious.

On Friday, August 29, 2014, Jun Rao <ju...@gmail.com> wrote:

> The goal of batching is mostly to reduce the # RPC calls to the broker. If
> compression is enabled, a larger batch typically implies better compression
> ratio.
>
> The reason that we have to fail the whole batch is that the error code in
> the produce response is per partition, instead of per message.
>
> Retrying individual messages on MessageSizeTooLarge seems reasonable.
>
> Thanks,
>
> Jun
>
>
> On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <
> alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
>
> > Could you explain the goals of batches? I was assuming this was simply a
> > performance optimization, but this behavior makes me think I'm missing
> > something.
> > is a batch more than a list of *independent* messages?
> >
> > Why would you reject the whole batch? One invalid message causes the loss
> > of batch.num.messages-1 messages :(
> > It seems pretty critical to me.
> >
> > If ack=0, the producer will never know about it.
> > If ack !=0, the producer will retry the whole batch. If the issue was
> > related to data corruption (etc), retries might work. But in the case of
> > "big message", the batch will always be rejected and the producer will
> give
> > up.
> >
> > If the messages are indeed considered independent, I think this is a
> pretty
> > serious issue.
> >
> > I see 2 possible fix approaches:
> > - the broker could reject only the invalid messages
> > - the broker could reject the whole batch (like today) but the producer
> (if
> > ack!=0) could retry messages one at a time on exception like
> > "MessageSizeTooLarge".
> >
> > opinions?
> >
> > Alexis
> >
> > ```
> > [2014-08-29 16:00:35,170] WARN Produce request with correlation id 46
> > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > (kafka.producer.async.DefaultEventHandler)
> > [2014-08-29 16:00:35,284] WARN Produce request with correlation id 51
> > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > (kafka.producer.async.DefaultEventHandler)
> > [2014-08-29 16:00:35,392] WARN Produce request with correlation id 56
> > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > (kafka.producer.async.DefaultEventHandler)
> > [2014-08-29 16:00:35,499] WARN Produce request with correlation id 61
> > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > (kafka.producer.async.DefaultEventHandler)
> > [2014-08-29 16:00:35,603] ERROR Failed to send requests for topics test
> > with correlation ids in [43,62]
> (kafka.producer.async.DefaultEventHandler)
> > [2014-08-29 16:00:35,603] ERROR Error in handling batch of 3 events
> > (kafka.producer.async.ProducerSendThread)
> > kafka.common.FailedToSendMessageException: Failed to send messages after
> 3
> > tries.
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> >  at
> >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> > at
> >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> >  at
> >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> > ```
> >
> >
> > On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao <junrao@gmail.com
> <javascript:;>> wrote:
> >
> > > That's right. If one message in a batch exceeds the size limit, the
> whole
> > > batch is rejected.
> > >
> > > When determining message.max.bytes, the most important thing to
> consider
> > is
> > > probably memory since currently we need to allocate memory for a full
> > > message in the broker and the producer and the consumer client.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon <
> > > alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> > >
> > > > am I miss reading this loop:
> > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269
> > > >
> > > > it seems like all messages from `validMessages` (which is
> > > > ByteBufferMessageSet) are NOT appended if one of the message size
> > exceeds
> > > > the limit.
> > > >
> > > > I hope I'm missing something.
> > > >
> > > >
> > > >
> > > > On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon <
> > > > alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > thanks for you answer.
> > > > > Unfortunately the size won't help much, I'd like to see the actual
> > > > message
> > > > > data.
> > > > >
> > > > > By the way what are the things to consider when deciding on
> > > > > `message.max.bytes` value?
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao <junrao@gmail.com
> <javascript:;>> wrote:
> > > > >
> > > > >> The message size check is currently only done on the broker. If
> you
> > > > enable
> > > > >> trace level logging in RequestChannel, you will see the produce
> > > request,
> > > > >> which includes the size of each partition.
> > > > >>
> > > > >> Thanks,
> > > > >>
> > > > >> Jun
> > > > >>
> > > > >>
> > > > >> On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon <
> > > > >> alexis.midon@airbedandbreakfast.com <javascript:;>> wrote:
> > > > >>
> > > > >> > Hello,
> > > > >> >
> > > > >> > my brokers are reporting that some received messages exceed the
> > > > >> > `message.max.bytes` value.
> > > > >> > I'd like to know what producers are at fault but It is pretty
> much
> > > > >> > impossible:
> > > > >> > - the brokers don't log the content of the rejected messages
> > > > >> > - the log messages do not contain the IP of the producers
> > > > >> > - on the consumer side, no exception is thrown (afaik it is
> > because
> > > > >> Ack-0
> > > > >> > is used). The only kind of notification is to closed the
> > connection.
> > > > >> >
> > > > >> > [1] Do you have any suggestions to track down the guilty
> producers
> > > or
> > > > >> find
> > > > >> > out the message content?
> > > > >> >
> > > > >> > Even though it makes total sense to have the limit defined and
> > > applied
> > > > >> on
> > > > >> > the brokers, I was thinking that this check could also be
> applied
> > by
> > > > the
> > > > >> > producers. Some google results suggest that `message.max.bytes`
> > > might
> > > > be
> > > > >> > used by the producers but I can't find any trace of that
> behavior
> > in
> > > > the
> > > > >> > code.
> > > > >> >
> > > > >> > The closest thing I have is
> > > > >> >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
> > > > >> > but it simply logs the message size and content and the log
> level
> > is
> > > > >> trace.
> > > > >> >
> > > > >> > [2] could you please confirm if such a producer-side check
> exists?
> > > > >> >
> > > > >> >
> > > > >> > thanks!
> > > > >> >
> > > > >> > Alexis
> > > > >> >
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>

Re: message size limit

Posted by Jun Rao <ju...@gmail.com>.
The goal of batching is mostly to reduce the # RPC calls to the broker. If
compression is enabled, a larger batch typically implies better compression
ratio.

The reason that we have to fail the whole batch is that the error code in
the produce response is per partition, instead of per message.

Retrying individual messages on MessageSizeTooLarge seems reasonable.

Thanks,

Jun


On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <
alexis.midon@airbedandbreakfast.com> wrote:

> Could you explain the goals of batches? I was assuming this was simply a
> performance optimization, but this behavior makes me think I'm missing
> something.
> is a batch more than a list of *independent* messages?
>
> Why would you reject the whole batch? One invalid message causes the loss
> of batch.num.messages-1 messages :(
> It seems pretty critical to me.
>
> If ack=0, the producer will never know about it.
> If ack !=0, the producer will retry the whole batch. If the issue was
> related to data corruption (etc), retries might work. But in the case of
> "big message", the batch will always be rejected and the producer will give
> up.
>
> If the messages are indeed considered independent, I think this is a pretty
> serious issue.
>
> I see 2 possible fix approaches:
> - the broker could reject only the invalid messages
> - the broker could reject the whole batch (like today) but the producer (if
> ack!=0) could retry messages one at a time on exception like
> "MessageSizeTooLarge".
>
> opinions?
>
> Alexis
>
> ```
> [2014-08-29 16:00:35,170] WARN Produce request with correlation id 46
> failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> (kafka.producer.async.DefaultEventHandler)
> [2014-08-29 16:00:35,284] WARN Produce request with correlation id 51
> failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> (kafka.producer.async.DefaultEventHandler)
> [2014-08-29 16:00:35,392] WARN Produce request with correlation id 56
> failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> (kafka.producer.async.DefaultEventHandler)
> [2014-08-29 16:00:35,499] WARN Produce request with correlation id 61
> failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> (kafka.producer.async.DefaultEventHandler)
> [2014-08-29 16:00:35,603] ERROR Failed to send requests for topics test
> with correlation ids in [43,62] (kafka.producer.async.DefaultEventHandler)
> [2014-08-29 16:00:35,603] ERROR Error in handling batch of 3 events
> (kafka.producer.async.ProducerSendThread)
> kafka.common.FailedToSendMessageException: Failed to send messages after 3
> tries.
> at
>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
>  at
>
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> at
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>  at
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> ```
>
>
> On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao <ju...@gmail.com> wrote:
>
> > That's right. If one message in a batch exceeds the size limit, the whole
> > batch is rejected.
> >
> > When determining message.max.bytes, the most important thing to consider
> is
> > probably memory since currently we need to allocate memory for a full
> > message in the broker and the producer and the consumer client.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon <
> > alexis.midon@airbedandbreakfast.com> wrote:
> >
> > > am I miss reading this loop:
> > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269
> > >
> > > it seems like all messages from `validMessages` (which is
> > > ByteBufferMessageSet) are NOT appended if one of the message size
> exceeds
> > > the limit.
> > >
> > > I hope I'm missing something.
> > >
> > >
> > >
> > > On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon <
> > > alexis.midon@airbedandbreakfast.com> wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > thanks for you answer.
> > > > Unfortunately the size won't help much, I'd like to see the actual
> > > message
> > > > data.
> > > >
> > > > By the way what are the things to consider when deciding on
> > > > `message.max.bytes` value?
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao <ju...@gmail.com> wrote:
> > > >
> > > >> The message size check is currently only done on the broker. If you
> > > enable
> > > >> trace level logging in RequestChannel, you will see the produce
> > request,
> > > >> which includes the size of each partition.
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Jun
> > > >>
> > > >>
> > > >> On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon <
> > > >> alexis.midon@airbedandbreakfast.com> wrote:
> > > >>
> > > >> > Hello,
> > > >> >
> > > >> > my brokers are reporting that some received messages exceed the
> > > >> > `message.max.bytes` value.
> > > >> > I'd like to know what producers are at fault but It is pretty much
> > > >> > impossible:
> > > >> > - the brokers don't log the content of the rejected messages
> > > >> > - the log messages do not contain the IP of the producers
> > > >> > - on the consumer side, no exception is thrown (afaik it is
> because
> > > >> Ack-0
> > > >> > is used). The only kind of notification is to closed the
> connection.
> > > >> >
> > > >> > [1] Do you have any suggestions to track down the guilty producers
> > or
> > > >> find
> > > >> > out the message content?
> > > >> >
> > > >> > Even though it makes total sense to have the limit defined and
> > applied
> > > >> on
> > > >> > the brokers, I was thinking that this check could also be applied
> by
> > > the
> > > >> > producers. Some google results suggest that `message.max.bytes`
> > might
> > > be
> > > >> > used by the producers but I can't find any trace of that behavior
> in
> > > the
> > > >> > code.
> > > >> >
> > > >> > The closest thing I have is
> > > >> >
> > > >> >
> > > >>
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
> > > >> > but it simply logs the message size and content and the log level
> is
> > > >> trace.
> > > >> >
> > > >> > [2] could you please confirm if such a producer-side check exists?
> > > >> >
> > > >> >
> > > >> > thanks!
> > > >> >
> > > >> > Alexis
> > > >> >
> > > >>
> > > >
> > > >
> > >
> >
>

Re: message size limit

Posted by Alexis Midon <al...@airbedandbreakfast.com>.
Could you explain the goals of batches? I was assuming this was simply a
performance optimization, but this behavior makes me think I'm missing
something.
is a batch more than a list of *independent* messages?

Why would you reject the whole batch? One invalid message causes the loss
of batch.num.messages-1 messages :(
It seems pretty critical to me.

If ack=0, the producer will never know about it.
If ack !=0, the producer will retry the whole batch. If the issue was
related to data corruption (etc), retries might work. But in the case of
"big message", the batch will always be rejected and the producer will give
up.

If the messages are indeed considered independent, I think this is a pretty
serious issue.

I see 2 possible fix approaches:
- the broker could reject only the invalid messages
- the broker could reject the whole batch (like today) but the producer (if
ack!=0) could retry messages one at a time on exception like
"MessageSizeTooLarge".

opinions?

Alexis

```
[2014-08-29 16:00:35,170] WARN Produce request with correlation id 46
failed due to [test,1]: kafka.common.MessageSizeTooLargeException
(kafka.producer.async.DefaultEventHandler)
[2014-08-29 16:00:35,284] WARN Produce request with correlation id 51
failed due to [test,0]: kafka.common.MessageSizeTooLargeException
(kafka.producer.async.DefaultEventHandler)
[2014-08-29 16:00:35,392] WARN Produce request with correlation id 56
failed due to [test,0]: kafka.common.MessageSizeTooLargeException
(kafka.producer.async.DefaultEventHandler)
[2014-08-29 16:00:35,499] WARN Produce request with correlation id 61
failed due to [test,1]: kafka.common.MessageSizeTooLargeException
(kafka.producer.async.DefaultEventHandler)
[2014-08-29 16:00:35,603] ERROR Failed to send requests for topics test
with correlation ids in [43,62] (kafka.producer.async.DefaultEventHandler)
[2014-08-29 16:00:35,603] ERROR Error in handling batch of 3 events
(kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3
tries.
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
 at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
 at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
```


On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao <ju...@gmail.com> wrote:

> That's right. If one message in a batch exceeds the size limit, the whole
> batch is rejected.
>
> When determining message.max.bytes, the most important thing to consider is
> probably memory since currently we need to allocate memory for a full
> message in the broker and the producer and the consumer client.
>
> Thanks,
>
> Jun
>
>
> On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon <
> alexis.midon@airbedandbreakfast.com> wrote:
>
> > am I miss reading this loop:
> >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269
> >
> > it seems like all messages from `validMessages` (which is
> > ByteBufferMessageSet) are NOT appended if one of the message size exceeds
> > the limit.
> >
> > I hope I'm missing something.
> >
> >
> >
> > On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon <
> > alexis.midon@airbedandbreakfast.com> wrote:
> >
> > > Hi Jun,
> > >
> > > thanks for you answer.
> > > Unfortunately the size won't help much, I'd like to see the actual
> > message
> > > data.
> > >
> > > By the way what are the things to consider when deciding on
> > > `message.max.bytes` value?
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao <ju...@gmail.com> wrote:
> > >
> > >> The message size check is currently only done on the broker. If you
> > enable
> > >> trace level logging in RequestChannel, you will see the produce
> request,
> > >> which includes the size of each partition.
> > >>
> > >> Thanks,
> > >>
> > >> Jun
> > >>
> > >>
> > >> On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon <
> > >> alexis.midon@airbedandbreakfast.com> wrote:
> > >>
> > >> > Hello,
> > >> >
> > >> > my brokers are reporting that some received messages exceed the
> > >> > `message.max.bytes` value.
> > >> > I'd like to know what producers are at fault but It is pretty much
> > >> > impossible:
> > >> > - the brokers don't log the content of the rejected messages
> > >> > - the log messages do not contain the IP of the producers
> > >> > - on the consumer side, no exception is thrown (afaik it is because
> > >> Ack-0
> > >> > is used). The only kind of notification is to closed the connection.
> > >> >
> > >> > [1] Do you have any suggestions to track down the guilty producers
> or
> > >> find
> > >> > out the message content?
> > >> >
> > >> > Even though it makes total sense to have the limit defined and
> applied
> > >> on
> > >> > the brokers, I was thinking that this check could also be applied by
> > the
> > >> > producers. Some google results suggest that `message.max.bytes`
> might
> > be
> > >> > used by the producers but I can't find any trace of that behavior in
> > the
> > >> > code.
> > >> >
> > >> > The closest thing I have is
> > >> >
> > >> >
> > >>
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
> > >> > but it simply logs the message size and content and the log level is
> > >> trace.
> > >> >
> > >> > [2] could you please confirm if such a producer-side check exists?
> > >> >
> > >> >
> > >> > thanks!
> > >> >
> > >> > Alexis
> > >> >
> > >>
> > >
> > >
> >
>

Re: message size limit

Posted by Jun Rao <ju...@gmail.com>.
That's right. If one message in a batch exceeds the size limit, the whole
batch is rejected.

When determining message.max.bytes, the most important thing to consider is
probably memory since currently we need to allocate memory for a full
message in the broker and the producer and the consumer client.

Thanks,

Jun


On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon <
alexis.midon@airbedandbreakfast.com> wrote:

> am I miss reading this loop:
>
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269
>
> it seems like all messages from `validMessages` (which is
> ByteBufferMessageSet) are NOT appended if one of the message size exceeds
> the limit.
>
> I hope I'm missing something.
>
>
>
> On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon <
> alexis.midon@airbedandbreakfast.com> wrote:
>
> > Hi Jun,
> >
> > thanks for you answer.
> > Unfortunately the size won't help much, I'd like to see the actual
> message
> > data.
> >
> > By the way what are the things to consider when deciding on
> > `message.max.bytes` value?
> >
> >
> >
> >
> >
> >
> > On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao <ju...@gmail.com> wrote:
> >
> >> The message size check is currently only done on the broker. If you
> enable
> >> trace level logging in RequestChannel, you will see the produce request,
> >> which includes the size of each partition.
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >>
> >> On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon <
> >> alexis.midon@airbedandbreakfast.com> wrote:
> >>
> >> > Hello,
> >> >
> >> > my brokers are reporting that some received messages exceed the
> >> > `message.max.bytes` value.
> >> > I'd like to know what producers are at fault but It is pretty much
> >> > impossible:
> >> > - the brokers don't log the content of the rejected messages
> >> > - the log messages do not contain the IP of the producers
> >> > - on the consumer side, no exception is thrown (afaik it is because
> >> Ack-0
> >> > is used). The only kind of notification is to closed the connection.
> >> >
> >> > [1] Do you have any suggestions to track down the guilty producers or
> >> find
> >> > out the message content?
> >> >
> >> > Even though it makes total sense to have the limit defined and applied
> >> on
> >> > the brokers, I was thinking that this check could also be applied by
> the
> >> > producers. Some google results suggest that `message.max.bytes` might
> be
> >> > used by the producers but I can't find any trace of that behavior in
> the
> >> > code.
> >> >
> >> > The closest thing I have is
> >> >
> >> >
> >>
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
> >> > but it simply logs the message size and content and the log level is
> >> trace.
> >> >
> >> > [2] could you please confirm if such a producer-side check exists?
> >> >
> >> >
> >> > thanks!
> >> >
> >> > Alexis
> >> >
> >>
> >
> >
>

Re: message size limit

Posted by Alexis Midon <al...@airbedandbreakfast.com>.
am I miss reading this loop:
https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269

it seems like all messages from `validMessages` (which is
ByteBufferMessageSet) are NOT appended if one of the message size exceeds
the limit.

I hope I'm missing something.



On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon <
alexis.midon@airbedandbreakfast.com> wrote:

> Hi Jun,
>
> thanks for you answer.
> Unfortunately the size won't help much, I'd like to see the actual message
> data.
>
> By the way what are the things to consider when deciding on
> `message.max.bytes` value?
>
>
>
>
>
>
> On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao <ju...@gmail.com> wrote:
>
>> The message size check is currently only done on the broker. If you enable
>> trace level logging in RequestChannel, you will see the produce request,
>> which includes the size of each partition.
>>
>> Thanks,
>>
>> Jun
>>
>>
>> On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon <
>> alexis.midon@airbedandbreakfast.com> wrote:
>>
>> > Hello,
>> >
>> > my brokers are reporting that some received messages exceed the
>> > `message.max.bytes` value.
>> > I'd like to know what producers are at fault but It is pretty much
>> > impossible:
>> > - the brokers don't log the content of the rejected messages
>> > - the log messages do not contain the IP of the producers
>> > - on the consumer side, no exception is thrown (afaik it is because
>> Ack-0
>> > is used). The only kind of notification is to closed the connection.
>> >
>> > [1] Do you have any suggestions to track down the guilty producers or
>> find
>> > out the message content?
>> >
>> > Even though it makes total sense to have the limit defined and applied
>> on
>> > the brokers, I was thinking that this check could also be applied by the
>> > producers. Some google results suggest that `message.max.bytes` might be
>> > used by the producers but I can't find any trace of that behavior in the
>> > code.
>> >
>> > The closest thing I have is
>> >
>> >
>> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
>> > but it simply logs the message size and content and the log level is
>> trace.
>> >
>> > [2] could you please confirm if such a producer-side check exists?
>> >
>> >
>> > thanks!
>> >
>> > Alexis
>> >
>>
>
>

Re: message size limit

Posted by Alexis Midon <al...@airbedandbreakfast.com>.
Hi Jun,

thanks for you answer.
Unfortunately the size won't help much, I'd like to see the actual message
data.

By the way what are the things to consider when deciding on
`message.max.bytes` value?






On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao <ju...@gmail.com> wrote:

> The message size check is currently only done on the broker. If you enable
> trace level logging in RequestChannel, you will see the produce request,
> which includes the size of each partition.
>
> Thanks,
>
> Jun
>
>
> On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon <
> alexis.midon@airbedandbreakfast.com> wrote:
>
> > Hello,
> >
> > my brokers are reporting that some received messages exceed the
> > `message.max.bytes` value.
> > I'd like to know what producers are at fault but It is pretty much
> > impossible:
> > - the brokers don't log the content of the rejected messages
> > - the log messages do not contain the IP of the producers
> > - on the consumer side, no exception is thrown (afaik it is because Ack-0
> > is used). The only kind of notification is to closed the connection.
> >
> > [1] Do you have any suggestions to track down the guilty producers or
> find
> > out the message content?
> >
> > Even though it makes total sense to have the limit defined and applied on
> > the brokers, I was thinking that this check could also be applied by the
> > producers. Some google results suggest that `message.max.bytes` might be
> > used by the producers but I can't find any trace of that behavior in the
> > code.
> >
> > The closest thing I have is
> >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
> > but it simply logs the message size and content and the log level is
> trace.
> >
> > [2] could you please confirm if such a producer-side check exists?
> >
> >
> > thanks!
> >
> > Alexis
> >
>

Re: message size limit

Posted by Jun Rao <ju...@gmail.com>.
The message size check is currently only done on the broker. If you enable
trace level logging in RequestChannel, you will see the produce request,
which includes the size of each partition.

Thanks,

Jun


On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon <
alexis.midon@airbedandbreakfast.com> wrote:

> Hello,
>
> my brokers are reporting that some received messages exceed the
> `message.max.bytes` value.
> I'd like to know what producers are at fault but It is pretty much
> impossible:
> - the brokers don't log the content of the rejected messages
> - the log messages do not contain the IP of the producers
> - on the consumer side, no exception is thrown (afaik it is because Ack-0
> is used). The only kind of notification is to closed the connection.
>
> [1] Do you have any suggestions to track down the guilty producers or find
> out the message content?
>
> Even though it makes total sense to have the limit defined and applied on
> the brokers, I was thinking that this check could also be applied by the
> producers. Some google results suggest that `message.max.bytes` might be
> used by the producers but I can't find any trace of that behavior in the
> code.
>
> The closest thing I have is
>
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
> but it simply logs the message size and content and the log level is trace.
>
> [2] could you please confirm if such a producer-side check exists?
>
>
> thanks!
>
> Alexis
>