You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Mahendra Kariya <ma...@go-jek.com> on 2017/03/13 00:37:22 UTC

Capacity planning for Kafka Streams

Hey All,

Are there some guidelines / documentation around capacity planning for
Kafka streams?

We have a Streams application which consumes messages from a topic with 400
partitions. At peak time, there are around 20K messages coming into that
topic per second. The Streams app consumes these messages to do some
aggregations.

Currently we have 3 VMs, each of them running 10 threads. The lag keeps
fluctuating between a few tens to a few lakhs. We have also noticed that
lag on the partitions being consumed on one particular machine is way
higher than the other two machines.

Has anybody faced similar issues? How did you guys resolve it?

Re: Capacity planning for Kafka Streams

Posted by Mahendra Kariya <ma...@go-jek.com>.
Hi Damian,

The rest of the logs were INFO messages about offset being committed.

Anyways, the problem is resolved for now, after we increased the
max.poll.interval.ms.

For anyone else who is facing similar problem, please refer this thread.
https://groups.google.com/forum/#!topic/confluent-platform/wgCSuwIJo5g



On Wed, Mar 22, 2017 at 7:11 PM, Damian Guy <da...@gmail.com> wrote:

> Hi Mahendra,
>
> Are you able to share the complete logs? It is pretty hard to tell what is
> happening just from a few snippets of information.
>
> Thanks,
> Damian
>
> On Wed, 22 Mar 2017 at 12:16 Mahendra Kariya <ma...@go-jek.com>
> wrote:
>
> > To test Kafka streams on 0.10.2.0, we setup a new Kafka cluster with the
> > latest version and used mirror maker to replicate the data from the
> > 0.10.0.0 Kafka cluster. We pointed our streaming app to the newly created
> > Kafka cluster.
> >
> > We have 5 nodes, each running the streaming app with 10 threads. In less
> > than 10 minutes, the process on all the 5 nodes died with different
> > exceptions. Below are the different stack traces we got.
> >
> > Any help would be really appreciated.
> >
> > *Stacktrace # 1 (got on 3 of 5 nodes):*
> >
> > 18:58:00.349 [StreamThread-2] INFO o.a.k.s.p.i.StreamThread -
> stream-thread
> > [StreamThread-2] Stream thread shutdown complete
> > 18:58:00.349 [StreamThread-2] WARN o.a.k.s.p.i.StreamThread - Unexpected
> > state transition from RUNNING to NOT_RUNNING
> > Exception in thread "StreamThread-2"
> > org.apache.kafka.streams.errors.StreamsException: Exception caught in
> > process. taskId=1_396, processor=KSTREAM-SOURCE-0000000004,
> > topic=topicname, partition=396, offset=66839
> >         at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamTask.process(StreamTask.java:216)
> >         at
> >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:641)
> >         at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:368)
> > Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException:
> > store %s has closed
> >         at
> >
> > org.apache.kafka.streams.state.internals.RocksDBStore$
> RocksDbIterator.hasNext(RocksDBStore.java:398)
> >         at
> >
> > org.apache.kafka.streams.state.internals.RocksDBStore$
> RocksDBRangeIterator.hasNext(RocksDBStore.java:457)
> >         at
> >
> > org.apache.kafka.streams.state.internals.WindowStoreKeySchema$1.
> hasNext(WindowStoreKeySchema.java:30)
> >         at
> >
> > org.apache.kafka.streams.state.internals.SegmentIterator.hasNext(
> SegmentIterator.java:69)
> >         at
> >
> > org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore$
> MeteredSegmentedBytesStoreIterator.hasNext(MeteredSegmentedBytesStore.
> java:131)
> >         at
> >
> > org.apache.kafka.streams.state.internals.RocksDBWindowStore$
> TheWindowStoreIterator.hasNext(RocksDBWindowStore.java:131)
> >         at
> >
> > org.apache.kafka.streams.state.internals.AbstractMergedSortedCacheStore
> Iterator.hasNext(AbstractMergedSortedCacheStoreIterator.java:74)
> >         at
> >
> > org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$
> KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:97)
> >         at
> >
> > org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(
> ProcessorNode.java:48)
> >         at
> >
> > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> measureLatencyNs(StreamsMetricsImpl.java:188)
> >         at
> >
> > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> ProcessorNode.java:134)
> >         at
> >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> >         at
> >
> > org.apache.kafka.streams.processor.internals.
> SourceNode.process(SourceNode.java:70)
> >         at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamTask.process(StreamTask.java:197)
> >         ... 2 more
> >
> >
> > *Stacktrace # 2 (got on 1 node):*
> >
> > 18:57:44.692 [StreamThread-2] INFO o.a.k.s.p.i.StreamThread -
> stream-thread
> > [StreamThread-2] Stream thread shutdown complete
> > 18:57:44.692 [StreamThread-2] WARN o.a.k.s.p.i.StreamThread - Unexpected
> > state transition from ASSIGNING_PARTITIONS to NOT_RUNNING
> > Exception in thread "StreamThread-2"
> > org.apache.kafka.streams.errors.StreamsException: stream-thread
> > [StreamThread-2] Failed to rebalance
> >         at
> >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:612)
> >         at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:368)
> > Caused by: java.lang.IllegalArgumentException: A metric named
> 'MetricName
> > [name=1_234-sometopic-hitRatio-avg, group=stream-record-cache-metrics,
> > description=The current count of 1_234-sometopic hitRatio operation.,
> > tags={record-cache-id=1_234-sometopic}]' already exists, can't register
> > another one.
> >         at
> > org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:433)
> >         at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:249)
> >         at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:234)
> >         at
> >
> > org.apache.kafka.streams.state.internals.NamedCache$
> NamedCacheMetrics.<init>(NamedCache.java:388)
> >         at
> >
> > org.apache.kafka.streams.state.internals.NamedCache.<
> init>(NamedCache.java:62)
> >         at
> >
> > org.apache.kafka.streams.state.internals.ThreadCache.
> getOrCreateCache(ThreadCache.java:226)
> >         at
> >
> > org.apache.kafka.streams.state.internals.ThreadCache.
> addDirtyEntryFlushListener(ThreadCache.java:87)
> >         at
> >
> > org.apache.kafka.streams.state.internals.CachingWindowStore.
> initInternal(CachingWindowStore.java:74)
> >         at
> >
> > org.apache.kafka.streams.state.internals.CachingWindowStore.init(
> CachingWindowStore.java:62)
> >         at
> >
> > org.apache.kafka.streams.processor.internals.AbstractTask.
> initializeStateStores(AbstractTask.java:86)
> >         at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamTask.<init>(StreamTask.java:141)
> >         at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.createStreamTask(StreamThread.java:834)
> >         at
> >
> > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.
> createTask(StreamThread.java:1207)
> >         at
> >
> > org.apache.kafka.streams.processor.internals.StreamThread$
> AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> >         at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.addStreamTasks(StreamThread.java:937)
> >         at
> >
> > org.apache.kafka.streams.processor.internals.StreamThread.access$500(
> StreamThread.java:69)
> >         at
> >
> > org.apache.kafka.streams.processor.internals.StreamThread$1.
> onPartitionsAssigned(StreamThread.java:236)
> >         at
> >
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> onJoinComplete(ConsumerCoordinator.java:255)
> >         at
> >
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> joinGroupIfNeeded(AbstractCoordinator.java:339)
> >         at
> >
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> ensureActiveGroup(AbstractCoordinator.java:303)
> >         at
> >
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
> ConsumerCoordinator.java:286)
> >         at
> >
> > org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:1030)
> >         at
> >
> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:995)
> >         at
> >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:582)
> >         ... 1 more
> >
> > *Stacktrace # 3 (got on 1 node):*
> >
> > 19:07:34.827 [StreamThread-1] WARN o.a.k.s.p.i.StreamThread - Could not
> > create task 0_192. Will retry.
> > org.apache.kafka.streams.errors.LockException: task [0_192] Failed to
> lock
> > the state directory: /tmp/kafka-streams/streams_test_2/0_192
> >         at
> >
> > org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.<init>(ProcessorStateManager.java:102)
> > ~[app-name-1.2.1.jar:na]
> >         at
> >
> > org.apache.kafka.streams.processor.internals.AbstractTask.<init>(
> AbstractTask.java:73)
> > ~[app-name-1.2.1.jar:na]
> >         at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamTask.<init>(StreamTask.java:108)
> > ~[app-name-1.2.1.jar:na]
> >         at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.createStreamTask(StreamThread.java:834)
> > [app-name-1.2.1.jar:na]
> >         at
> >
> > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.
> createTask(StreamThread.java:1207)
> > ~[app-name-1.2.1.jar:na]
> >         at
> >
> > org.apache.kafka.streams.processor.internals.StreamThread$
> AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> > ~[app-name-1.2.1.jar:na]
> >         at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.addStreamTasks(StreamThread.java:937)
> > [app-name-1.2.1.jar:na]
> >         at
> >
> > org.apache.kafka.streams.processor.internals.StreamThread.access$500(
> StreamThread.java:69)
> > [app-name-1.2.1.jar:na]
> >         at
> >
> > org.apache.kafka.streams.processor.internals.StreamThread$1.
> onPartitionsAssigned(StreamThread.java:236)
> > [app-name-1.2.1.jar:na]
> >         at
> >
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> onJoinComplete(ConsumerCoordinator.java:255)
> > [app-name-1.2.1.jar:na]
> >         at
> >
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> joinGroupIfNeeded(AbstractCoordinator.java:339)
> > [app-name-1.2.1.jar:na]
> >         at
> >
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> ensureActiveGroup(AbstractCoordinator.java:303)
> > [app-name-1.2.1.jar:na]
> >         at
> >
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
> ConsumerCoordinator.java:286)
> > [app-name-1.2.1.jar:na]
> >         at
> >
> > org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:1030)
> > [app-name-1.2.1.jar:na]
> >         at
> >
> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:995)
> > [app-name-1.2.1.jar:na]
> >         at
> >
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:582)
> > [app-name-1.2.1.jar:na]
> >         at
> >
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:368)
> > [app-name-1.2.1.jar:na]
> >
> >
> >
> > On Sat, Mar 18, 2017 at 5:58 AM, Mahendra Kariya <
> > mahendra.kariya@go-jek.com
> > > wrote:
> >
> > > Thanks for the heads up Guozhang!
> > >
> > > The problem is our brokers are on 0.10.0.x. So we will have to upgrade
> > > them.
> > >
> > > On Sat, Mar 18, 2017 at 12:30 AM, Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > >
> > >> Hi Mahendra,
> > >>
> > >> Just a kind reminder that upgrading Streams to 0.10.2 does not
> > necessarily
> > >> require you to upgrade brokers to 0.10.2 as well. Since we have added
> a
> > >> new
> > >> feature since 0.10.2 to allow newer versioned clients (producer,
> > consumer,
> > >> streams) to talk to older versioned brokers, and for Streams
> > specifically
> > >> it only requires brokers to be no older than 0.10.1.
> > >>
> > >>
> > >> Guozhang
> > >>
> > >>
> > >> On Mon, Mar 13, 2017 at 5:12 AM, Mahendra Kariya <
> > >> mahendra.kariya@go-jek.com
> > >> > wrote:
> > >>
> > >> > We are planning to migrate to the newer version of Kafka. But
> that's a
> > >> few
> > >> > weeks away.
> > >> >
> > >> > We will try setting the socket config and see how it turns out.
> > >> >
> > >> > Thanks a lot for your response!
> > >> >
> > >> >
> > >> >
> > >> > On Mon, Mar 13, 2017 at 3:21 PM, Eno Thereska <
> eno.thereska@gmail.com
> > >
> > >> > wrote:
> > >> >
> > >> > > Thanks,
> > >> > >
> > >> > > A couple of things:
> > >> > > - I’d recommend moving to 0.10.2 (latest release) if you can since
> > >> > several
> > >> > > improvements were made in the last two releases that make
> > rebalancing
> > >> and
> > >> > > performance better.
> > >> > >
> > >> > > - When running on environments with large latency on AWS at least
> > >> > (haven’t
> > >> > > tried Google cloud), one parameter we have found useful to
> increase
> > >> > > performance is the receive and send socket size for the consumer
> and
> > >> > > producer in streams. We’d recommend setting them to 1MB like this
> > >> (where
> > >> > > “props” is your own properties object when you start streams):
> > >> > >
> > >> > > // the socket buffer needs to be large, especially when running in
> > AWS
> > >> > with
> > >> > > // high latency. if running locally the default is fine.
> > >> > > props.put(ProducerConfig.SEND_BUFFER_CONFIG, 1024 * 1024);
> > >> > > props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
> > >> > >
> > >> > > Make sure the OS allows the larger socket size too.
> > >> > >
> > >> > > Thanks
> > >> > > Eno
> > >> > >
> > >> > > > On Mar 13, 2017, at 9:21 AM, Mahendra Kariya <
> > >> > mahendra.kariya@go-jek.com>
> > >> > > wrote:
> > >> > > >
> > >> > > > Hi Eno,
> > >> > > >
> > >> > > > Please find my answers inline.
> > >> > > >
> > >> > > >
> > >> > > > We are in the process of documenting capacity planning for
> > streams,
> > >> > stay
> > >> > > tuned.
> > >> > > >
> > >> > > > This would be great! Looking forward to it.
> > >> > > >
> > >> > > > Could you send some more info on your problem? What Kafka
> version
> > >> are
> > >> > > you using?
> > >> > > >
> > >> > > > We are using Kafka 0.10.0.0.
> > >> > > >
> > >> > > > Are the VMs on the same or different hosts?
> > >> > > >
> > >> > > > The VMs are on Google Cloud. Two of them are in asia-east1-a and
> > >> one is
> > >> > > in asia-east1-c. All three are n1-standard-4 Ubuntu instances.
> > >> > > >
> > >> > > > Also what exactly do you mean by “the lag keeps fluctuating”,
> what
> > >> > > metric are you looking at?
> > >> > > >
> > >> > > > We are looking at Kafka Manager for the time being. By
> > fluctuating,
> > >> I
> > >> > > mean the lag is few thousands at one time, we refresh it the next
> > >> second,
> > >> > > it is in few lakhs, and again refresh it and it is few thousands.
> I
> > >> > > understand this may not be very accurate. We will soon have more
> > >> accurate
> > >> > > data once we start pushing the consumer lag metric to Datadog.
> > >> > > >
> > >> > > > But on a separate note, the difference between lags on different
> > >> > > partitions is way too high. I have attached a tab separated file
> > >> herewith
> > >> > > which shows the consumer lag (from Kafka Manager) for the first
> the
> > 50
> > >> > > partitions. As is clear, the lag on partition 2 is 530 while the
> lag
> > >> on
> > >> > > partition 18 is 23K. Note that the same VM is pulling data from
> both
> > >> the
> > >> > > partitions.
> > >> > > >
> > >> > > >
> > >> > > >
> > >> > > >
> > >> > > > <KafkaLags.tsv>
> > >> > >
> > >> > >
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> > >
> >
>

