Posted to dev@kafka.apache.org by radai <ra...@gmail.com> on 2016/11/05 00:04:25 UTC

Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

Here are my proposed changes:

https://github.com/radai-rosenblatt/kafka/commit/8d7744ab8a6c660c4749b495b033b948a68efd3c

At this point I've run this code on a test cluster under a load that OOMs
"vanilla" 0.10.1.0, and verified that my code deployed under the same
conditions remains stable.

What I've done:

1. configured the max heap size to 1.5GB and a single IO thread (makes it
easier to DoS)
2. set up a topic with 100 partitions, all on the same broker (makes it
easier to focus the IO):
   ./kafka-topics.sh --zookeeper <zk> --create --topic dos --replica-assignment [100 times the same broker id]
3. spun up load from 10 machines:
   ./kafka-producer-perf-test.sh --topic dos --num-records 1000000 --record-size 991600 --throughput 100000 --producer-props bootstrap.servers=<broker> max.request.size=104857600 acks=0 linger.ms=30000 buffer.memory=209715200 batch.size=1048576

This results in single requests that are just under 100MB in size, times 10
producers for a ~1GB max outstanding memory requirement. On my setup it was
enough to reliably DoS 0.10.1.0. Under my patch the broker held up (the
request rate was throttled).

Performance when not under memory pressure was roughly the same (note the
longest run was ~1 hour; I haven't done long-term stress tests yet).

At this point I think I've addressed most (all?) of the concerns and would
like to move on to a vote. (Obviously the code has not been reviewed yet,
but in terms of the high-level approach and the changes to the public API
the KIP is ready.)
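
For context, here is roughly what the sizing discussed in this thread would
look like in a broker's server.properties. The exact property name was still
being debated on this thread (queued.max.bytes vs request.queue.max.bytes),
so treat the names below as tentative:

# proposed: bound the incoming request queue by bytes, alongside the
# existing count-based bound
queued.max.bytes=209715200
queued.max.requests=500
# per the discussion below, the worst-case memory is effectively
# queued.max.bytes + socket.request.max.bytes
socket.request.max.bytes=104857600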




On Sun, Oct 30, 2016 at 5:05 PM, radai <ra...@gmail.com> wrote:

> Hi Jun,
>
> The benchmarks just spawn 16 threads where each thread allocates a chunk
> of memory from the pool and immediately releases it. 16 was chosen because
> it's typical for LinkedIn setups. The benchmarks never "consume" more than
> 16 * [single allocation size] and so do not test out-of-memory performance,
> but rather "normal" operating conditions. Tests were run with 4 memory
> allocation sizes - 1k, 10k, 100k and 1M (1M being the largest typical
> single-request-size setting at LinkedIn). The results are in ops/sec (for
> context - a single request involves a single allocation/release cycle, and
> typical LinkedIn setups don't go beyond 20k requests/sec on a single broker).
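>
> For reference, a minimal JMH-style sketch of what each benchmark iteration
> does (the constructor signature and tryAllocate/release names are
> assumptions for illustration, not necessarily the exact API under review):
>
> import java.nio.ByteBuffer;
> import org.openjdk.jmh.annotations.*;
>
> @State(Scope.Benchmark)
> public class MemoryPoolBenchmarkSketch {
>     private GarbageCollectedMemoryPool pool; // or SimpleMemoryPool
>
>     @Setup
>     public void setup() {
>         pool = new GarbageCollectedMemoryPool(2L * 1024 * 1024 * 1024); // hypothetical 2GB cap
>     }
>
>     @Benchmark
>     @Threads(16) // 16 concurrent threads, typical for LinkedIn setups
>     public void allocThenRelease() {
>         // one op == one allocation/release cycle, like a single request
>         ByteBuffer buf = pool.tryAllocate(1024); // 1k case; 10k/100k/1M analogous
>         pool.release(buf);
>     }
> }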
>
> Results show that the GC pool (which is a combination of an AtomicLong
> outstanding-bytes count + weak references for allocated buffers) has a
> negligible performance cost vs the simple pool benchmark (which does
> nothing, same as the current code).
>
> The more interesting thing the results show is that as the requested
> buffer size gets larger, a single allocate/release cycle becomes more
> expensive. Since the benchmark never holds a lot of outstanding memory
> (16 * buf size, tops), I suspect the issue is memory fragmentation - it's
> harder to find larger contiguous chunks of heap.
>
> This indicates that for throughput scenarios (large request batches)
> broker performance may actually be impacted by the overhead of allocating
> and releasing buffers (the situation may even be worse - inter-broker
> requests are much larger), and a memory pool implementation that actually
> recycles buffers (mine just acts as a limiter and leak detector) might
> improve broker performance under high-throughput conditions (but that's
> probably a separate follow-up change).
>
> I expect to stress test my code this week (though no guarantees).
>
> I'll look at KIP-81.
>
> On Sun, Oct 30, 2016 at 12:27 PM, Jun Rao <ju...@confluent.io> wrote:
>
>> Hi, Radai,
>>
>> Sorry for the late response. How should the benchmark results be
>> interpreted? The higher the ops/s, the better? It would also be useful to
>> test this out on LinkedIn's traffic with enough socket connections to see
>> if there is any performance degradation.
>>
>> Also, there is a separate proposal, KIP-81, to bound the consumer memory
>> usage. Perhaps you can chime in there on whether this proposal can be
>> utilized there too.
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-81%3A+Bound+Fetch+memory+usage+in+the+consumer
>>
>> Thanks,
>>
>> Jun
>>
>> On Tue, Sep 27, 2016 at 10:23 AM, radai <ra...@gmail.com>
>> wrote:
>>
>> > Hi Jun,
>> >
>> > 10 - mute/unmute functionality has been added in
>> > https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting.
>> > I have yet to run stress tests to see how it behaves versus without muting.
>> >
>> > 11 - I've added a SimplePool implementation (nothing more than an
>> > AtomicLong really) and compared it with my GC pool (that uses weak refs) -
>> > https://github.com/radai-rosenblatt/kafka-benchmarks/tree/master/memorypool-benchmarks.
>> > The results show no noticeable difference. What the results _do_ show,
>> > though, is that for large requests (1M) performance drops very sharply.
>> > Since the SimplePool is essentially identical to current Kafka code
>> > behaviour (the benchmark never reaches out-of-memory conditions), this
>> > suggests to me that Kafka performance for large requests suffers greatly
>> > from the cost of allocating (and releasing) large buffers (instead of
>> > actually pooling them for later re-use). This means that a memory pool
>> > implementation that actually pools ( :-) ) is very likely to improve
>> > broker performance for large requests.
>> >
>> > 12 - if there were a single thread iterating over selection keys, then
>> > stopping at the first unsatisfiable request might work (if iteration order
>> > over selection keys is deterministic, which is OS-dependent). However,
>> > Kafka spawns multiple selectors sharing the same pool, so I doubt the
>> > approach would gain anything. Also notice that the current code already
>> > shuffles the selection keys if memory is low (<10%) to try and guarantee
>> > fairness.
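>> >
>> > A rough sketch of that fairness tweak (the pool method names here are
>> > assumptions for illustration, not the exact API):
>> >
>> > // when the pool is low on memory (<10%), shuffle the ready keys so the
>> > // same connections aren't always serviced first while others starve
>> > List<SelectionKey> ordered = new ArrayList<>(selector.selectedKeys());
>> > if (memoryPool.availableMemory() < 0.1 * memoryPool.size())
>> >     Collections.shuffle(ordered);
>> > for (SelectionKey key : ordered) {
>> >     // ... attempt the read; a failed allocation just yields null and
>> >     // the key is retried on a later poll() ...
>> > }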
>> >
>> > Attached are the benchmark results for the pool implementations:
>> >
>> > Benchmark                                        Mode  Cnt        Score        Error  Units
>> > GarbageCollectedMemoryPoolBenchmark.alloc_100k  thrpt    5   198272.519 ±  16045.965  ops/s
>> > GarbageCollectedMemoryPoolBenchmark.alloc_10k   thrpt    5  2781439.307 ± 185287.072  ops/s
>> > GarbageCollectedMemoryPoolBenchmark.alloc_1k    thrpt    5  6029199.952 ± 465936.118  ops/s
>> > GarbageCollectedMemoryPoolBenchmark.alloc_1m    thrpt    5    18464.272 ±    332.861  ops/s
>> > SimpleMemoryPoolBenchmark.alloc_100k            thrpt    5   204240.066 ±   2207.619  ops/s
>> > SimpleMemoryPoolBenchmark.alloc_10k             thrpt    5  3000794.525 ±  83510.836  ops/s
>> > SimpleMemoryPoolBenchmark.alloc_1k              thrpt    5  5893671.778 ± 274239.541  ops/s
>> > SimpleMemoryPoolBenchmark.alloc_1m              thrpt    5    18728.085 ±    792.563  ops/s
>> >
>> >
>> >
>> > On Sat, Sep 24, 2016 at 9:07 AM, Jun Rao <ju...@confluent.io> wrote:
>> >
>> > > Hi, Radai,
>> > >
>> > > For 10, yes, we don't want the buffer pool to wake up the selector every
>> > > time some memory is freed up. We only want to do that when there are
>> > > pending requests to the buffer pool that were not honored due to
>> > > insufficient memory.
>> > >
>> > > For 11, we probably want to be a bit careful with Weak References. In
>> > > https://issues.apache.org/jira/browse/KAFKA-1989, we initially tried an
>> > > implementation based on Weak References, but abandoned it due to too much
>> > > GC overhead. It probably also makes the code a bit harder to understand.
>> > > So, perhaps it would be better if we can avoid it.
>> > >
>> > > For 12, that's a good point. I thought the selector would do some
>> > > shuffling for fairness. Perhaps we should stop allocating from the buffer
>> > > pool when we see the first key whose memory request can't be honored?
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > >
>> > > On Sat, Sep 24, 2016 at 8:44 AM, radai <ra...@gmail.com>
>> > wrote:
>> > >
>> > > > Hi Jun,
>> > > >
>> > > > 10 - I'll add this functionality to the mute/unmute branch. As every
>> > > > mute/unmute operation is O(#connections / #selectorThreads), maybe a
>> > > > watermark approach is better than waking when _any_ memory is available?
>> > > >
>> > > > 11 - "gc notifications" are done by using a ReferenceQueue (
>> > > > https://docs.oracle.com/javase/8/docs/api/java/lang/
>> > > > ref/ReferenceQueue.html)
>> > > > in combination with weak references to allocated buffers. when a
>> buffer
>> > > is
>> > > > reclaimed by the GC the corresponding weak ref to it is enqueued.
>> the
>> > > pool
>> > > > maintains a set of outstanding buffer IDs (every allocated buffer
>> gets
>> > a
>> > > > unique id - basically a sequence). a buffer explicitly returned has
>> its
>> > > id
>> > > > removed from the tracking set and the weak reference to it
>> destroyed,
>> > so
>> > > > its reference will never be enqueued by the GC even if it is GC'ed
>> > later.
>> > > > an enqueued reference (which indicates a buffer not returned to
>> pool)
>> > > also
>> > > > carries the buffer id, which is then removed from the outstanding
>> > buffers
>> > > > set and the memory marked as available (and a warning printed). the
>> > pool
>> > > > has a background thread dedicated to reading references out of the
>> > queue
>> > > > (which under normal conditions remains blocked forever).
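>> > > >
>> > > > A condensed sketch of that mechanism (class, field and method names are
>> > > > illustrative, not necessarily those in the patch; availableBytes is the
>> > > > pool's AtomicLong counter):
>> > > >
>> > > > private final ReferenceQueue<ByteBuffer> reclaimed = new ReferenceQueue<>();
>> > > > private final Map<Long, BufferReference> outstanding = new ConcurrentHashMap<>();
>> > > >
>> > > > private static class BufferReference extends WeakReference<ByteBuffer> {
>> > > >     final long id;
>> > > >     final int size;
>> > > >     BufferReference(long id, ByteBuffer buf, ReferenceQueue<ByteBuffer> q) {
>> > > >         super(buf, q); // enqueued onto q if buf is GC'ed while still tracked
>> > > >         this.id = id;
>> > > >         this.size = buf.capacity();
>> > > >     }
>> > > > }
>> > > >
>> > > > public void release(long id, ByteBuffer buf) {
>> > > >     BufferReference ref = outstanding.remove(id); // stop tracking this id
>> > > >     if (ref != null) {
>> > > >         ref.clear(); // its reference will now never be enqueued
>> > > >         availableBytes.addAndGet(ref.size);
>> > > >     }
>> > > > }
>> > > >
>> > > > // body of the pool's background thread; blocked forever when nothing leaks
>> > > > private void watchLeaks() throws InterruptedException {
>> > > >     while (true) {
>> > > >         BufferReference leaked = (BufferReference) reclaimed.remove();
>> > > >         if (outstanding.remove(leaked.id) != null) {
>> > > >             availableBytes.addAndGet(leaked.size); // reclaim the capacity
>> > > >             System.err.println("WARN: buffer " + leaked.id + " was never release()ed");
>> > > >         }
>> > > >     }
>> > > > }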
>> > > >
>> > > > 12 - the issue here is that a single "large" request (say 1MB) can get
>> > > > blocked indefinitely under high pressure from much smaller requests (say
>> > > > 1KB) keeping memory utilization close to 100%. If we don't care about
>> > > > potential starvation the change is in a single condition. I'll make this
>> > > > configurable.
>> > > >
>> > > > 13 - I'll change the parameter name.
>> > > >
>> > > > On Fri, Sep 23, 2016 at 3:38 PM, Jun Rao <ju...@confluent.io> wrote:
>> > > >
>> > > > > Hi, Radai,
>> > > > >
>> > > > > Thanks for the updated KIP. A few more questions/comments below.
>> > > > >
>> > > > > 10. For "the mute/unmute happens just before poll(), which means
>> as a
>> > > > worst
>> > > > > case there will be no reads for 300ms if memory was unavailable",
>> I
>> > am
>> > > > > thinking that memory-pool could track if there is any pending
>> request
>> > > and
>> > > > > wake up the selector when memory is released and there is a
>> pending
>> > > > > request. This way, poll() doesn't have to wait for the timeout if
>> > > memory
>> > > > > frees up early.
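>> > > > >
>> > > > > A sketch of that idea (field and method names assumed for illustration):
>> > > > >
>> > > > > // in the pool: wake the selector only when a request is actually waiting
>> > > > > public void release(ByteBuffer buffer) {
>> > > > >     availableBytes.addAndGet(buffer.capacity());
>> > > > >     if (hasPendingAllocation.get()) // set when a tryAllocate() failed earlier
>> > > > >         selector.wakeup();          // poll() returns early instead of
>> > > > >                                     // waiting out its full timeout
>> > > > > }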
>> > > > >
>> > > > > 11. For "to facilitate faster implementation (as a safety net) the
>> > pool
>> > > > > will be implemented in such a way that memory that was not
>> > release()ed
>> > > > (but
>> > > > > still garbage collected) would be detected and "reclaimed". this
>> is
>> > to
>> > > > > prevent "leaks" in case of code paths that fail to release()
>> > > properly.",
>> > > > > could you explain a bit at the high level how this is done?
>> > > > >
>> > > > > 12. For "As the pool would allow any size request if it has any
>> > > capacity
>> > > > > available, the actual memory bound is queued.max.bytes +
>> > > > > socket.request.max.bytes.", it seems intuitively, the pool should
>> > only
>> > > > give
>> > > > > the Buffer back if it has enough available bytes. Then the request
>> > > memory
>> > > > > can be bounded by queued.max.bytes. We can validate that
>> > > queued.max.bytes
>> > > > > is at least socket.request.max.bytes.
>> > > > >
>> > > > > 13. For the naming, it seems request.queue.max.bytes is clearer than
>> > > > > queued.max.bytes.
>> > > > >
>> > > > > Jun
>> > > > >
>> > > > >
>> > > > >
>> > > > > On Thu, Sep 22, 2016 at 10:53 AM, radai <
>> radai.rosenblatt@gmail.com>
>> > > > > wrote:
>> > > > >
>> > > > > > As discussed in the KIP call, I have updated the KIP-72 page
>> > > > > > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests)
>> > > > > > to record both configuration validations and implementation concerns.
>> > > > > > I've also implemented channel muting/unmuting in response to memory
>> > > > > > pressure. It's available as a separate branch here -
>> > > > > > https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting.
>> > > > > > The implementation without muting is here -
>> > > > > > https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool.
>> > > > > >
>> > > > > > The mute/unmute happens just before poll(), which means as a worst case
>> > > > > > there will be no reads for 300ms if memory was unavailable (that's the
>> > > > > > timeout until the next poll). Perhaps a design with dedicated read
>> > > > > > threads could do better (such a thread could actually block waiting for
>> > > > > > memory), but that would be a giant change.
>> > > > > >
>> > > > > > On Tue, Sep 13, 2016 at 9:20 AM, radai <
>> radai.rosenblatt@gmail.com
>> > >
>> > > > > wrote:
>> > > > > >
>> > > > > > > The specific memory pool implementation I wrote will allocate _any_
>> > > > > > > amount you request if it has _any_ memory available (so if it has 1
>> > > > > > > byte available and you ask for 1MB you will get 1MB and the counter
>> > > > > > > will go negative). This was done to avoid issues with starvation of
>> > > > > > > large requests; other implementations may be more strict. To me this
>> > > > > > > means that generally it's not a simple "have memory" vs "no memory"
>> > > > > > > split (which gets worse under a hypothetical tiered pool scheme for
>> > > > > > > QoS).
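>> > > > > > >
>> > > > > > > In code, that permissive policy is roughly (a sketch; field names
>> > > > > > > assumed):
>> > > > > > >
>> > > > > > > // grant any size while the counter is positive; the counter may go
>> > > > > > > // negative, which simply blocks further allocations until enough
>> > > > > > > // release() calls bring it back up - so large requests never starve
>> > > > > > > public ByteBuffer tryAllocate(int size) {
>> > > > > > >     if (availableBytes.get() <= 0)
>> > > > > > >         return null; // nothing available - caller moves on, retries later
>> > > > > > >     availableBytes.addAndGet(-size); // may drive the counter below zero
>> > > > > > >     return ByteBuffer.allocate(size);
>> > > > > > > }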
>> > > > > > >
>> > > > > > > To allow this flexibility in pool implementations I must preserve the
>> > > > > > > amount of memory required. Once read from the channel I can't put it
>> > > > > > > back, so I store it?
>> > > > > > >
>> > > > > > > On Tue, Sep 13, 2016 at 5:30 AM, Rajini Sivaram <
>> > > > > > > rajinisivaram@googlemail.com> wrote:
>> > > > > > >
>> > > > > > >> Is there any value in allowing the 4-byte size to be read even when the
>> > > > > > >> request memory limit has been reached? If not, you can disable OP_READ
>> > > > > > >> interest for all channels that are ready inside Selector.poll() when the
>> > > > > > >> memory limit has been reached, and re-enable it before returning from
>> > > > > > >> poll(). Perhaps a listener that is invoked when the MemoryPool moves
>> > > > > > >> from the unavailable to the available state can wake up the selector.
>> > > > > > >> The changes for this should be fairly contained, without any additional
>> > > > > > >> channel state. And it would avoid the overhead of polls that return
>> > > > > > >> immediately even when progress cannot be made because the memory limit
>> > > > > > >> has been reached.
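>> > > > > > >>
>> > > > > > >> Something along these lines, as a sketch only (the listener wiring and
>> > > > > > >> readyKeys are assumed names, not existing code):
>> > > > > > >>
>> > > > > > >> // inside Selector.poll(), when the pool reports no available memory:
>> > > > > > >> for (SelectionKey key : readyKeys)
>> > > > > > >>     key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
>> > > > > > >> // ... process writes/connects as usual ...
>> > > > > > >> // before returning from poll(), restore read interest:
>> > > > > > >> for (SelectionKey key : readyKeys)
>> > > > > > >>     key.interestOps(key.interestOps() | SelectionKey.OP_READ);
>> > > > > > >>
>> > > > > > >> // pool-side listener, invoked when memory becomes available again:
>> > > > > > >> public void memoryAvailable() {
>> > > > > > >>     selector.wakeup(); // breaks out of select() so reads resume promptly
>> > > > > > >> }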
>> > > > > > >>
>> > > > > > >> On Tue, Sep 13, 2016 at 12:31 AM, radai <
>> > > radai.rosenblatt@gmail.com
>> > > > >
>> > > > > > >> wrote:
>> > > > > > >>
>> > > > > > >> > Hi Jun,
>> > > > > > >> >
>> > > > > > >> > Yes, you're right - right now the next select() call will return
>> > > > > > >> > immediately with the same set of keys as earlier (at least), as they
>> > > > > > >> > were not previously handled (no memory).
>> > > > > > >> > My assumption is that this happens under considerable load - something
>> > > > > > >> > has to be occupying all this memory. Also, this happens in the context
>> > > > > > >> > of SocketServer.Processor.run():
>> > > > > > >> >
>> > > > > > >> > while (isRunning) {
>> > > > > > >> >    configureNewConnections()
>> > > > > > >> >    processNewResponses()
>> > > > > > >> >    poll()   <------ HERE
>> > > > > > >> >    processCompletedReceives()
>> > > > > > >> >    processCompletedSends()
>> > > > > > >> >    processDisconnected()
>> > > > > > >> > }
>> > > > > > >> >
>> > > > > > >> > Even within poll(), things like finishConnection(), prepare(), and
>> > > > > > >> > write()s can still make progress under low-memory conditions. And given
>> > > > > > >> > the load, there's probably progress to be made in
>> > > > > > >> > processCompletedReceives(), processCompletedSends() and
>> > > > > > >> > processDisconnected().
>> > > > > > >> >
>> > > > > > >> > If there's progress to be made in other things, it's likely that the
>> > > > > > >> > next call to poll() will not happen immediately and so the loop won't
>> > > > > > >> > be that tight. In order for this to devolve into true busy-waiting you
>> > > > > > >> > would need a situation where no progress can be made on any in-progress
>> > > > > > >> > requests and there are no responses to send out?
>> > > > > > >> >
>> > > > > > >> > If my assumption does not hold then you are correct, and the
>> > > > > > >> > selector.poll(300) currently hardcoded in SocketServer.Processor.poll()
>> > > > > > >> > would need to be replaced with something more complicated. My biggest
>> > > > > > >> > point of concern, though, is that the resulting code would be
>> > > > > > >> > complicated and would couple the Selector to the memory pool very
>> > > > > > >> > tightly. Under my current patch the Selector needs the memory pool only
>> > > > > > >> > to pass to channels when they are built. This would allow different
>> > > > > > >> > memory pools relatively easily for things like reserving memory for
>> > > > > > >> > cross-broker replication and high-SLA connections. A tighter coupling
>> > > > > > >> > would make any such future modification hard.
>> > > > > > >> >
>> > > > > > >> > On Sun, Sep 11, 2016 at 10:37 AM, Jun Rao <
>> jun@confluent.io>
>> > > > wrote:
>> > > > > > >> >
>> > > > > > >> > > Hi, Radai,
>> > > > > > >> > >
>> > > > > > >> > > Thanks for the reply. I still have a follow-up question on #2.
>> > > > > > >> > >
>> > > > > > >> > > My understanding is that in your proposal, the selector will now first
>> > > > > > >> > > read the size of the Receive. If there is not enough memory, it has to
>> > > > > > >> > > turn off the READ interest bit for the corresponding KafkaChannel.
>> > > > > > >> > > Otherwise, subsequent selector.poll() calls will always return
>> > > > > > >> > > immediately, adding unnecessary overhead. If you do that, the Selector
>> > > > > > >> > > will need to know when to turn on the READ interest bit again. It may
>> > > > > > >> > > not be enough to defer this check to the next poll() call, since the
>> > > > > > >> > > timeout used by poll() could be arbitrarily large. So, it seems that
>> > > > > > >> > > some kind of coordination between the Selector and the buffer pool is
>> > > > > > >> > > needed?
>> > > > > > >> > >
>> > > > > > >> > > Jun
>> > > > > > >> > >
>> > > > > > >> > > On Thu, Sep 8, 2016 at 7:02 PM, radai <
>> > > > radai.rosenblatt@gmail.com
>> > > > > >
>> > > > > > >> > wrote:
>> > > > > > >> > >
>> > > > > > >> > > > Hi Jun,
>> > > > > > >> > > >
>> > > > > > >> > > > 1. Yes, it is my own personal opinion that people use
>> > > > > > >> > > > queued.max.requests as an indirect way to bound memory consumption.
>> > > > > > >> > > > Once a more direct memory-bound mechanism exists (and works) I don't
>> > > > > > >> > > > think queued.max.requests would be required. Having said that, I was
>> > > > > > >> > > > not planning on making any changes w.r.t. queued.max.requests support
>> > > > > > >> > > > (so I was aiming to get to a situation where both configs are
>> > > > > > >> > > > supported) to allow gathering enough data/feedback.
>> > > > > > >> > > >
>> > > > > > >> > > > 2. Selector.poll() calls into KafkaChannel.read() to maybe get a
>> > > > > > >> > > > NetworkReceive. Multiple such read() calls may be required until a
>> > > > > > >> > > > Receive is produced, already in the current code base. My pool
>> > > > > > >> > > > implementation is non-blocking, so if there's no memory available the
>> > > > > > >> > > > read() call will return null. poll() would then move on to try and
>> > > > > > >> > > > service other selection keys. The pool will be checked for available
>> > > > > > >> > > > memory again the next time the SocketServer.run() loop gets to poll().
>> > > > > > >> > > > And so right now I don't communicate memory becoming available to the
>> > > > > > >> > > > selector - it will just go on to try and make progress elsewhere and
>> > > > > > >> > > > come back again. I never block it or send it to sleep. I think for
>> > > > > > >> > > > efficiency what could maybe be done is: if there's not enough memory
>> > > > > > >> > > > to service a readable selection key, we may want to skip all other
>> > > > > > >> > > > read-ready selection keys for that iteration of pollSelectionKeys().
>> > > > > > >> > > > That would require rather invasive changes around
>> > > > > > >> > > > Selector.pollSelectionKeys() that I'd rather avoid. Also, different
>> > > > > > >> > > > KafkaChannels may be backed by different memory pools (under some sort
>> > > > > > >> > > > of future QoS scheme?), which would complicate such an optimization
>> > > > > > >> > > > further.
>> > > > > > >> > > >
>> > > > > > >> > > > 3. I added the pool interface and implementation under
>> > > > > > >> > > > kafka.common.memory, and the API is "thin" enough to be generally
>> > > > > > >> > > > useful (currently it's non-blocking only, but a get(long maxWait) is
>> > > > > > >> > > > definitely doable). Having said that, I'm not really familiar enough
>> > > > > > >> > > > with the code to say....
>> > > > > > >> > > >
>> > > > > > >> > > >
>> > > > > > >> > > >
>> > > > > > >> > > > On Fri, Sep 2, 2016 at 2:04 PM, Jun Rao <
>> jun@confluent.io
>> > >
>> > > > > wrote:
>> > > > > > >> > > >
>> > > > > > >> > > > > Hi, Radai,
>> > > > > > >> > > > >
>> > > > > > >> > > > > Thanks for the update. At the high level, this looks promising. A few
>> > > > > > >> > > > > comments below.
>> > > > > > >> > > > >
>> > > > > > >> > > > > 1. If we can bound the requests by bytes, it seems that we don't need
>> > > > > > >> > > > > queued.max.requests any more? Could we just deprecate the config and
>> > > > > > >> > > > > make the queue size unbounded?
>> > > > > > >> > > > > 2. How do we communicate back to the selector when some memory is
>> > > > > > >> > > > > freed up? We probably need to wake up the selector. For efficiency,
>> > > > > > >> > > > > perhaps we only need to wake up the selector if the bufferpool is
>> > > > > > >> > > > > full?
>> > > > > > >> > > > > 3. We talked about bounding the consumer's memory before. To fully
>> > > > > > >> > > > > support that, we will need to bound the memory used by different fetch
>> > > > > > >> > > > > responses in the consumer. Do you think the changes that you propose
>> > > > > > >> > > > > here can be leveraged to bound the memory in the consumer as well?
>> > > > > > >> > > > >
>> > > > > > >> > > > > Jun
>> > > > > > >> > > > >
>> > > > > > >> > > > >
>> > > > > > >> > > > > On Tue, Aug 30, 2016 at 10:41 AM, radai <
>> > > > > > >> radai.rosenblatt@gmail.com>
>> > > > > > >> > > > > wrote:
>> > > > > > >> > > > >
>> > > > > > >> > > > > > My apologies for the delay in response.
>> > > > > > >> > > > > >
>> > > > > > >> > > > > > I agree with the concerns about OOM reading from the actual sockets
>> > > > > > >> > > > > > and blocking the network threads - messing with the request queue
>> > > > > > >> > > > > > itself would not do.
>> > > > > > >> > > > > >
>> > > > > > >> > > > > > I propose instead a memory pool approach - the broker would have a
>> > > > > > >> > > > > > non-blocking memory pool. Upon reading the first 4 bytes out of a
>> > > > > > >> > > > > > socket, an attempt would be made to acquire enough memory, and if
>> > > > > > >> > > > > > that attempt fails the processing thread will move on to try and make
>> > > > > > >> > > > > > progress with other tasks.
>> > > > > > >> > > > > >
>> > > > > > >> > > > > > I think it's simpler than mute/unmute because using mute/unmute would
>> > > > > > >> > > > > > require differentiating between sockets muted due to a request in
>> > > > > > >> > > > > > progress (normal current operation) and sockets muted due to lack of
>> > > > > > >> > > > > > memory. Sockets of the 1st kind would be unmuted at the end of
>> > > > > > >> > > > > > request processing (as happens right now), but the 2nd kind would
>> > > > > > >> > > > > > require some sort of "unmute watchdog" which is (I claim) more
>> > > > > > >> > > > > > complicated than a memory pool. Also, a memory pool is a more generic
>> > > > > > >> > > > > > solution.
>> > > > > > >> > > > > >
>> > > > > > >> > > > > > I've updated the KIP page
>> > > > > > >> > > > > > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests)
>> > > > > > >> > > > > > to reflect the new proposed implementation, and I've also put up an
>> > > > > > >> > > > > > initial implementation proposal on GitHub -
>> > > > > > >> > > > > > https://github.com/radai-rosenblatt/kafka/commits/broker-memory-pool.
>> > > > > > >> > > > > > The proposed code is not complete and tested yet (so probably buggy)
>> > > > > > >> > > > > > but does include the main points of modification.
>> > > > > > >> > > > > >
>> > > > > > >> > > > > > The specific implementation of the pool on that branch also has a
>> > > > > > >> > > > > > built-in safety net where memory that is acquired but not released
>> > > > > > >> > > > > > (which is a bug) is discovered when the garbage collector frees it,
>> > > > > > >> > > > > > and the capacity is reclaimed.
>> > > > > > >> > > > > >
>> > > > > > >> > > > > > On Tue, Aug 9, 2016 at 8:14 AM, Jun Rao <
>> > > jun@confluent.io
>> > > > >
>> > > > > > >> wrote:
>> > > > > > >> > > > > >
>> > > > > > >> > > > > > > Radai,
>> > > > > > >> > > > > > >
>> > > > > > >> > > > > > > Yes, I got the benefit of bounding the request queue by bytes. My
>> > > > > > >> > > > > > > concern is the following, if we don't change the behavior of the
>> > > > > > >> > > > > > > processor blocking on queue full.
>> > > > > > >> > > > > > >
>> > > > > > >> > > > > > > If the broker truly doesn't have enough memory for buffering
>> > > > > > >> > > > > > > outstanding requests from all connections, we have to either hit OOM
>> > > > > > >> > > > > > > or block the processor. Both will be bad. I am not sure if one is
>> > > > > > >> > > > > > > clearly better than the other. In this case, the solution is
>> > > > > > >> > > > > > > probably to expand the cluster to reduce the per-broker request
>> > > > > > >> > > > > > > load.
>> > > > > > >> > > > > > >
>> > > > > > >> > > > > > > If the broker actually has enough memory, we want to be able to
>> > > > > > >> > > > > > > configure the request queue in such a way that it never blocks. You
>> > > > > > >> > > > > > > can tell people to just set the request queue to be unbounded, which
>> > > > > > >> > > > > > > may scare them. If we do want to put a bound, it seems it's easier
>> > > > > > >> > > > > > > to configure the queue size based on # requests. Basically, we can
>> > > > > > >> > > > > > > tell people to set the queue size based on the number of
>> > > > > > >> > > > > > > connections. If the queue is based on bytes, it's not clear how
>> > > > > > >> > > > > > > people should set it w/o causing the processor to block.
>> > > > > > >> > > > > > >
>> > > > > > >> > > > > > > Finally, Rajini has a good point. The ByteBuffer in the request
>> > > > > > >> > > > > > > object is allocated as soon as we see the first 4 bytes from the
>> > > > > > >> > > > > > > socket. So, I am not sure if just bounding the request queue itself
>> > > > > > >> > > > > > > is enough to bound the memory related to requests.
>> > > > > > >> > > > > > >
>> > > > > > >> > > > > > > Thanks,
>> > > > > > >> > > > > > >
>> > > > > > >> > > > > > > Jun
>> > > > > > >> > > > > > >
>> > > > > > >> > > > > > >
>> > > > > > >> > > > > > >
>> > > > > > >> > > > > > > On Mon, Aug 8, 2016 at 4:46 PM, radai <
>> > > > > > >> > radai.rosenblatt@gmail.com>
>> > > > > > >> > > > > > wrote:
>> > > > > > >> > > > > > >
>> > > > > > >> > > > > > > > I agree that filling up the request queue can cause clients to time
>> > > > > > >> > > > > > > > out (and presumably retry?). However, for the workloads where we
>> > > > > > >> > > > > > > > expect this configuration to be useful the alternative is currently
>> > > > > > >> > > > > > > > an OOM crash.
>> > > > > > >> > > > > > > > In my opinion an initial implementation of this feature could be
>> > > > > > >> > > > > > > > constrained to a simple drop-in replacement of ArrayBlockingQueue
>> > > > > > >> > > > > > > > (conditional, opt-in) and further study of behavior patterns under
>> > > > > > >> > > > > > > > load can drive future changes to the API later when those behaviors
>> > > > > > >> > > > > > > > are better understood (like back-pressure, nop filler responses to
>> > > > > > >> > > > > > > > avoid client timeouts or whatever).
>> > > > > > >> > > > > > > >
>> > > > > > >> > > > > > > > On Mon, Aug 8, 2016 at 2:23 PM, Mayuresh
>> Gharat <
>> > > > > > >> > > > > > > > gharatmayuresh15@gmail.com>
>> > > > > > >> > > > > > > > wrote:
>> > > > > > >> > > > > > > >
>> > > > > > >> > > > > > > > > Nice write-up, Radai.
>> > > > > > >> > > > > > > > > I think what Jun said is a valid concern.
>> > > > > > >> > > > > > > > > If I am not wrong, as per the proposal we are depending on the
>> > > > > > >> > > > > > > > > entire pipeline to flow smoothly from accepting requests to handling
>> > > > > > >> > > > > > > > > them, calling KafkaApis and handing back the responses.
>> > > > > > >> > > > > > > > >
>> > > > > > >> > > > > > > > > Thanks,
>> > > > > > >> > > > > > > > >
>> > > > > > >> > > > > > > > > Mayuresh
>> > > > > > >> > > > > > > > >
>> > > > > > >> > > > > > > > >
>> > > > > > >> > > > > > > > > On Mon, Aug 8, 2016 at 12:22 PM, Joel Koshy <
>> > > > > > >> > > jjkoshy.w@gmail.com
>> > > > > > >> > > > >
>> > > > > > >> > > > > > > wrote:
>> > > > > > >> > > > > > > > >
>> > > > > > >> > > > > > > > > > >
>> > > > > > >> > > > > > > > > > > Hi Becket,
>> > > > > > >> > > > > > > > > > >
>> > > > > > >> > > > > > > > > > > I don't think progress can be made in the processor's run loop if
>> > > > > > >> > > > > > > > > > > the queue fills up. i.e., I think Jun's point is that if the queue
>> > > > > > >> > > > > > > > > > > is full (either due to the proposed max.bytes or today due to
>> > > > > > >> > > > > > > > > > > max.requests hitting the limit) then processCompletedReceives will
>> > > > > > >> > > > > > > > > > > block and no further progress can be made.
>> > > > > > >> > > > > > > > > > >
>> > > > > > >> > > > > > > > > >
>> > > > > > >> > > > > > > > > > I'm sorry - this isn't right. There will be progress as long as the
>> > > > > > >> > > > > > > > > > API handlers are able to pick requests off the request queue and
>> > > > > > >> > > > > > > > > > add the responses to the response queues (which are effectively
>> > > > > > >> > > > > > > > > > unbounded). However, the point is valid that blocking in the
>> > > > > > >> > > > > > > > > > request channel's put has the effect of exacerbating the pressure
>> > > > > > >> > > > > > > > > > on the socket server.
>> > > > > > >> > > > > > > > > >
>> > > > > > >> > > > > > > > > >
>> > > > > > >> > > > > > > > > > >
>> > > > > > >> > > > > > > > > > >>
>> > > > > > >> > > > > > > > > > >> On Mon, Aug 8, 2016 at 10:04 AM, Jun
>> Rao <
>> > > > > > >> > > jun@confluent.io>
>> > > > > > >> > > > > > > wrote:
>> > > > > > >> > > > > > > > > > >>
>> > > > > > >> > > > > > > > > > >> > Radai,
>> > > > > > >> > > > > > > > > > >> >
>> > > > > > >> > > > > > > > > > >> > Thanks for the proposal. A couple of comments on this.
>> > > > > > >> > > > > > > > > > >> >
>> > > > > > >> > > > > > > > > > >> > 1. Since we store request objects in the request queue, how do we
>> > > > > > >> > > > > > > > > > >> > get an accurate size estimate for those requests?
>> > > > > > >> > > > > > > > > > >> >
>> > > > > > >> > > > > > > > > > >> > 2. Currently, it's bad if the processor blocks on adding a request
>> > > > > > >> > > > > > > > > > >> > to the request queue. Once blocked, the processor can't process the
>> > > > > > >> > > > > > > > > > >> > sending of responses of other socket keys either. This will cause
>> > > > > > >> > > > > > > > > > >> > all clients in this processor with an outstanding request to
>> > > > > > >> > > > > > > > > > >> > eventually time out. Typically, this will trigger client-side
>> > > > > > >> > > > > > > > > > >> > retries, which will add more load on the broker and cause
>> > > > > > >> > > > > > > > > > >> > potentially more congestion in the request queue. With
>> > > > > > >> > > > > > > > > > >> > queued.max.requests, to prevent blocking on the request queue, our
>> > > > > > >> > > > > > > > > > >> > recommendation is to configure queued.max.requests to be the same
>> > > > > > >> > > > > > > > > > >> > as the number of socket connections on the broker. Since the broker
>> > > > > > >> > > > > > > > > > >> > never processes more than 1 request per connection at a time, the
>> > > > > > >> > > > > > > > > > >> > request queue will never be blocked. With queued.max.bytes, it's
>> > > > > > >> > > > > > > > > > >> > going to be harder to configure the value properly to prevent
>> > > > > > >> > > > > > > > > > >> > blocking.
>> > > > > > >> > > > > > > > > > >> >
>> > > > > > >> > > > > > > > > > >> > So, while adding queued.max.bytes is potentially useful for memory
>> > > > > > >> > > > > > > > > > >> > management, we probably need to address the processor-blocking
>> > > > > > >> > > > > > > > > > >> > issue for it to be truly useful in practice. One possibility is to
>> > > > > > >> > > > > > > > > > >> > apply back-pressure to the client when the request queue is
>> > > > > > >> > > > > > > > > > >> > blocked. For example, if the processor notices that the request
>> > > > > > >> > > > > > > > > > >> > queue is full, it can turn off the read interest bit for all socket
>> > > > > > >> > > > > > > > > > >> > keys. This will allow the processor to continue handling responses.
>> > > > > > >> > > > > > > > > > >> > When the request queue has space again, it can indicate the new
>> > > > > > >> > > > > > > > > > >> > state to the processor and wake up the selector. Not sure how this
>> > > > > > >> > > > > > > > > > >> > will work with multiple processors, though, since the request queue
>> > > > > > >> > > > > > > > > > >> > is shared across all processors.
>> > > > > > >> > > > > > > > > > >> >
>> > > > > > >> > > > > > > > > > >> > Thanks,
>> > > > > > >> > > > > > > > > > >> >
>> > > > > > >> > > > > > > > > > >> > Jun
>> > > > > > >> > > > > > > > > > >> >
>> > > > > > >> > > > > > > > > > >> >
>> > > > > > >> > > > > > > > > > >> >
>> > > > > > >> > > > > > > > > > >> > On Thu, Aug 4, 2016 at 11:28 AM,
>> radai <
>> > > > > > >> > > > > > > > radai.rosenblatt@gmail.com>
>> > > > > > >> > > > > > > > > > >> wrote:
>> > > > > > >> > > > > > > > > > >> >
>> > > > > > >> > > > > > > > > > >> > > Hello,
>> > > > > > >> > > > > > > > > > >> > >
>> > > > > > >> > > > > > > > > > >> > > I'd like to initiate a discussion about
>> > > > > > >> > > > > > > > > > >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+Sizing+Incoming+Request+Queue+in+Bytes
>> > > > > > >> > > > > > > > > > >> > >
>> > > > > > >> > > > > > > > > > >> > > The goal of the KIP is to allow configuring a bound on the capacity
>> > > > > > >> > > > > > > > > > >> > > (as in bytes of memory used) of the incoming request queue, in
>> > > > > > >> > > > > > > > > > >> > > addition to the current bound on the number of messages.
>> > > > > > >> > > > > > > > > > >> > >
>> > > > > > >> > > > > > > > > > >> > > This comes after several incidents at LinkedIn where a sudden
>> > > > > > >> > > > > > > > > > >> > > "spike" of large message batches caused an out-of-memory exception.
>> > > > > > >> > > > > > > > > > >> > >
>> > > > > > >> > > > > > > > > > >> > > Thank you,
>> > > > > > >> > > > > > > > > > >> > >
>> > > > > > >> > > > > > > > > > >> > >    Radai
>> > > > > > >> > > > > > > > > > >> > >
>> > > > > > >> > > > > > > > > > >> >
>> > > > > > >> > > > > > > > > > >>
>> > > > > > >> > > > > > > > > > >
>> > > > > > >> > > > > > > > > > >
>> > > > > > >> > > > > > > > > >
>> > > > > > >> > > > > > > > >
>> > > > > > >> > > > > > > > >
>> > > > > > >> > > > > > > > >
>> > > > > > >> > > > > > > > > --
>> > > > > > >> > > > > > > > > -Regards,
>> > > > > > >> > > > > > > > > Mayuresh R. Gharat
>> > > > > > >> > > > > > > > > (862) 250-7125
>> > > > > > >> > > > > > > > >
>> > > > > > >> > > > > > > >
>> > > > > > >> > > > > > >
>> > > > > > >> > > > > >
>> > > > > > >> > > > >
>> > > > > > >> > > >
>> > > > > > >> > >
>> > > > > > >> >
>> > > > > > >>
>> > > > > > >>
>> > > > > > >>
>> > > > > > >> --
>> > > > > > >> Regards,
>> > > > > > >>
>> > > > > > >> Rajini
>> > > > > > >>
>> > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>