Re: Capacity planning for Kafka Streams

Posted by Damian Guy <da...@gmail.com>.
Hi Mahendra,

Are you able to share the complete logs? It is pretty hard to tell what is
happening just from a few snippets of information.

Thanks,
Damian

On Wed, 22 Mar 2017 at 12:16 Mahendra Kariya <ma...@go-jek.com>
wrote:

> To test Kafka streams on 0.10.2.0, we setup a new Kafka cluster with the
> latest version and used mirror maker to replicate the data from the
> 0.10.0.0 Kafka cluster. We pointed our streaming app to the newly created
> Kafka cluster.
>
> We have 5 nodes, each running the streaming app with 10 threads. In less
> than 10 minutes, the process on all the 5 nodes died with different
> exceptions. Below are the different stack traces we got.
>
> Any help would be really appreciated.
>
> *Stacktrace # 1 (got on 3 of 5 nodes):*
>
> 18:58:00.349 [StreamThread-2] INFO o.a.k.s.p.i.StreamThread - stream-thread
> [StreamThread-2] Stream thread shutdown complete
> 18:58:00.349 [StreamThread-2] WARN o.a.k.s.p.i.StreamThread - Unexpected
> state transition from RUNNING to NOT_RUNNING
> Exception in thread "StreamThread-2"
> org.apache.kafka.streams.errors.StreamsException: Exception caught in
> process. taskId=1_396, processor=KSTREAM-SOURCE-0000000004,
> topic=topicname, partition=396, offset=66839
>         at
>
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>         at
>
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
>         at
>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException:
> store %s has closed
>         at
>
> org.apache.kafka.streams.state.internals.RocksDBStore$RocksDbIterator.hasNext(RocksDBStore.java:398)
>         at
>
> org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBRangeIterator.hasNext(RocksDBStore.java:457)
>         at
>
> org.apache.kafka.streams.state.internals.WindowStoreKeySchema$1.hasNext(WindowStoreKeySchema.java:30)
>         at
>
> org.apache.kafka.streams.state.internals.SegmentIterator.hasNext(SegmentIterator.java:69)
>         at
>
> org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore$MeteredSegmentedBytesStoreIterator.hasNext(MeteredSegmentedBytesStore.java:131)
>         at
>
> org.apache.kafka.streams.state.internals.RocksDBWindowStore$TheWindowStoreIterator.hasNext(RocksDBWindowStore.java:131)
>         at
>
> org.apache.kafka.streams.state.internals.AbstractMergedSortedCacheStoreIterator.hasNext(AbstractMergedSortedCacheStoreIterator.java:74)
>         at
>
> org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:97)
>         at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
>         at
>
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>         at
>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
>         at
>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>         at
>
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
>         at
>
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
>         ... 2 more
>
>
> *Stacktrace # 2 (got on 1 node):*
>
> 18:57:44.692 [StreamThread-2] INFO o.a.k.s.p.i.StreamThread - stream-thread
> [StreamThread-2] Stream thread shutdown complete
> 18:57:44.692 [StreamThread-2] WARN o.a.k.s.p.i.StreamThread - Unexpected
> state transition from ASSIGNING_PARTITIONS to NOT_RUNNING
> Exception in thread "StreamThread-2"
> org.apache.kafka.streams.errors.StreamsException: stream-thread
> [StreamThread-2] Failed to rebalance
>         at
>
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:612)
>         at
>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: java.lang.IllegalArgumentException: A metric named 'MetricName
> [name=1_234-sometopic-hitRatio-avg, group=stream-record-cache-metrics,
> description=The current count of 1_234-sometopic hitRatio operation.,
> tags={record-cache-id=1_234-sometopic}]' already exists, can't register
> another one.
>         at
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:433)
>         at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:249)
>         at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:234)
>         at
>
> org.apache.kafka.streams.state.internals.NamedCache$NamedCacheMetrics.<init>(NamedCache.java:388)
>         at
>
> org.apache.kafka.streams.state.internals.NamedCache.<init>(NamedCache.java:62)
>         at
>
> org.apache.kafka.streams.state.internals.ThreadCache.getOrCreateCache(ThreadCache.java:226)
>         at
>
> org.apache.kafka.streams.state.internals.ThreadCache.addDirtyEntryFlushListener(ThreadCache.java:87)
>         at
>
> org.apache.kafka.streams.state.internals.CachingWindowStore.initInternal(CachingWindowStore.java:74)
>         at
>
> org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:62)
>         at
>
> org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
>         at
>
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
>         at
>
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>         at
>
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>         at
>
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>         at
>
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>         at
>
> org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>         at
>
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>         at
>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>         at
>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>         at
>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>         at
>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>         at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>         at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>         at
>
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>         ... 1 more
>
> *Stacktrace # 3 (got on 1 node):*
>
> 19:07:34.827 [StreamThread-1] WARN o.a.k.s.p.i.StreamThread - Could not
> create task 0_192. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_192] Failed to lock
> the state directory: /tmp/kafka-streams/streams_test_2/0_192
>         at
>
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
> ~[app-name-1.2.1.jar:na]
>         at
>
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
> ~[app-name-1.2.1.jar:na]
>         at
>
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
> ~[app-name-1.2.1.jar:na]
>         at
>
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
> [app-name-1.2.1.jar:na]
>         at
>
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
> ~[app-name-1.2.1.jar:na]
>         at
>
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> ~[app-name-1.2.1.jar:na]
>         at
>
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
> [app-name-1.2.1.jar:na]
>         at
>
> org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
> [app-name-1.2.1.jar:na]
>         at
>
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
> [app-name-1.2.1.jar:na]
>         at
>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
> [app-name-1.2.1.jar:na]
>         at
>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
> [app-name-1.2.1.jar:na]
>         at
>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> [app-name-1.2.1.jar:na]
>         at
>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
> [app-name-1.2.1.jar:na]
>         at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
> [app-name-1.2.1.jar:na]
>         at
>
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> [app-name-1.2.1.jar:na]
>         at
>
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
> [app-name-1.2.1.jar:na]
>         at
>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> [app-name-1.2.1.jar:na]
>
>
>
> On Sat, Mar 18, 2017 at 5:58 AM, Mahendra Kariya <
> mahendra.kariya@go-jek.com
> > wrote:
>
> > Thanks for the heads up Guozhang!
> >
> > The problem is our brokers are on 0.10.0.x. So we will have to upgrade
> > them.
> >
> > On Sat, Mar 18, 2017 at 12:30 AM, Guozhang Wang <wa...@gmail.com>
> > wrote:
> >
> >> Hi Mahendra,
> >>
> >> Just a kind reminder that upgrading Streams to 0.10.2 does not
> necessarily
> >> require you to upgrade brokers to 0.10.2 as well. Since we have added a
> >> new
> >> feature since 0.10.2 to allow newer versioned clients (producer,
> consumer,
> >> streams) to talk to older versioned brokers, and for Streams
> specifically
> >> it only requires brokers to be no older than 0.10.1.
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Mon, Mar 13, 2017 at 5:12 AM, Mahendra Kariya <
> >> mahendra.kariya@go-jek.com
> >> > wrote:
> >>
> >> > We are planning to migrate to the newer version of Kafka. But that's a
> >> few
> >> > weeks away.
> >> >
> >> > We will try setting the socket config and see how it turns out.
> >> >
> >> > Thanks a lot for your response!
> >> >
> >> >
> >> >
> >> > On Mon, Mar 13, 2017 at 3:21 PM, Eno Thereska <eno.thereska@gmail.com
> >
> >> > wrote:
> >> >
> >> > > Thanks,
> >> > >
> >> > > A couple of things:
> >> > > - I’d recommend moving to 0.10.2 (latest release) if you can since
> >> > several
> >> > > improvements were made in the last two releases that make
> rebalancing
> >> and
> >> > > performance better.
> >> > >
> >> > > - When running on environments with large latency on AWS at least
> >> > (haven’t
> >> > > tried Google cloud), one parameter we have found useful to increase
> >> > > performance is the receive and send socket size for the consumer and
> >> > > producer in streams. We’d recommend setting them to 1MB like this
> >> (where
> >> > > “props” is your own properties object when you start streams):
> >> > >
> >> > > // the socket buffer needs to be large, especially when running in
> AWS
> >> > with
> >> > > // high latency. if running locally the default is fine.
> >> > > props.put(ProducerConfig.SEND_BUFFER_CONFIG, 1024 * 1024);
> >> > > props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
> >> > >
> >> > > Make sure the OS allows the larger socket size too.
> >> > >
> >> > > Thanks
> >> > > Eno
> >> > >
> >> > > > On Mar 13, 2017, at 9:21 AM, Mahendra Kariya <
> >> > mahendra.kariya@go-jek.com>
> >> > > wrote:
> >> > > >
> >> > > > Hi Eno,
> >> > > >
> >> > > > Please find my answers inline.
> >> > > >
> >> > > >
> >> > > > We are in the process of documenting capacity planning for
> streams,
> >> > stay
> >> > > tuned.
> >> > > >
> >> > > > This would be great! Looking forward to it.
> >> > > >
> >> > > > Could you send some more info on your problem? What Kafka version
> >> are
> >> > > you using?
> >> > > >
> >> > > > We are using Kafka 0.10.0.0.
> >> > > >
> >> > > > Are the VMs on the same or different hosts?
> >> > > >
> >> > > > The VMs are on Google Cloud. Two of them are in asia-east1-a and
> >> one is
> >> > > in asia-east1-c. All three are n1-standard-4 Ubuntu instances.
> >> > > >
> >> > > > Also what exactly do you mean by “the lag keeps fluctuating”, what
> >> > > metric are you looking at?
> >> > > >
> >> > > > We are looking at Kafka Manager for the time being. By
> fluctuating,
> >> I
> >> > > mean the lag is few thousands at one time, we refresh it the next
> >> second,
> >> > > it is in few lakhs, and again refresh it and it is few thousands. I
> >> > > understand this may not be very accurate. We will soon have more
> >> accurate
> >> > > data once we start pushing the consumer lag metric to Datadog.
> >> > > >
> >> > > > But on a separate note, the difference between lags on different
> >> > > partitions is way too high. I have attached a tab separated file
> >> herewith
> >> > > which shows the consumer lag (from Kafka Manager) for the first the
> 50
> >> > > partitions. As is clear, the lag on partition 2 is 530 while the lag
> >> on
> >> > > partition 18 is 23K. Note that the same VM is pulling data from both
> >> the
> >> > > partitions.
> >> > > >
> >> > > >
> >> > > >
> >> > > >
> >> > > > <KafkaLags.tsv>
> >> > >
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>

Re: Capacity planning for Kafka Streams

Posted by Mahendra Kariya <ma...@go-jek.com>.
To test Kafka streams on 0.10.2.0, we setup a new Kafka cluster with the
latest version and used mirror maker to replicate the data from the
0.10.0.0 Kafka cluster. We pointed our streaming app to the newly created
Kafka cluster.

We have 5 nodes, each running the streaming app with 10 threads. In less
than 10 minutes, the process on all the 5 nodes died with different
exceptions. Below are the different stack traces we got.

Any help would be really appreciated.

*Stacktrace # 1 (got on 3 of 5 nodes):*

18:58:00.349 [StreamThread-2] INFO o.a.k.s.p.i.StreamThread - stream-thread
[StreamThread-2] Stream thread shutdown complete
18:58:00.349 [StreamThread-2] WARN o.a.k.s.p.i.StreamThread - Unexpected
state transition from RUNNING to NOT_RUNNING
Exception in thread "StreamThread-2"
org.apache.kafka.streams.errors.StreamsException: Exception caught in
process. taskId=1_396, processor=KSTREAM-SOURCE-0000000004,
topic=topicname, partition=396, offset=66839
        at
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
        at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
        at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException:
store %s has closed
        at
org.apache.kafka.streams.state.internals.RocksDBStore$RocksDbIterator.hasNext(RocksDBStore.java:398)
        at
org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBRangeIterator.hasNext(RocksDBStore.java:457)
        at
org.apache.kafka.streams.state.internals.WindowStoreKeySchema$1.hasNext(WindowStoreKeySchema.java:30)
        at
org.apache.kafka.streams.state.internals.SegmentIterator.hasNext(SegmentIterator.java:69)
        at
org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore$MeteredSegmentedBytesStoreIterator.hasNext(MeteredSegmentedBytesStore.java:131)
        at
org.apache.kafka.streams.state.internals.RocksDBWindowStore$TheWindowStoreIterator.hasNext(RocksDBWindowStore.java:131)
        at
org.apache.kafka.streams.state.internals.AbstractMergedSortedCacheStoreIterator.hasNext(AbstractMergedSortedCacheStoreIterator.java:74)
        at
org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:97)
        at
org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
        at
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
        at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
        at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
        at
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
        at
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
        ... 2 more


*Stacktrace # 2 (got on 1 node):*

18:57:44.692 [StreamThread-2] INFO o.a.k.s.p.i.StreamThread - stream-thread
[StreamThread-2] Stream thread shutdown complete
18:57:44.692 [StreamThread-2] WARN o.a.k.s.p.i.StreamThread - Unexpected
state transition from ASSIGNING_PARTITIONS to NOT_RUNNING
Exception in thread "StreamThread-2"
org.apache.kafka.streams.errors.StreamsException: stream-thread
[StreamThread-2] Failed to rebalance
        at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:612)
        at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
Caused by: java.lang.IllegalArgumentException: A metric named 'MetricName
[name=1_234-sometopic-hitRatio-avg, group=stream-record-cache-metrics,
description=The current count of 1_234-sometopic hitRatio operation.,
tags={record-cache-id=1_234-sometopic}]' already exists, can't register
another one.
        at
org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:433)
        at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:249)
        at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:234)
        at
org.apache.kafka.streams.state.internals.NamedCache$NamedCacheMetrics.<init>(NamedCache.java:388)
        at
org.apache.kafka.streams.state.internals.NamedCache.<init>(NamedCache.java:62)
        at
org.apache.kafka.streams.state.internals.ThreadCache.getOrCreateCache(ThreadCache.java:226)
        at
org.apache.kafka.streams.state.internals.ThreadCache.addDirtyEntryFlushListener(ThreadCache.java:87)
        at
org.apache.kafka.streams.state.internals.CachingWindowStore.initInternal(CachingWindowStore.java:74)
        at
org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:62)
        at
org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
        at
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
        at
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
        at
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
        at
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
        at
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
        at
org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
        at
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
        at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
        at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
        at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
        at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
        at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
        at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
        at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
        ... 1 more

*Stacktrace # 3 (got on 1 node):*

19:07:34.827 [StreamThread-1] WARN o.a.k.s.p.i.StreamThread - Could not
create task 0_192. Will retry.
org.apache.kafka.streams.errors.LockException: task [0_192] Failed to lock
the state directory: /tmp/kafka-streams/streams_test_2/0_192
        at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
~[app-name-1.2.1.jar:na]
        at
org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
~[app-name-1.2.1.jar:na]
        at
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
~[app-name-1.2.1.jar:na]
        at
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
[app-name-1.2.1.jar:na]
        at
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
~[app-name-1.2.1.jar:na]
        at
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
~[app-name-1.2.1.jar:na]
        at
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
[app-name-1.2.1.jar:na]
        at
org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
[app-name-1.2.1.jar:na]
        at
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
[app-name-1.2.1.jar:na]
        at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
[app-name-1.2.1.jar:na]
        at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
[app-name-1.2.1.jar:na]
        at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
[app-name-1.2.1.jar:na]
        at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
[app-name-1.2.1.jar:na]
        at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
[app-name-1.2.1.jar:na]
        at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
[app-name-1.2.1.jar:na]
        at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
[app-name-1.2.1.jar:na]
        at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
[app-name-1.2.1.jar:na]



On Sat, Mar 18, 2017 at 5:58 AM, Mahendra Kariya <mahendra.kariya@go-jek.com
> wrote:

> Thanks for the heads up Guozhang!
>
> The problem is our brokers are on 0.10.0.x. So we will have to upgrade
> them.
>
> On Sat, Mar 18, 2017 at 12:30 AM, Guozhang Wang <wa...@gmail.com>
> wrote:
>
>> Hi Mahendra,
>>
>> Just a kind reminder that upgrading Streams to 0.10.2 does not necessarily
>> require you to upgrade brokers to 0.10.2 as well. Since we have added a
>> new
>> feature since 0.10.2 to allow newer versioned clients (producer, consumer,
>> streams) to talk to older versioned brokers, and for Streams specifically
>> it only requires brokers to be no older than 0.10.1.
>>
>>
>> Guozhang
>>
>>
>> On Mon, Mar 13, 2017 at 5:12 AM, Mahendra Kariya <
>> mahendra.kariya@go-jek.com
>> > wrote:
>>
>> > We are planning to migrate to the newer version of Kafka. But that's a
>> few
>> > weeks away.
>> >
>> > We will try setting the socket config and see how it turns out.
>> >
>> > Thanks a lot for your response!
>> >
>> >
>> >
>> > On Mon, Mar 13, 2017 at 3:21 PM, Eno Thereska <en...@gmail.com>
>> > wrote:
>> >
>> > > Thanks,
>> > >
>> > > A couple of things:
>> > > - I’d recommend moving to 0.10.2 (latest release) if you can since
>> > several
>> > > improvements were made in the last two releases that make rebalancing
>> and
>> > > performance better.
>> > >
>> > > - When running on environments with large latency on AWS at least
>> > (haven’t
>> > > tried Google cloud), one parameter we have found useful to increase
>> > > performance is the receive and send socket size for the consumer and
>> > > producer in streams. We’d recommend setting them to 1MB like this
>> (where
>> > > “props” is your own properties object when you start streams):
>> > >
>> > > // the socket buffer needs to be large, especially when running in AWS
>> > with
>> > > // high latency. if running locally the default is fine.
>> > > props.put(ProducerConfig.SEND_BUFFER_CONFIG, 1024 * 1024);
>> > > props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
>> > >
>> > > Make sure the OS allows the larger socket size too.
>> > >
>> > > Thanks
>> > > Eno
>> > >
>> > > > On Mar 13, 2017, at 9:21 AM, Mahendra Kariya <
>> > mahendra.kariya@go-jek.com>
>> > > wrote:
>> > > >
>> > > > Hi Eno,
>> > > >
>> > > > Please find my answers inline.
>> > > >
>> > > >
>> > > > We are in the process of documenting capacity planning for streams,
>> > stay
>> > > tuned.
>> > > >
>> > > > This would be great! Looking forward to it.
>> > > >
>> > > > Could you send some more info on your problem? What Kafka version
>> are
>> > > you using?
>> > > >
>> > > > We are using Kafka 0.10.0.0.
>> > > >
>> > > > Are the VMs on the same or different hosts?
>> > > >
>> > > > The VMs are on Google Cloud. Two of them are in asia-east1-a and
>> one is
>> > > in asia-east1-c. All three are n1-standard-4 Ubuntu instances.
>> > > >
>> > > > Also what exactly do you mean by “the lag keeps fluctuating”, what
>> > > metric are you looking at?
>> > > >
>> > > > We are looking at Kafka Manager for the time being. By fluctuating,
>> I
>> > > mean the lag is few thousands at one time, we refresh it the next
>> second,
>> > > it is in few lakhs, and again refresh it and it is few thousands. I
>> > > understand this may not be very accurate. We will soon have more
>> accurate
>> > > data once we start pushing the consumer lag metric to Datadog.
>> > > >
>> > > > But on a separate note, the difference between lags on different
>> > > partitions is way too high. I have attached a tab separated file
>> herewith
>> > > which shows the consumer lag (from Kafka Manager) for the first the 50
>> > > partitions. As is clear, the lag on partition 2 is 530 while the lag
>> on
>> > > partition 18 is 23K. Note that the same VM is pulling data from both
>> the
>> > > partitions.
>> > > >
>> > > >
>> > > >
>> > > >
>> > > > <KafkaLags.tsv>
>> > >
>> > >
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>

Re: Capacity planning for Kafka Streams

Posted by Mahendra Kariya <ma...@go-jek.com>.
Thanks for the heads up Guozhang!

The problem is our brokers are on 0.10.0.x. So we will have to upgrade them.

On Sat, Mar 18, 2017 at 12:30 AM, Guozhang Wang <wa...@gmail.com> wrote:

> Hi Mahendra,
>
> Just a kind reminder that upgrading Streams to 0.10.2 does not necessarily
> require you to upgrade brokers to 0.10.2 as well. Since we have added a new
> feature since 0.10.2 to allow newer versioned clients (producer, consumer,
> streams) to talk to older versioned brokers, and for Streams specifically
> it only requires brokers to be no older than 0.10.1.
>
>
> Guozhang
>
>
> On Mon, Mar 13, 2017 at 5:12 AM, Mahendra Kariya <
> mahendra.kariya@go-jek.com
> > wrote:
>
> > We are planning to migrate to the newer version of Kafka. But that's a
> few
> > weeks away.
> >
> > We will try setting the socket config and see how it turns out.
> >
> > Thanks a lot for your response!
> >
> >
> >
> > On Mon, Mar 13, 2017 at 3:21 PM, Eno Thereska <en...@gmail.com>
> > wrote:
> >
> > > Thanks,
> > >
> > > A couple of things:
> > > - I’d recommend moving to 0.10.2 (latest release) if you can since
> > several
> > > improvements were made in the last two releases that make rebalancing
> and
> > > performance better.
> > >
> > > - When running on environments with large latency on AWS at least
> > (haven’t
> > > tried Google cloud), one parameter we have found useful to increase
> > > performance is the receive and send socket size for the consumer and
> > > producer in streams. We’d recommend setting them to 1MB like this
> (where
> > > “props” is your own properties object when you start streams):
> > >
> > > // the socket buffer needs to be large, especially when running in AWS
> > with
> > > // high latency. if running locally the default is fine.
> > > props.put(ProducerConfig.SEND_BUFFER_CONFIG, 1024 * 1024);
> > > props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
> > >
> > > Make sure the OS allows the larger socket size too.
> > >
> > > Thanks
> > > Eno
> > >
> > > > On Mar 13, 2017, at 9:21 AM, Mahendra Kariya <
> > mahendra.kariya@go-jek.com>
> > > wrote:
> > > >
> > > > Hi Eno,
> > > >
> > > > Please find my answers inline.
> > > >
> > > >
> > > > We are in the process of documenting capacity planning for streams,
> > stay
> > > tuned.
> > > >
> > > > This would be great! Looking forward to it.
> > > >
> > > > Could you send some more info on your problem? What Kafka version are
> > > you using?
> > > >
> > > > We are using Kafka 0.10.0.0.
> > > >
> > > > Are the VMs on the same or different hosts?
> > > >
> > > > The VMs are on Google Cloud. Two of them are in asia-east1-a and one
> is
> > > in asia-east1-c. All three are n1-standard-4 Ubuntu instances.
> > > >
> > > > Also what exactly do you mean by “the lag keeps fluctuating”, what
> > > metric are you looking at?
> > > >
> > > > We are looking at Kafka Manager for the time being. By fluctuating, I
> > > mean the lag is few thousands at one time, we refresh it the next
> second,
> > > it is in few lakhs, and again refresh it and it is few thousands. I
> > > understand this may not be very accurate. We will soon have more
> accurate
> > > data once we start pushing the consumer lag metric to Datadog.
> > > >
> > > > But on a separate note, the difference between lags on different
> > > partitions is way too high. I have attached a tab separated file
> herewith
> > > which shows the consumer lag (from Kafka Manager) for the first the 50
> > > partitions. As is clear, the lag on partition 2 is 530 while the lag on
> > > partition 18 is 23K. Note that the same VM is pulling data from both
> the
> > > partitions.
> > > >
> > > >
> > > >
> > > >
> > > > <KafkaLags.tsv>
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: Capacity planning for Kafka Streams

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Mahendra,

Just a kind reminder that upgrading Streams to 0.10.2 does not necessarily
require you to upgrade brokers to 0.10.2 as well. Since we have added a new
feature since 0.10.2 to allow newer versioned clients (producer, consumer,
streams) to talk to older versioned brokers, and for Streams specifically
it only requires brokers to be no older than 0.10.1.


Guozhang


On Mon, Mar 13, 2017 at 5:12 AM, Mahendra Kariya <mahendra.kariya@go-jek.com
> wrote:

> We are planning to migrate to the newer version of Kafka. But that's a few
> weeks away.
>
> We will try setting the socket config and see how it turns out.
>
> Thanks a lot for your response!
>
>
>
> On Mon, Mar 13, 2017 at 3:21 PM, Eno Thereska <en...@gmail.com>
> wrote:
>
> > Thanks,
> >
> > A couple of things:
> > - I’d recommend moving to 0.10.2 (latest release) if you can since
> several
> > improvements were made in the last two releases that make rebalancing and
> > performance better.
> >
> > - When running on environments with large latency on AWS at least
> (haven’t
> > tried Google cloud), one parameter we have found useful to increase
> > performance is the receive and send socket size for the consumer and
> > producer in streams. We’d recommend setting them to 1MB like this (where
> > “props” is your own properties object when you start streams):
> >
> > // the socket buffer needs to be large, especially when running in AWS
> with
> > // high latency. if running locally the default is fine.
> > props.put(ProducerConfig.SEND_BUFFER_CONFIG, 1024 * 1024);
> > props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
> >
> > Make sure the OS allows the larger socket size too.
> >
> > Thanks
> > Eno
> >
> > > On Mar 13, 2017, at 9:21 AM, Mahendra Kariya <
> mahendra.kariya@go-jek.com>
> > wrote:
> > >
> > > Hi Eno,
> > >
> > > Please find my answers inline.
> > >
> > >
> > > We are in the process of documenting capacity planning for streams,
> stay
> > tuned.
> > >
> > > This would be great! Looking forward to it.
> > >
> > > Could you send some more info on your problem? What Kafka version are
> > you using?
> > >
> > > We are using Kafka 0.10.0.0.
> > >
> > > Are the VMs on the same or different hosts?
> > >
> > > The VMs are on Google Cloud. Two of them are in asia-east1-a and one is
> > in asia-east1-c. All three are n1-standard-4 Ubuntu instances.
> > >
> > > Also what exactly do you mean by “the lag keeps fluctuating”, what
> > metric are you looking at?
> > >
> > > We are looking at Kafka Manager for the time being. By fluctuating, I
> > mean the lag is few thousands at one time, we refresh it the next second,
> > it is in few lakhs, and again refresh it and it is few thousands. I
> > understand this may not be very accurate. We will soon have more accurate
> > data once we start pushing the consumer lag metric to Datadog.
> > >
> > > But on a separate note, the difference between lags on different
> > partitions is way too high. I have attached a tab separated file herewith
> > which shows the consumer lag (from Kafka Manager) for the first the 50
> > partitions. As is clear, the lag on partition 2 is 530 while the lag on
> > partition 18 is 23K. Note that the same VM is pulling data from both the
> > partitions.
> > >
> > >
> > >
> > >
> > > <KafkaLags.tsv>
> >
> >
>



-- 
-- Guozhang

Re: Capacity planning for Kafka Streams

Posted by Mahendra Kariya <ma...@go-jek.com>.
We are planning to migrate to the newer version of Kafka. But that's a few
weeks away.

We will try setting the socket config and see how it turns out.

Thanks a lot for your response!



On Mon, Mar 13, 2017 at 3:21 PM, Eno Thereska <en...@gmail.com>
wrote:

> Thanks,
>
> A couple of things:
> - I’d recommend moving to 0.10.2 (latest release) if you can since several
> improvements were made in the last two releases that make rebalancing and
> performance better.
>
> - When running on environments with large latency on AWS at least (haven’t
> tried Google cloud), one parameter we have found useful to increase
> performance is the receive and send socket size for the consumer and
> producer in streams. We’d recommend setting them to 1MB like this (where
> “props” is your own properties object when you start streams):
>
> // the socket buffer needs to be large, especially when running in AWS with
> // high latency. if running locally the default is fine.
> props.put(ProducerConfig.SEND_BUFFER_CONFIG, 1024 * 1024);
> props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
>
> Make sure the OS allows the larger socket size too.
>
> Thanks
> Eno
>
> > On Mar 13, 2017, at 9:21 AM, Mahendra Kariya <ma...@go-jek.com>
> wrote:
> >
> > Hi Eno,
> >
> > Please find my answers inline.
> >
> >
> > We are in the process of documenting capacity planning for streams, stay
> tuned.
> >
> > This would be great! Looking forward to it.
> >
> > Could you send some more info on your problem? What Kafka version are
> you using?
> >
> > We are using Kafka 0.10.0.0.
> >
> > Are the VMs on the same or different hosts?
> >
> > The VMs are on Google Cloud. Two of them are in asia-east1-a and one is
> in asia-east1-c. All three are n1-standard-4 Ubuntu instances.
> >
> > Also what exactly do you mean by “the lag keeps fluctuating”, what
> metric are you looking at?
> >
> > We are looking at Kafka Manager for the time being. By fluctuating, I
> mean the lag is few thousands at one time, we refresh it the next second,
> it is in few lakhs, and again refresh it and it is few thousands. I
> understand this may not be very accurate. We will soon have more accurate
> data once we start pushing the consumer lag metric to Datadog.
> >
> > But on a separate note, the difference between lags on different
> partitions is way too high. I have attached a tab separated file herewith
> which shows the consumer lag (from Kafka Manager) for the first the 50
> partitions. As is clear, the lag on partition 2 is 530 while the lag on
> partition 18 is 23K. Note that the same VM is pulling data from both the
> partitions.
> >
> >
> >
> >
> > <KafkaLags.tsv>
>
>

Re: Capacity planning for Kafka Streams

Posted by Eno Thereska <en...@gmail.com>.
Thanks,

A couple of things:
- I’d recommend moving to 0.10.2 (latest release) if you can since several improvements were made in the last two releases that make rebalancing and performance better.

- When running on environments with large latency on AWS at least (haven’t tried Google cloud), one parameter we have found useful to increase performance is the receive and send socket size for the consumer and producer in streams. We’d recommend setting them to 1MB like this (where “props” is your own properties object when you start streams):

// the socket buffer needs to be large, especially when running in AWS with
// high latency. if running locally the default is fine.
props.put(ProducerConfig.SEND_BUFFER_CONFIG, 1024 * 1024);
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);

Make sure the OS allows the larger socket size too.

Thanks
Eno

> On Mar 13, 2017, at 9:21 AM, Mahendra Kariya <ma...@go-jek.com> wrote:
> 
> Hi Eno,
> 
> Please find my answers inline.
> 
> 
> We are in the process of documenting capacity planning for streams, stay tuned.
> 
> This would be great! Looking forward to it.
> 
> Could you send some more info on your problem? What Kafka version are you using?
> 
> We are using Kafka 0.10.0.0.
>  
> Are the VMs on the same or different hosts?
> 
> The VMs are on Google Cloud. Two of them are in asia-east1-a and one is in asia-east1-c. All three are n1-standard-4 Ubuntu instances.
>  
> Also what exactly do you mean by “the lag keeps fluctuating”, what metric are you looking at?
> 
> We are looking at Kafka Manager for the time being. By fluctuating, I mean the lag is few thousands at one time, we refresh it the next second, it is in few lakhs, and again refresh it and it is few thousands. I understand this may not be very accurate. We will soon have more accurate data once we start pushing the consumer lag metric to Datadog.
> 
> But on a separate note, the difference between lags on different partitions is way too high. I have attached a tab separated file herewith which shows the consumer lag (from Kafka Manager) for the first the 50 partitions. As is clear, the lag on partition 2 is 530 while the lag on partition 18 is 23K. Note that the same VM is pulling data from both the partitions.
> 
> 
> 
> 
> <KafkaLags.tsv>


Re: Capacity planning for Kafka Streams

Posted by Mahendra Kariya <ma...@go-jek.com>.
Hi Eno,

Please find my answers inline.


We are in the process of documenting capacity planning for streams, stay
> tuned.
>

This would be great! Looking forward to it.

Could you send some more info on your problem? What Kafka version are you
> using?
>

We are using Kafka 0.10.0.0.


> Are the VMs on the same or different hosts?
>

The VMs are on Google Cloud. Two of them are in asia-east1-a and one is in
asia-east1-c. All three are n1-standard-4 Ubuntu instances.


> Also what exactly do you mean by “the lag keeps fluctuating”, what metric
> are you looking at?
>

We are looking at Kafka Manager for the time being. By fluctuating, I mean
the lag is few thousands at one time, we refresh it the next second, it is
in few lakhs, and again refresh it and it is few thousands. I understand
this may not be very accurate. We will soon have more accurate data once we
start pushing the consumer lag metric to Datadog.

But on a separate note, the difference between lags on different partitions
is way too high. I have attached a tab separated file herewith which shows
the consumer lag (from Kafka Manager) for the first the 50 partitions. As
is clear, the lag on partition 2 is 530 while the lag on partition 18 is
23K. Note that the same VM is pulling data from both the partitions.

Re: Capacity planning for Kafka Streams

Posted by Eno Thereska <en...@gmail.com>.
Hi Mahenda,

We are in the process of documenting capacity planning for streams, stay tuned.

Could you send some more info on your problem? What Kafka version are you using? Are the VMs on the same or different hosts? Also what exactly do you mean by “the lag keeps fluctuating”, what metric are you looking at?

Thanks
Eno

> On Mar 13, 2017, at 12:37 AM, Mahendra Kariya <ma...@go-jek.com> wrote:
> 
> Hey All,
> 
> Are there some guidelines / documentation around capacity planning for
> Kafka streams?
> 
> We have a Streams application which consumes messages from a topic with 400
> partitions. At peak time, there are around 20K messages coming into that
> topic per second. The Streams app consumes these messages to do some
> aggregations.
> 
> Currently we have 3 VMs, each of them running 10 threads. The lag keeps
> fluctuating between a few tens to a few lakhs. We have also noticed that
> lag on the partitions being consumed on one particular machine is way
> higher than the other two machines.
> 
> Has anybody faced similar issues? How did you guys resolve it?