You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by dev loper <sp...@gmail.com> on 2017/09/16 13:14:24 UTC

Improving Kafka State Store performance

Hi Kafka Streams Users,

I am trying to improve the performance of Kafka Streams State Store
Persistent Store. In our application we are using Kafka Streams Processor
API  and using Persistent State Store.. My application when starts up it
performing well but over a period of time the performance deteriorated. I
am computing certain results in computeAnalytics method and this method is
not taking time at all. This method is being called within both process and
punctuate and I am storing the updated object back to store. Over the
period of time its taking huge time for completing the punctuate process
and I could see majority of the time is spent in storing the records and
Iterating the records. The record size is just 2500 per partition. I am not
where I am going wrong and how can I improve the performance.

Below is one such sample log record.

INFO  | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) - Time
Metrics for punctuate  for TimeStamp :: 1505564655878 processed Records ::
2109 totalTimeTakenToProcessRecords :: 2 totalTimeTakenToStoreRecord ::
27605toal time Taken to retrieve Records :: 12787 Total time Taken :: 40394

Below I have given my pseudo code for my processor which exactly resembles
the code which I am using in my application.

MyProcessor(){

  process(Key objectkey, Update eventupdate){
   long timestamp=context.timestamp();
   AnalyticeObj storeobj=store.get(objectkey);

   if( storeobj ===null)
         {
          storeobj=new  AnalyticeObj(objectkey,eventupdate,timestamp)
         }
         else
        {
           storeobj.update(eventupdate,timestamp)
        }
     storeobj=storeobj.computeAnalytics();

   store.put(objectkey,storeobj);
  context.commit();
}
// Every 5 seconds
punctuate(long timestamp)
{
 long startTime = System.currentTimeMillis();
long totalTimeTakenToProcessRecords=0;
long totalTimeTakenToStoreRecords=0;
long counter=0;
KeyValueIterator iter=this.visitStore.all();
while (iter.hasNext()) {
KeyValue<Key, AnalyticeObj> entry = iter.next();

    if(AnalyticeObj.hasExpired(timestamp)
         store.remove(entry.key)
      else
      {
        long processStartTime=System.currentTimeMillis();
         AnalyticeObj storeobj= entry.value.computeAnalytics(timestamp);

totalTimeTakenToProcessRecords=totalTimeTakenToProcessRecords+(System.currentTimeMillis()-processStartTime);

         long storeStartTime=System.currentTimeMillis();
          store.put(entry.key,storeobj);

totalTimeTakenToStoreRecords=totalTimeTakenToStoreRecords+(System.currentTimeMillis()-storeStartTime);
       }
   counter++;
}
     logger.info(" Time Metrics for punctuate  "
                    " for TimeStamp :: " + "" + timestamp + " processed
Records :: "
                    + counter +" totalTimeTakenToProcessRecords ::
"+totalTimeTakenToProcessRecords +" totalTimeTakenToStoreRecord ::
"+totalTimeTakenToStoreRecords
                    +"toal time Taken to retrieve Records :: "+
(System.currentTimeMillis() -
(startTime+totalTimeTakenToProcessRecords+totalTimeTakenToStoreRecords))+"
Total time Taken :: " + (System.currentTimeMillis() - startTime));
}
}

Re: Improving Kafka State Store performance

Posted by dev loper <sp...@gmail.com>.
Hi Ted,

I am using  0.11.0.0 . I am not using external state store.  I am using the
persistent store that comes with kafka stream 0.11.0.0.
My assumption is put just inserts my object into the state store. I guess
the state store is manged by rocksdb internally by kafkastreams.

I am not sure why the put is taking time over  period of time.


On Sat, Sep 16, 2017 at 7:07 PM, Ted Yu <yu...@gmail.com> wrote:

> Looking at the calculation of totalTimeTakenToStoreRecords, it covers
> store.put()
> call.
>
> Can you tell us more about what the put() does ?
> Does it involve external key value store ?
>
> Are you using 0.11.0.0 ?
>
> Thanks
>
> On Sat, Sep 16, 2017 at 6:14 AM, dev loper <sp...@gmail.com> wrote:
>
> > Hi Kafka Streams Users,
> >
> > I am trying to improve the performance of Kafka Streams State Store
> > Persistent Store. In our application we are using Kafka Streams Processor
> > API  and using Persistent State Store.. My application when starts up it
> > performing well but over a period of time the performance deteriorated. I
> > am computing certain results in computeAnalytics method and this method
> is
> > not taking time at all. This method is being called within both process
> and
> > punctuate and I am storing the updated object back to store. Over the
> > period of time its taking huge time for completing the punctuate process
> > and I could see majority of the time is spent in storing the records and
> > Iterating the records. The record size is just 2500 per partition. I am
> not
> > where I am going wrong and how can I improve the performance.
> >
> > Below is one such sample log record.
> >
> > INFO  | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) - Time
> > Metrics for punctuate  for TimeStamp :: 1505564655878 processed Records
> ::
> > 2109 totalTimeTakenToProcessRecords :: 2 totalTimeTakenToStoreRecord ::
> > 27605toal time Taken to retrieve Records :: 12787 Total time Taken ::
> 40394
> >
> > Below I have given my pseudo code for my processor which exactly
> resembles
> > the code which I am using in my application.
> >
> > MyProcessor(){
> >
> >   process(Key objectkey, Update eventupdate){
> >    long timestamp=context.timestamp();
> >    AnalyticeObj storeobj=store.get(objectkey);
> >
> >    if( storeobj ===null)
> >          {
> >           storeobj=new  AnalyticeObj(objectkey,eventupdate,timestamp)
> >          }
> >          else
> >         {
> >            storeobj.update(eventupdate,timestamp)
> >         }
> >      storeobj=storeobj.computeAnalytics();
> >
> >    store.put(objectkey,storeobj);
> >   context.commit();
> > }
> > // Every 5 seconds
> > punctuate(long timestamp)
> > {
> >  long startTime = System.currentTimeMillis();
> > long totalTimeTakenToProcessRecords=0;
> > long totalTimeTakenToStoreRecords=0;
> > long counter=0;
> > KeyValueIterator iter=this.visitStore.all();
> > while (iter.hasNext()) {
> > KeyValue<Key, AnalyticeObj> entry = iter.next();
> >
> >     if(AnalyticeObj.hasExpired(timestamp)
> >          store.remove(entry.key)
> >       else
> >       {
> >         long processStartTime=System.currentTimeMillis();
> >          AnalyticeObj storeobj= entry.value.computeAnalytics(timestamp);
> >
> > totalTimeTakenToProcessRecords=totalTimeTakenToProcessRecords
> > +(System.currentTimeMillis()-processStartTime);
> >
> >          long storeStartTime=System.currentTimeMillis();
> >           store.put(entry.key,storeobj);
> >
> > totalTimeTakenToStoreRecords=totalTimeTakenToStoreRecords+(
> > System.currentTimeMillis()-storeStartTime);
> >        }
> >    counter++;
> > }
> >      logger.info(" Time Metrics for punctuate  "
> >                     " for TimeStamp :: " + "" + timestamp + " processed
> > Records :: "
> >                     + counter +" totalTimeTakenToProcessRecords ::
> > "+totalTimeTakenToProcessRecords +" totalTimeTakenToStoreRecord ::
> > "+totalTimeTakenToStoreRecords
> >                     +"toal time Taken to retrieve Records :: "+
> > (System.currentTimeMillis() -
> > (startTime+totalTimeTakenToProcessRecords+totalTimeTakenToStoreRecords)
> )+"
> > Total time Taken :: " + (System.currentTimeMillis() - startTime));
> > }
> > }
> >
>

Re: Improving Kafka State Store performance

Posted by Ted Yu <yu...@gmail.com>.
Looking at the calculation of totalTimeTakenToStoreRecords, it covers
store.put()
call.

Can you tell us more about what the put() does ?
Does it involve external key value store ?

Are you using 0.11.0.0 ?

Thanks

On Sat, Sep 16, 2017 at 6:14 AM, dev loper <sp...@gmail.com> wrote:

> Hi Kafka Streams Users,
>
> I am trying to improve the performance of Kafka Streams State Store
> Persistent Store. In our application we are using Kafka Streams Processor
> API  and using Persistent State Store.. My application when starts up it
> performing well but over a period of time the performance deteriorated. I
> am computing certain results in computeAnalytics method and this method is
> not taking time at all. This method is being called within both process and
> punctuate and I am storing the updated object back to store. Over the
> period of time its taking huge time for completing the punctuate process
> and I could see majority of the time is spent in storing the records and
> Iterating the records. The record size is just 2500 per partition. I am not
> where I am going wrong and how can I improve the performance.
>
> Below is one such sample log record.
>
> INFO  | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) - Time
> Metrics for punctuate  for TimeStamp :: 1505564655878 processed Records ::
> 2109 totalTimeTakenToProcessRecords :: 2 totalTimeTakenToStoreRecord ::
> 27605toal time Taken to retrieve Records :: 12787 Total time Taken :: 40394
>
> Below I have given my pseudo code for my processor which exactly resembles
> the code which I am using in my application.
>
> MyProcessor(){
>
>   process(Key objectkey, Update eventupdate){
>    long timestamp=context.timestamp();
>    AnalyticeObj storeobj=store.get(objectkey);
>
>    if( storeobj ===null)
>          {
>           storeobj=new  AnalyticeObj(objectkey,eventupdate,timestamp)
>          }
>          else
>         {
>            storeobj.update(eventupdate,timestamp)
>         }
>      storeobj=storeobj.computeAnalytics();
>
>    store.put(objectkey,storeobj);
>   context.commit();
> }
> // Every 5 seconds
> punctuate(long timestamp)
> {
>  long startTime = System.currentTimeMillis();
> long totalTimeTakenToProcessRecords=0;
> long totalTimeTakenToStoreRecords=0;
> long counter=0;
> KeyValueIterator iter=this.visitStore.all();
> while (iter.hasNext()) {
> KeyValue<Key, AnalyticeObj> entry = iter.next();
>
>     if(AnalyticeObj.hasExpired(timestamp)
>          store.remove(entry.key)
>       else
>       {
>         long processStartTime=System.currentTimeMillis();
>          AnalyticeObj storeobj= entry.value.computeAnalytics(timestamp);
>
> totalTimeTakenToProcessRecords=totalTimeTakenToProcessRecords
> +(System.currentTimeMillis()-processStartTime);
>
>          long storeStartTime=System.currentTimeMillis();
>           store.put(entry.key,storeobj);
>
> totalTimeTakenToStoreRecords=totalTimeTakenToStoreRecords+(
> System.currentTimeMillis()-storeStartTime);
>        }
>    counter++;
> }
>      logger.info(" Time Metrics for punctuate  "
>                     " for TimeStamp :: " + "" + timestamp + " processed
> Records :: "
>                     + counter +" totalTimeTakenToProcessRecords ::
> "+totalTimeTakenToProcessRecords +" totalTimeTakenToStoreRecord ::
> "+totalTimeTakenToStoreRecords
>                     +"toal time Taken to retrieve Records :: "+
> (System.currentTimeMillis() -
> (startTime+totalTimeTakenToProcessRecords+totalTimeTakenToStoreRecords))+"
> Total time Taken :: " + (System.currentTimeMillis() - startTime));
> }
> }
>

Re: Improving Kafka State Store performance

Posted by Guozhang Wang <wa...@gmail.com>.
Hello,

Since you are running on EBS not SSD, the first suspicion I'd have is its
write and storage amplification. This can possibly be verified from
RocksDB's own stats, and Bill once shared the code to expose such metrics
for investigaion:

https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#rocksdb-statistics


Guozhang


On Thu, Sep 21, 2017 at 9:32 AM, Ian Duffy <ia...@ianduffy.ie> wrote:

> Have you checked the EBS burst balance on your disks that the streams
> application is running on?
>
> On 21 September 2017 at 04:28, dev loper <sp...@gmail.com> wrote:
>
> > Hi Bill,
> >
> > I will repeat my tests with Rocks DB enabled and I will revert to you
> with
> > details. I might take 1-2 days to get back to you with details since I am
> > traveling.  But I will try my level best to get it tonight.
> >
> > On Mon, Sep 18, 2017 at 5:30 PM, Bill Bejeck <bi...@confluent.io> wrote:
> >
> > > I'm following up from your other thread as well here.  Thanks for the
> > info
> > > above, that is helpful.
> > >
> > > I think the AWS instance type might be a factor here, but let's do some
> > > more homework first.
> > >
> > > For a next step, we could enable logging for RocksDB so we can observe
> > the
> > > performance.
> > >
> > > Here is some sample code that will allow logging at the INFO level as
> > well
> > > as print out statistics (using RocksDB internal stats) every 15
> minutes.
> > >
> > > Would you mind reverting your Streams application to use a persistent
> > store
> > > again?
> > >
> > > Then let it run until you observe the behavior you described before and
> > if
> > > you don't mind share the logs with me so we can look them over.
> Thanks!
> > >
> > > import org.apache.kafka.streams.state.RocksDBConfigSetter;
> > > import org.rocksdb.InfoLogLevel;
> > > import org.rocksdb.Options;
> > >
> > > import java.util.Map;
> > >
> > > public class RocksDbLogsConfig implements RocksDBConfigSetter {
> > >
> > >     @Override
> > >     public void setConfig(String storeName, Options options,
> Map<String,
> > > Object> configs) {
> > >                    options.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
> > >                    options.createStatistics();
> > >                    options.setStatsDumpPeriodSec(900);
> > >                    options.setDbLogDir("UPDATE WITH PATH WHERE YOU WANT
> > LOG
> > > FILES");
> > >     }
> > > }
> > >
> > > To use the RocksDbLogsConfig class, you'll need to update your Streams
> > > configs like so:
> > >
> > >   props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
> > > RocksDbLogsConfig.class);
> > >
> > >
> > >
> > > Thanks
> > > Bill
> > >
> > > On Sat, Sep 16, 2017 at 11:22 PM, dev loper <sp...@gmail.com>
> wrote:
> > >
> > > > Hi Bill.
> > > >
> > > > Thank you pointing out, But in actual code I am calling iter.close()
> in
> > > the
> > > > finally block if the iterator is not null. I don't see any issues
> when
> > I
> > > am
> > > > running it on light traffic. As soon as I switch to production
> traffic
> > I
> > > > start seeing these issues.
> > > >
> > > > Below I have provided additional details about our current
> application.
> > > If
> > > > you are looking for specific logs or details , please let me know. I
> > will
> > > > get the details captured.
> > > >
> > > > In production environment I am receiving 10,000 messages per second.
> > > There
> > > > are 36 partitions  for the topic and there are around 2500 unique
> > > entities
> > > > per partition for which I have to maintain the state.
> > > >
> > > > Below I have mentioned the hardware configuration and number of
> > instances
> > > > we are using for this solution. Please let me know if hardware is the
> > > > limiting factor here. We didn't go for higher configuration since the
> > > load
> > > > average on these instances were quite low and I could hardly see any
> > CPU
> > > > spikes .
> > > >
> > > >
> > > > Kafka Machine Machine Details: - 2 Broker Instances with below
> > > > Configuration ,  (Current CPU Usage 2%- 8%)
> > > >
> > > >  Instance Type : AWS T2 Large
> > > >   Machine Configuration : 2 VCPU;s, 8gb Ram, Storage : EBS
> > > >
> > > > Kafka Streams Instance : 3 Kafka Streams  Application Instances
> > (Current
> > > > CPU Usage 8%- 24%)
> > > >
> > > >     Instance Type : AWS M4 Large
> > > >     Machine Configuration : 2 VCPU;s, 8gb Ram, Storage : EBS
> (Dedicated
> > > EBS
> > > > bandwidth 450 mbps)
> > > >
> > > >
> > > >
> > > > On Sat, Sep 16, 2017 at 10:05 PM, Bill Bejeck <bi...@confluent.io>
> > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > It's hard to say exactly without a little more information.
> > > > >
> > > > > On a side note, I don't see where you are closing the
> > KeyValueIterator
> > > in
> > > > > the code above. Not closing a KeyValueIterator on a Permanent State
> > > Store
> > > > > can cause a resource leak over time, so I'd add `iter.close()`
> right
> > > > before
> > > > > your `logger.info` call.  It might be worth retrying at that
> point.
> > > > >
> > > > > Thanks,
> > > > > Bill
> > > > >
> > > > > On Sat, Sep 16, 2017 at 9:14 AM, dev loper <sp...@gmail.com>
> > wrote:
> > > > >
> > > > > > Hi Kafka Streams Users,
> > > > > >
> > > > > > I am trying to improve the performance of Kafka Streams State
> Store
> > > > > > Persistent Store. In our application we are using Kafka Streams
> > > > Processor
> > > > > > API  and using Persistent State Store.. My application when
> starts
> > up
> > > > it
> > > > > > performing well but over a period of time the performance
> > > > deteriorated. I
> > > > > > am computing certain results in computeAnalytics method and this
> > > method
> > > > > is
> > > > > > not taking time at all. This method is being called within both
> > > process
> > > > > and
> > > > > > punctuate and I am storing the updated object back to store. Over
> > the
> > > > > > period of time its taking huge time for completing the punctuate
> > > > process
> > > > > > and I could see majority of the time is spent in storing the
> > records
> > > > and
> > > > > > Iterating the records. The record size is just 2500 per
> partition.
> > I
> > > am
> > > > > not
> > > > > > where I am going wrong and how can I improve the performance.
> > > > > >
> > > > > > Below is one such sample log record.
> > > > > >
> > > > > > INFO  | 07:59:58 | processors.MyProcessor (MyProcessor.java:123)
> -
> > > Time
> > > > > > Metrics for punctuate  for TimeStamp :: 1505564655878 processed
> > > Records
> > > > > ::
> > > > > > 2109 totalTimeTakenToProcessRecords :: 2
> > totalTimeTakenToStoreRecord
> > > ::
> > > > > > 27605toal time Taken to retrieve Records :: 12787 Total time
> Taken
> > ::
> > > > > 40394
> > > > > >
> > > > > > Below I have given my pseudo code for my processor which exactly
> > > > > resembles
> > > > > > the code which I am using in my application.
> > > > > >
> > > > > > MyProcessor(){
> > > > > >
> > > > > >   process(Key objectkey, Update eventupdate){
> > > > > >    long timestamp=context.timestamp();
> > > > > >    AnalyticeObj storeobj=store.get(objectkey);
> > > > > >
> > > > > >    if( storeobj ===null)
> > > > > >          {
> > > > > >           storeobj=new  AnalyticeObj(objectkey,
> > > eventupdate,timestamp)
> > > > > >          }
> > > > > >          else
> > > > > >         {
> > > > > >            storeobj.update(eventupdate,timestamp)
> > > > > >         }
> > > > > >      storeobj=storeobj.computeAnalytics();
> > > > > >
> > > > > >    store.put(objectkey,storeobj);
> > > > > >   context.commit();
> > > > > > }
> > > > > > // Every 5 seconds
> > > > > > punctuate(long timestamp)
> > > > > > {
> > > > > >  long startTime = System.currentTimeMillis();
> > > > > > long totalTimeTakenToProcessRecords=0;
> > > > > > long totalTimeTakenToStoreRecords=0;
> > > > > > long counter=0;
> > > > > > KeyValueIterator iter=this.visitStore.all();
> > > > > > while (iter.hasNext()) {
> > > > > > KeyValue<Key, AnalyticeObj> entry = iter.next();
> > > > > >
> > > > > >     if(AnalyticeObj.hasExpired(timestamp)
> > > > > >          store.remove(entry.key)
> > > > > >       else
> > > > > >       {
> > > > > >         long processStartTime=System.currentTimeMillis();
> > > > > >          AnalyticeObj storeobj= entry.value.computeAnalytics(
> > > > timestamp);
> > > > > >
> > > > > > totalTimeTakenToProcessRecords=totalTimeTakenToProcessRecords
> > > > > > +(System.currentTimeMillis()-processStartTime);
> > > > > >
> > > > > >          long storeStartTime=System.currentTimeMillis();
> > > > > >           store.put(entry.key,storeobj);
> > > > > >
> > > > > > totalTimeTakenToStoreRecords=totalTimeTakenToStoreRecords+(
> > > > > > System.currentTimeMillis()-storeStartTime);
> > > > > >        }
> > > > > >    counter++;
> > > > > > }
> > > > > >      logger.info(" Time Metrics for punctuate  "
> > > > > >                     " for TimeStamp :: " + "" + timestamp + "
> > > processed
> > > > > > Records :: "
> > > > > >                     + counter +" totalTimeTakenToProcessRecords
> ::
> > > > > > "+totalTimeTakenToProcessRecords +" totalTimeTakenToStoreRecord
> ::
> > > > > > "+totalTimeTakenToStoreRecords
> > > > > >                     +"toal time Taken to retrieve Records :: "+
> > > > > > (System.currentTimeMillis() -
> > > > > > (startTime+totalTimeTakenToProcessRecords
> > > > +totalTimeTakenToStoreRecords)
> > > > > )+"
> > > > > > Total time Taken :: " + (System.currentTimeMillis() -
> startTime));
> > > > > > }
> > > > > > }
> > > > > >
> > > > >
> > > >
> > >
> >
>



-- 
-- Guozhang

Re: Improving Kafka State Store performance

Posted by Ian Duffy <ia...@ianduffy.ie>.
Have you checked the EBS burst balance on your disks that the streams
application is running on?

On 21 September 2017 at 04:28, dev loper <sp...@gmail.com> wrote:

> Hi Bill,
>
> I will repeat my tests with Rocks DB enabled and I will revert to you with
> details. I might take 1-2 days to get back to you with details since I am
> traveling.  But I will try my level best to get it tonight.
>
> On Mon, Sep 18, 2017 at 5:30 PM, Bill Bejeck <bi...@confluent.io> wrote:
>
> > I'm following up from your other thread as well here.  Thanks for the
> info
> > above, that is helpful.
> >
> > I think the AWS instance type might be a factor here, but let's do some
> > more homework first.
> >
> > For a next step, we could enable logging for RocksDB so we can observe
> the
> > performance.
> >
> > Here is some sample code that will allow logging at the INFO level as
> well
> > as print out statistics (using RocksDB internal stats) every 15 minutes.
> >
> > Would you mind reverting your Streams application to use a persistent
> store
> > again?
> >
> > Then let it run until you observe the behavior you described before and
> if
> > you don't mind share the logs with me so we can look them over.  Thanks!
> >
> > import org.apache.kafka.streams.state.RocksDBConfigSetter;
> > import org.rocksdb.InfoLogLevel;
> > import org.rocksdb.Options;
> >
> > import java.util.Map;
> >
> > public class RocksDbLogsConfig implements RocksDBConfigSetter {
> >
> >     @Override
> >     public void setConfig(String storeName, Options options, Map<String,
> > Object> configs) {
> >                    options.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
> >                    options.createStatistics();
> >                    options.setStatsDumpPeriodSec(900);
> >                    options.setDbLogDir("UPDATE WITH PATH WHERE YOU WANT
> LOG
> > FILES");
> >     }
> > }
> >
> > To use the RocksDbLogsConfig class, you'll need to update your Streams
> > configs like so:
> >
> >   props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
> > RocksDbLogsConfig.class);
> >
> >
> >
> > Thanks
> > Bill
> >
> > On Sat, Sep 16, 2017 at 11:22 PM, dev loper <sp...@gmail.com> wrote:
> >
> > > Hi Bill.
> > >
> > > Thank you pointing out, But in actual code I am calling iter.close() in
> > the
> > > finally block if the iterator is not null. I don't see any issues when
> I
> > am
> > > running it on light traffic. As soon as I switch to production traffic
> I
> > > start seeing these issues.
> > >
> > > Below I have provided additional details about our current application.
> > If
> > > you are looking for specific logs or details , please let me know. I
> will
> > > get the details captured.
> > >
> > > In production environment I am receiving 10,000 messages per second.
> > There
> > > are 36 partitions  for the topic and there are around 2500 unique
> > entities
> > > per partition for which I have to maintain the state.
> > >
> > > Below I have mentioned the hardware configuration and number of
> instances
> > > we are using for this solution. Please let me know if hardware is the
> > > limiting factor here. We didn't go for higher configuration since the
> > load
> > > average on these instances were quite low and I could hardly see any
> CPU
> > > spikes .
> > >
> > >
> > > Kafka Machine Machine Details: - 2 Broker Instances with below
> > > Configuration ,  (Current CPU Usage 2%- 8%)
> > >
> > >  Instance Type : AWS T2 Large
> > >   Machine Configuration : 2 VCPU;s, 8gb Ram, Storage : EBS
> > >
> > > Kafka Streams Instance : 3 Kafka Streams  Application Instances
> (Current
> > > CPU Usage 8%- 24%)
> > >
> > >     Instance Type : AWS M4 Large
> > >     Machine Configuration : 2 VCPU;s, 8gb Ram, Storage : EBS (Dedicated
> > EBS
> > > bandwidth 450 mbps)
> > >
> > >
> > >
> > > On Sat, Sep 16, 2017 at 10:05 PM, Bill Bejeck <bi...@confluent.io>
> wrote:
> > >
> > > > Hi,
> > > >
> > > > It's hard to say exactly without a little more information.
> > > >
> > > > On a side note, I don't see where you are closing the
> KeyValueIterator
> > in
> > > > the code above. Not closing a KeyValueIterator on a Permanent State
> > Store
> > > > can cause a resource leak over time, so I'd add `iter.close()` right
> > > before
> > > > your `logger.info` call.  It might be worth retrying at that point.
> > > >
> > > > Thanks,
> > > > Bill
> > > >
> > > > On Sat, Sep 16, 2017 at 9:14 AM, dev loper <sp...@gmail.com>
> wrote:
> > > >
> > > > > Hi Kafka Streams Users,
> > > > >
> > > > > I am trying to improve the performance of Kafka Streams State Store
> > > > > Persistent Store. In our application we are using Kafka Streams
> > > Processor
> > > > > API  and using Persistent State Store.. My application when starts
> up
> > > it
> > > > > performing well but over a period of time the performance
> > > deteriorated. I
> > > > > am computing certain results in computeAnalytics method and this
> > method
> > > > is
> > > > > not taking time at all. This method is being called within both
> > process
> > > > and
> > > > > punctuate and I am storing the updated object back to store. Over
> the
> > > > > period of time its taking huge time for completing the punctuate
> > > process
> > > > > and I could see majority of the time is spent in storing the
> records
> > > and
> > > > > Iterating the records. The record size is just 2500 per partition.
> I
> > am
> > > > not
> > > > > where I am going wrong and how can I improve the performance.
> > > > >
> > > > > Below is one such sample log record.
> > > > >
> > > > > INFO  | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) -
> > Time
> > > > > Metrics for punctuate  for TimeStamp :: 1505564655878 processed
> > Records
> > > > ::
> > > > > 2109 totalTimeTakenToProcessRecords :: 2
> totalTimeTakenToStoreRecord
> > ::
> > > > > 27605toal time Taken to retrieve Records :: 12787 Total time Taken
> ::
> > > > 40394
> > > > >
> > > > > Below I have given my pseudo code for my processor which exactly
> > > > resembles
> > > > > the code which I am using in my application.
> > > > >
> > > > > MyProcessor(){
> > > > >
> > > > >   process(Key objectkey, Update eventupdate){
> > > > >    long timestamp=context.timestamp();
> > > > >    AnalyticeObj storeobj=store.get(objectkey);
> > > > >
> > > > >    if( storeobj ===null)
> > > > >          {
> > > > >           storeobj=new  AnalyticeObj(objectkey,
> > eventupdate,timestamp)
> > > > >          }
> > > > >          else
> > > > >         {
> > > > >            storeobj.update(eventupdate,timestamp)
> > > > >         }
> > > > >      storeobj=storeobj.computeAnalytics();
> > > > >
> > > > >    store.put(objectkey,storeobj);
> > > > >   context.commit();
> > > > > }
> > > > > // Every 5 seconds
> > > > > punctuate(long timestamp)
> > > > > {
> > > > >  long startTime = System.currentTimeMillis();
> > > > > long totalTimeTakenToProcessRecords=0;
> > > > > long totalTimeTakenToStoreRecords=0;
> > > > > long counter=0;
> > > > > KeyValueIterator iter=this.visitStore.all();
> > > > > while (iter.hasNext()) {
> > > > > KeyValue<Key, AnalyticeObj> entry = iter.next();
> > > > >
> > > > >     if(AnalyticeObj.hasExpired(timestamp)
> > > > >          store.remove(entry.key)
> > > > >       else
> > > > >       {
> > > > >         long processStartTime=System.currentTimeMillis();
> > > > >          AnalyticeObj storeobj= entry.value.computeAnalytics(
> > > timestamp);
> > > > >
> > > > > totalTimeTakenToProcessRecords=totalTimeTakenToProcessRecords
> > > > > +(System.currentTimeMillis()-processStartTime);
> > > > >
> > > > >          long storeStartTime=System.currentTimeMillis();
> > > > >           store.put(entry.key,storeobj);
> > > > >
> > > > > totalTimeTakenToStoreRecords=totalTimeTakenToStoreRecords+(
> > > > > System.currentTimeMillis()-storeStartTime);
> > > > >        }
> > > > >    counter++;
> > > > > }
> > > > >      logger.info(" Time Metrics for punctuate  "
> > > > >                     " for TimeStamp :: " + "" + timestamp + "
> > processed
> > > > > Records :: "
> > > > >                     + counter +" totalTimeTakenToProcessRecords ::
> > > > > "+totalTimeTakenToProcessRecords +" totalTimeTakenToStoreRecord ::
> > > > > "+totalTimeTakenToStoreRecords
> > > > >                     +"toal time Taken to retrieve Records :: "+
> > > > > (System.currentTimeMillis() -
> > > > > (startTime+totalTimeTakenToProcessRecords
> > > +totalTimeTakenToStoreRecords)
> > > > )+"
> > > > > Total time Taken :: " + (System.currentTimeMillis() - startTime));
> > > > > }
> > > > > }
> > > > >
> > > >
> > >
> >
>

Re: Improving Kafka State Store performance

Posted by dev loper <sp...@gmail.com>.
Hi Bill,

I will repeat my tests with Rocks DB enabled and I will revert to you with
details. I might take 1-2 days to get back to you with details since I am
traveling.  But I will try my level best to get it tonight.

On Mon, Sep 18, 2017 at 5:30 PM, Bill Bejeck <bi...@confluent.io> wrote:

> I'm following up from your other thread as well here.  Thanks for the info
> above, that is helpful.
>
> I think the AWS instance type might be a factor here, but let's do some
> more homework first.
>
> For a next step, we could enable logging for RocksDB so we can observe the
> performance.
>
> Here is some sample code that will allow logging at the INFO level as well
> as print out statistics (using RocksDB internal stats) every 15 minutes.
>
> Would you mind reverting your Streams application to use a persistent store
> again?
>
> Then let it run until you observe the behavior you described before and if
> you don't mind share the logs with me so we can look them over.  Thanks!
>
> import org.apache.kafka.streams.state.RocksDBConfigSetter;
> import org.rocksdb.InfoLogLevel;
> import org.rocksdb.Options;
>
> import java.util.Map;
>
> public class RocksDbLogsConfig implements RocksDBConfigSetter {
>
>     @Override
>     public void setConfig(String storeName, Options options, Map<String,
> Object> configs) {
>                    options.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
>                    options.createStatistics();
>                    options.setStatsDumpPeriodSec(900);
>                    options.setDbLogDir("UPDATE WITH PATH WHERE YOU WANT LOG
> FILES");
>     }
> }
>
> To use the RocksDbLogsConfig class, you'll need to update your Streams
> configs like so:
>
>   props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
> RocksDbLogsConfig.class);
>
>
>
> Thanks
> Bill
>
> On Sat, Sep 16, 2017 at 11:22 PM, dev loper <sp...@gmail.com> wrote:
>
> > Hi Bill.
> >
> > Thank you pointing out, But in actual code I am calling iter.close() in
> the
> > finally block if the iterator is not null. I don't see any issues when I
> am
> > running it on light traffic. As soon as I switch to production traffic I
> > start seeing these issues.
> >
> > Below I have provided additional details about our current application.
> If
> > you are looking for specific logs or details , please let me know. I will
> > get the details captured.
> >
> > In production environment I am receiving 10,000 messages per second.
> There
> > are 36 partitions  for the topic and there are around 2500 unique
> entities
> > per partition for which I have to maintain the state.
> >
> > Below I have mentioned the hardware configuration and number of instances
> > we are using for this solution. Please let me know if hardware is the
> > limiting factor here. We didn't go for higher configuration since the
> load
> > average on these instances were quite low and I could hardly see any CPU
> > spikes .
> >
> >
> > Kafka Machine Machine Details: - 2 Broker Instances with below
> > Configuration ,  (Current CPU Usage 2%- 8%)
> >
> >  Instance Type : AWS T2 Large
> >   Machine Configuration : 2 VCPU;s, 8gb Ram, Storage : EBS
> >
> > Kafka Streams Instance : 3 Kafka Streams  Application Instances (Current
> > CPU Usage 8%- 24%)
> >
> >     Instance Type : AWS M4 Large
> >     Machine Configuration : 2 VCPU;s, 8gb Ram, Storage : EBS (Dedicated
> EBS
> > bandwidth 450 mbps)
> >
> >
> >
> > On Sat, Sep 16, 2017 at 10:05 PM, Bill Bejeck <bi...@confluent.io> wrote:
> >
> > > Hi,
> > >
> > > It's hard to say exactly without a little more information.
> > >
> > > On a side note, I don't see where you are closing the KeyValueIterator
> in
> > > the code above. Not closing a KeyValueIterator on a Permanent State
> Store
> > > can cause a resource leak over time, so I'd add `iter.close()` right
> > before
> > > your `logger.info` call.  It might be worth retrying at that point.
> > >
> > > Thanks,
> > > Bill
> > >
> > > On Sat, Sep 16, 2017 at 9:14 AM, dev loper <sp...@gmail.com> wrote:
> > >
> > > > Hi Kafka Streams Users,
> > > >
> > > > I am trying to improve the performance of Kafka Streams State Store
> > > > Persistent Store. In our application we are using Kafka Streams
> > Processor
> > > > API  and using Persistent State Store.. My application when starts up
> > it
> > > > performing well but over a period of time the performance
> > deteriorated. I
> > > > am computing certain results in computeAnalytics method and this
> method
> > > is
> > > > not taking time at all. This method is being called within both
> process
> > > and
> > > > punctuate and I am storing the updated object back to store. Over the
> > > > period of time its taking huge time for completing the punctuate
> > process
> > > > and I could see majority of the time is spent in storing the records
> > and
> > > > Iterating the records. The record size is just 2500 per partition. I
> am
> > > not
> > > > where I am going wrong and how can I improve the performance.
> > > >
> > > > Below is one such sample log record.
> > > >
> > > > INFO  | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) -
> Time
> > > > Metrics for punctuate  for TimeStamp :: 1505564655878 processed
> Records
> > > ::
> > > > 2109 totalTimeTakenToProcessRecords :: 2 totalTimeTakenToStoreRecord
> ::
> > > > 27605toal time Taken to retrieve Records :: 12787 Total time Taken ::
> > > 40394
> > > >
> > > > Below I have given my pseudo code for my processor which exactly
> > > resembles
> > > > the code which I am using in my application.
> > > >
> > > > MyProcessor(){
> > > >
> > > >   process(Key objectkey, Update eventupdate){
> > > >    long timestamp=context.timestamp();
> > > >    AnalyticeObj storeobj=store.get(objectkey);
> > > >
> > > >    if( storeobj ===null)
> > > >          {
> > > >           storeobj=new  AnalyticeObj(objectkey,
> eventupdate,timestamp)
> > > >          }
> > > >          else
> > > >         {
> > > >            storeobj.update(eventupdate,timestamp)
> > > >         }
> > > >      storeobj=storeobj.computeAnalytics();
> > > >
> > > >    store.put(objectkey,storeobj);
> > > >   context.commit();
> > > > }
> > > > // Every 5 seconds
> > > > punctuate(long timestamp)
> > > > {
> > > >  long startTime = System.currentTimeMillis();
> > > > long totalTimeTakenToProcessRecords=0;
> > > > long totalTimeTakenToStoreRecords=0;
> > > > long counter=0;
> > > > KeyValueIterator iter=this.visitStore.all();
> > > > while (iter.hasNext()) {
> > > > KeyValue<Key, AnalyticeObj> entry = iter.next();
> > > >
> > > >     if(AnalyticeObj.hasExpired(timestamp)
> > > >          store.remove(entry.key)
> > > >       else
> > > >       {
> > > >         long processStartTime=System.currentTimeMillis();
> > > >          AnalyticeObj storeobj= entry.value.computeAnalytics(
> > timestamp);
> > > >
> > > > totalTimeTakenToProcessRecords=totalTimeTakenToProcessRecords
> > > > +(System.currentTimeMillis()-processStartTime);
> > > >
> > > >          long storeStartTime=System.currentTimeMillis();
> > > >           store.put(entry.key,storeobj);
> > > >
> > > > totalTimeTakenToStoreRecords=totalTimeTakenToStoreRecords+(
> > > > System.currentTimeMillis()-storeStartTime);
> > > >        }
> > > >    counter++;
> > > > }
> > > >      logger.info(" Time Metrics for punctuate  "
> > > >                     " for TimeStamp :: " + "" + timestamp + "
> processed
> > > > Records :: "
> > > >                     + counter +" totalTimeTakenToProcessRecords ::
> > > > "+totalTimeTakenToProcessRecords +" totalTimeTakenToStoreRecord ::
> > > > "+totalTimeTakenToStoreRecords
> > > >                     +"toal time Taken to retrieve Records :: "+
> > > > (System.currentTimeMillis() -
> > > > (startTime+totalTimeTakenToProcessRecords
> > +totalTimeTakenToStoreRecords)
> > > )+"
> > > > Total time Taken :: " + (System.currentTimeMillis() - startTime));
> > > > }
> > > > }
> > > >
> > >
> >
>

Re: Improving Kafka State Store performance

Posted by Bill Bejeck <bi...@confluent.io>.
Understood, but since we haven't updated to use 5.7.3 yet, I think it's
best to test against what is currently deployed.

Thanks.

On Mon, Sep 18, 2017 at 9:56 AM, Ted Yu <yu...@gmail.com> wrote:

> We're using rocksdb 5.3.6
>
> It would make more sense to perform next round of experiment using rocksdb
> 5.7.3 which is latest.
>
> Cheers
>
> On Mon, Sep 18, 2017 at 5:00 AM, Bill Bejeck <bi...@confluent.io> wrote:
>
> > I'm following up from your other thread as well here.  Thanks for the
> info
> > above, that is helpful.
> >
> > I think the AWS instance type might be a factor here, but let's do some
> > more homework first.
> >
> > For a next step, we could enable logging for RocksDB so we can observe
> the
> > performance.
> >
> > Here is some sample code that will allow logging at the INFO level as
> well
> > as print out statistics (using RocksDB internal stats) every 15 minutes.
> >
> > Would you mind reverting your Streams application to use a persistent
> store
> > again?
> >
> > Then let it run until you observe the behavior you described before and
> if
> > you don't mind share the logs with me so we can look them over.  Thanks!
> >
> > import org.apache.kafka.streams.state.RocksDBConfigSetter;
> > import org.rocksdb.InfoLogLevel;
> > import org.rocksdb.Options;
> >
> > import java.util.Map;
> >
> > public class RocksDbLogsConfig implements RocksDBConfigSetter {
> >
> >     @Override
> >     public void setConfig(String storeName, Options options, Map<String,
> > Object> configs) {
> >                    options.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
> >                    options.createStatistics();
> >                    options.setStatsDumpPeriodSec(900);
> >                    options.setDbLogDir("UPDATE WITH PATH WHERE YOU WANT
> LOG
> > FILES");
> >     }
> > }
> >
> > To use the RocksDbLogsConfig class, you'll need to update your Streams
> > configs like so:
> >
> >   props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
> > RocksDbLogsConfig.class);
> >
> >
> >
> > Thanks
> > Bill
> >
> > On Sat, Sep 16, 2017 at 11:22 PM, dev loper <sp...@gmail.com> wrote:
> >
> > > Hi Bill.
> > >
> > > Thank you pointing out, But in actual code I am calling iter.close() in
> > the
> > > finally block if the iterator is not null. I don't see any issues when
> I
> > am
> > > running it on light traffic. As soon as I switch to production traffic
> I
> > > start seeing these issues.
> > >
> > > Below I have provided additional details about our current application.
> > If
> > > you are looking for specific logs or details , please let me know. I
> will
> > > get the details captured.
> > >
> > > In production environment I am receiving 10,000 messages per second.
> > There
> > > are 36 partitions  for the topic and there are around 2500 unique
> > entities
> > > per partition for which I have to maintain the state.
> > >
> > > Below I have mentioned the hardware configuration and number of
> instances
> > > we are using for this solution. Please let me know if hardware is the
> > > limiting factor here. We didn't go for higher configuration since the
> > load
> > > average on these instances were quite low and I could hardly see any
> CPU
> > > spikes .
> > >
> > >
> > > Kafka Machine Machine Details: - 2 Broker Instances with below
> > > Configuration ,  (Current CPU Usage 2%- 8%)
> > >
> > >  Instance Type : AWS T2 Large
> > >   Machine Configuration : 2 VCPU;s, 8gb Ram, Storage : EBS
> > >
> > > Kafka Streams Instance : 3 Kafka Streams  Application Instances
> (Current
> > > CPU Usage 8%- 24%)
> > >
> > >     Instance Type : AWS M4 Large
> > >     Machine Configuration : 2 VCPU;s, 8gb Ram, Storage : EBS (Dedicated
> > EBS
> > > bandwidth 450 mbps)
> > >
> > >
> > >
> > > On Sat, Sep 16, 2017 at 10:05 PM, Bill Bejeck <bi...@confluent.io>
> wrote:
> > >
> > > > Hi,
> > > >
> > > > It's hard to say exactly without a little more information.
> > > >
> > > > On a side note, I don't see where you are closing the
> KeyValueIterator
> > in
> > > > the code above. Not closing a KeyValueIterator on a Permanent State
> > Store
> > > > can cause a resource leak over time, so I'd add `iter.close()` right
> > > before
> > > > your `logger.info` call.  It might be worth retrying at that point.
> > > >
> > > > Thanks,
> > > > Bill
> > > >
> > > > On Sat, Sep 16, 2017 at 9:14 AM, dev loper <sp...@gmail.com>
> wrote:
> > > >
> > > > > Hi Kafka Streams Users,
> > > > >
> > > > > I am trying to improve the performance of Kafka Streams State Store
> > > > > Persistent Store. In our application we are using Kafka Streams
> > > Processor
> > > > > API  and using Persistent State Store.. My application when starts
> up
> > > it
> > > > > performing well but over a period of time the performance
> > > deteriorated. I
> > > > > am computing certain results in computeAnalytics method and this
> > method
> > > > is
> > > > > not taking time at all. This method is being called within both
> > process
> > > > and
> > > > > punctuate and I am storing the updated object back to store. Over
> the
> > > > > period of time its taking huge time for completing the punctuate
> > > process
> > > > > and I could see majority of the time is spent in storing the
> records
> > > and
> > > > > Iterating the records. The record size is just 2500 per partition.
> I
> > am
> > > > not
> > > > > where I am going wrong and how can I improve the performance.
> > > > >
> > > > > Below is one such sample log record.
> > > > >
> > > > > INFO  | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) -
> > Time
> > > > > Metrics for punctuate  for TimeStamp :: 1505564655878 processed
> > Records
> > > > ::
> > > > > 2109 totalTimeTakenToProcessRecords :: 2
> totalTimeTakenToStoreRecord
> > ::
> > > > > 27605toal time Taken to retrieve Records :: 12787 Total time Taken
> ::
> > > > 40394
> > > > >
> > > > > Below I have given my pseudo code for my processor which exactly
> > > > resembles
> > > > > the code which I am using in my application.
> > > > >
> > > > > MyProcessor(){
> > > > >
> > > > >   process(Key objectkey, Update eventupdate){
> > > > >    long timestamp=context.timestamp();
> > > > >    AnalyticeObj storeobj=store.get(objectkey);
> > > > >
> > > > >    if( storeobj ===null)
> > > > >          {
> > > > >           storeobj=new  AnalyticeObj(objectkey,
> > eventupdate,timestamp)
> > > > >          }
> > > > >          else
> > > > >         {
> > > > >            storeobj.update(eventupdate,timestamp)
> > > > >         }
> > > > >      storeobj=storeobj.computeAnalytics();
> > > > >
> > > > >    store.put(objectkey,storeobj);
> > > > >   context.commit();
> > > > > }
> > > > > // Every 5 seconds
> > > > > punctuate(long timestamp)
> > > > > {
> > > > >  long startTime = System.currentTimeMillis();
> > > > > long totalTimeTakenToProcessRecords=0;
> > > > > long totalTimeTakenToStoreRecords=0;
> > > > > long counter=0;
> > > > > KeyValueIterator iter=this.visitStore.all();
> > > > > while (iter.hasNext()) {
> > > > > KeyValue<Key, AnalyticeObj> entry = iter.next();
> > > > >
> > > > >     if(AnalyticeObj.hasExpired(timestamp)
> > > > >          store.remove(entry.key)
> > > > >       else
> > > > >       {
> > > > >         long processStartTime=System.currentTimeMillis();
> > > > >          AnalyticeObj storeobj= entry.value.computeAnalytics(
> > > timestamp);
> > > > >
> > > > > totalTimeTakenToProcessRecords=totalTimeTakenToProcessRecords
> > > > > +(System.currentTimeMillis()-processStartTime);
> > > > >
> > > > >          long storeStartTime=System.currentTimeMillis();
> > > > >           store.put(entry.key,storeobj);
> > > > >
> > > > > totalTimeTakenToStoreRecords=totalTimeTakenToStoreRecords+(
> > > > > System.currentTimeMillis()-storeStartTime);
> > > > >        }
> > > > >    counter++;
> > > > > }
> > > > >      logger.info(" Time Metrics for punctuate  "
> > > > >                     " for TimeStamp :: " + "" + timestamp + "
> > processed
> > > > > Records :: "
> > > > >                     + counter +" totalTimeTakenToProcessRecords ::
> > > > > "+totalTimeTakenToProcessRecords +" totalTimeTakenToStoreRecord ::
> > > > > "+totalTimeTakenToStoreRecords
> > > > >                     +"toal time Taken to retrieve Records :: "+
> > > > > (System.currentTimeMillis() -
> > > > > (startTime+totalTimeTakenToProcessRecords
> > > +totalTimeTakenToStoreRecords)
> > > > )+"
> > > > > Total time Taken :: " + (System.currentTimeMillis() - startTime));
> > > > > }
> > > > > }
> > > > >
> > > >
> > >
> >
>

Re: Improving Kafka State Store performance

Posted by Ted Yu <yu...@gmail.com>.
We're using rocksdb 5.3.6

It would make more sense to perform next round of experiment using rocksdb
5.7.3 which is latest.

Cheers

On Mon, Sep 18, 2017 at 5:00 AM, Bill Bejeck <bi...@confluent.io> wrote:

> I'm following up from your other thread as well here.  Thanks for the info
> above, that is helpful.
>
> I think the AWS instance type might be a factor here, but let's do some
> more homework first.
>
> For a next step, we could enable logging for RocksDB so we can observe the
> performance.
>
> Here is some sample code that will allow logging at the INFO level as well
> as print out statistics (using RocksDB internal stats) every 15 minutes.
>
> Would you mind reverting your Streams application to use a persistent store
> again?
>
> Then let it run until you observe the behavior you described before and if
> you don't mind share the logs with me so we can look them over.  Thanks!
>
> import org.apache.kafka.streams.state.RocksDBConfigSetter;
> import org.rocksdb.InfoLogLevel;
> import org.rocksdb.Options;
>
> import java.util.Map;
>
> public class RocksDbLogsConfig implements RocksDBConfigSetter {
>
>     @Override
>     public void setConfig(String storeName, Options options, Map<String,
> Object> configs) {
>                    options.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
>                    options.createStatistics();
>                    options.setStatsDumpPeriodSec(900);
>                    options.setDbLogDir("UPDATE WITH PATH WHERE YOU WANT LOG
> FILES");
>     }
> }
>
> To use the RocksDbLogsConfig class, you'll need to update your Streams
> configs like so:
>
>   props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
> RocksDbLogsConfig.class);
>
>
>
> Thanks
> Bill
>
> On Sat, Sep 16, 2017 at 11:22 PM, dev loper <sp...@gmail.com> wrote:
>
> > Hi Bill.
> >
> > Thank you pointing out, But in actual code I am calling iter.close() in
> the
> > finally block if the iterator is not null. I don't see any issues when I
> am
> > running it on light traffic. As soon as I switch to production traffic I
> > start seeing these issues.
> >
> > Below I have provided additional details about our current application.
> If
> > you are looking for specific logs or details , please let me know. I will
> > get the details captured.
> >
> > In production environment I am receiving 10,000 messages per second.
> There
> > are 36 partitions  for the topic and there are around 2500 unique
> entities
> > per partition for which I have to maintain the state.
> >
> > Below I have mentioned the hardware configuration and number of instances
> > we are using for this solution. Please let me know if hardware is the
> > limiting factor here. We didn't go for higher configuration since the
> load
> > average on these instances were quite low and I could hardly see any CPU
> > spikes .
> >
> >
> > Kafka Machine Machine Details: - 2 Broker Instances with below
> > Configuration ,  (Current CPU Usage 2%- 8%)
> >
> >  Instance Type : AWS T2 Large
> >   Machine Configuration : 2 VCPU;s, 8gb Ram, Storage : EBS
> >
> > Kafka Streams Instance : 3 Kafka Streams  Application Instances (Current
> > CPU Usage 8%- 24%)
> >
> >     Instance Type : AWS M4 Large
> >     Machine Configuration : 2 VCPU;s, 8gb Ram, Storage : EBS (Dedicated
> EBS
> > bandwidth 450 mbps)
> >
> >
> >
> > On Sat, Sep 16, 2017 at 10:05 PM, Bill Bejeck <bi...@confluent.io> wrote:
> >
> > > Hi,
> > >
> > > It's hard to say exactly without a little more information.
> > >
> > > On a side note, I don't see where you are closing the KeyValueIterator
> in
> > > the code above. Not closing a KeyValueIterator on a Permanent State
> Store
> > > can cause a resource leak over time, so I'd add `iter.close()` right
> > before
> > > your `logger.info` call.  It might be worth retrying at that point.
> > >
> > > Thanks,
> > > Bill
> > >
> > > On Sat, Sep 16, 2017 at 9:14 AM, dev loper <sp...@gmail.com> wrote:
> > >
> > > > Hi Kafka Streams Users,
> > > >
> > > > I am trying to improve the performance of Kafka Streams State Store
> > > > Persistent Store. In our application we are using Kafka Streams
> > Processor
> > > > API  and using Persistent State Store.. My application when starts up
> > it
> > > > performing well but over a period of time the performance
> > deteriorated. I
> > > > am computing certain results in computeAnalytics method and this
> method
> > > is
> > > > not taking time at all. This method is being called within both
> process
> > > and
> > > > punctuate and I am storing the updated object back to store. Over the
> > > > period of time its taking huge time for completing the punctuate
> > process
> > > > and I could see majority of the time is spent in storing the records
> > and
> > > > Iterating the records. The record size is just 2500 per partition. I
> am
> > > not
> > > > where I am going wrong and how can I improve the performance.
> > > >
> > > > Below is one such sample log record.
> > > >
> > > > INFO  | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) -
> Time
> > > > Metrics for punctuate  for TimeStamp :: 1505564655878 processed
> Records
> > > ::
> > > > 2109 totalTimeTakenToProcessRecords :: 2 totalTimeTakenToStoreRecord
> ::
> > > > 27605toal time Taken to retrieve Records :: 12787 Total time Taken ::
> > > 40394
> > > >
> > > > Below I have given my pseudo code for my processor which exactly
> > > resembles
> > > > the code which I am using in my application.
> > > >
> > > > MyProcessor(){
> > > >
> > > >   process(Key objectkey, Update eventupdate){
> > > >    long timestamp=context.timestamp();
> > > >    AnalyticeObj storeobj=store.get(objectkey);
> > > >
> > > >    if( storeobj ===null)
> > > >          {
> > > >           storeobj=new  AnalyticeObj(objectkey,
> eventupdate,timestamp)
> > > >          }
> > > >          else
> > > >         {
> > > >            storeobj.update(eventupdate,timestamp)
> > > >         }
> > > >      storeobj=storeobj.computeAnalytics();
> > > >
> > > >    store.put(objectkey,storeobj);
> > > >   context.commit();
> > > > }
> > > > // Every 5 seconds
> > > > punctuate(long timestamp)
> > > > {
> > > >  long startTime = System.currentTimeMillis();
> > > > long totalTimeTakenToProcessRecords=0;
> > > > long totalTimeTakenToStoreRecords=0;
> > > > long counter=0;
> > > > KeyValueIterator iter=this.visitStore.all();
> > > > while (iter.hasNext()) {
> > > > KeyValue<Key, AnalyticeObj> entry = iter.next();
> > > >
> > > >     if(AnalyticeObj.hasExpired(timestamp)
> > > >          store.remove(entry.key)
> > > >       else
> > > >       {
> > > >         long processStartTime=System.currentTimeMillis();
> > > >          AnalyticeObj storeobj= entry.value.computeAnalytics(
> > timestamp);
> > > >
> > > > totalTimeTakenToProcessRecords=totalTimeTakenToProcessRecords
> > > > +(System.currentTimeMillis()-processStartTime);
> > > >
> > > >          long storeStartTime=System.currentTimeMillis();
> > > >           store.put(entry.key,storeobj);
> > > >
> > > > totalTimeTakenToStoreRecords=totalTimeTakenToStoreRecords+(
> > > > System.currentTimeMillis()-storeStartTime);
> > > >        }
> > > >    counter++;
> > > > }
> > > >      logger.info(" Time Metrics for punctuate  "
> > > >                     " for TimeStamp :: " + "" + timestamp + "
> processed
> > > > Records :: "
> > > >                     + counter +" totalTimeTakenToProcessRecords ::
> > > > "+totalTimeTakenToProcessRecords +" totalTimeTakenToStoreRecord ::
> > > > "+totalTimeTakenToStoreRecords
> > > >                     +"toal time Taken to retrieve Records :: "+
> > > > (System.currentTimeMillis() -
> > > > (startTime+totalTimeTakenToProcessRecords
> > +totalTimeTakenToStoreRecords)
> > > )+"
> > > > Total time Taken :: " + (System.currentTimeMillis() - startTime));
> > > > }
> > > > }
> > > >
> > >
> >
>

Re: Improving Kafka State Store performance

Posted by Bill Bejeck <bi...@confluent.io>.
I'm following up from your other thread as well here.  Thanks for the info
above, that is helpful.

I think the AWS instance type might be a factor here, but let's do some
more homework first.

For a next step, we could enable logging for RocksDB so we can observe the
performance.

Here is some sample code that will allow logging at the INFO level as well
as print out statistics (using RocksDB internal stats) every 15 minutes.

Would you mind reverting your Streams application to use a persistent store
again?

Then let it run until you observe the behavior you described before and if
you don't mind share the logs with me so we can look them over.  Thanks!

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.Options;

import java.util.Map;

public class RocksDbLogsConfig implements RocksDBConfigSetter {

    @Override
    public void setConfig(String storeName, Options options, Map<String,
Object> configs) {
                   options.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
                   options.createStatistics();
                   options.setStatsDumpPeriodSec(900);
                   options.setDbLogDir("UPDATE WITH PATH WHERE YOU WANT LOG
FILES");
    }
}

To use the RocksDbLogsConfig class, you'll need to update your Streams
configs like so:

  props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
RocksDbLogsConfig.class);



Thanks
Bill

On Sat, Sep 16, 2017 at 11:22 PM, dev loper <sp...@gmail.com> wrote:

> Hi Bill.
>
> Thank you pointing out, But in actual code I am calling iter.close() in the
> finally block if the iterator is not null. I don't see any issues when I am
> running it on light traffic. As soon as I switch to production traffic I
> start seeing these issues.
>
> Below I have provided additional details about our current application. If
> you are looking for specific logs or details , please let me know. I will
> get the details captured.
>
> In production environment I am receiving 10,000 messages per second. There
> are 36 partitions  for the topic and there are around 2500 unique entities
> per partition for which I have to maintain the state.
>
> Below I have mentioned the hardware configuration and number of instances
> we are using for this solution. Please let me know if hardware is the
> limiting factor here. We didn't go for higher configuration since the load
> average on these instances were quite low and I could hardly see any CPU
> spikes .
>
>
> Kafka Machine Machine Details: - 2 Broker Instances with below
> Configuration ,  (Current CPU Usage 2%- 8%)
>
>  Instance Type : AWS T2 Large
>   Machine Configuration : 2 VCPU;s, 8gb Ram, Storage : EBS
>
> Kafka Streams Instance : 3 Kafka Streams  Application Instances (Current
> CPU Usage 8%- 24%)
>
>     Instance Type : AWS M4 Large
>     Machine Configuration : 2 VCPU;s, 8gb Ram, Storage : EBS (Dedicated EBS
> bandwidth 450 mbps)
>
>
>
> On Sat, Sep 16, 2017 at 10:05 PM, Bill Bejeck <bi...@confluent.io> wrote:
>
> > Hi,
> >
> > It's hard to say exactly without a little more information.
> >
> > On a side note, I don't see where you are closing the KeyValueIterator in
> > the code above. Not closing a KeyValueIterator on a Permanent State Store
> > can cause a resource leak over time, so I'd add `iter.close()` right
> before
> > your `logger.info` call.  It might be worth retrying at that point.
> >
> > Thanks,
> > Bill
> >
> > On Sat, Sep 16, 2017 at 9:14 AM, dev loper <sp...@gmail.com> wrote:
> >
> > > Hi Kafka Streams Users,
> > >
> > > I am trying to improve the performance of Kafka Streams State Store
> > > Persistent Store. In our application we are using Kafka Streams
> Processor
> > > API  and using Persistent State Store.. My application when starts up
> it
> > > performing well but over a period of time the performance
> deteriorated. I
> > > am computing certain results in computeAnalytics method and this method
> > is
> > > not taking time at all. This method is being called within both process
> > and
> > > punctuate and I am storing the updated object back to store. Over the
> > > period of time its taking huge time for completing the punctuate
> process
> > > and I could see majority of the time is spent in storing the records
> and
> > > Iterating the records. The record size is just 2500 per partition. I am
> > not
> > > where I am going wrong and how can I improve the performance.
> > >
> > > Below is one such sample log record.
> > >
> > > INFO  | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) - Time
> > > Metrics for punctuate  for TimeStamp :: 1505564655878 processed Records
> > ::
> > > 2109 totalTimeTakenToProcessRecords :: 2 totalTimeTakenToStoreRecord ::
> > > 27605toal time Taken to retrieve Records :: 12787 Total time Taken ::
> > 40394
> > >
> > > Below I have given my pseudo code for my processor which exactly
> > resembles
> > > the code which I am using in my application.
> > >
> > > MyProcessor(){
> > >
> > >   process(Key objectkey, Update eventupdate){
> > >    long timestamp=context.timestamp();
> > >    AnalyticeObj storeobj=store.get(objectkey);
> > >
> > >    if( storeobj ===null)
> > >          {
> > >           storeobj=new  AnalyticeObj(objectkey,eventupdate,timestamp)
> > >          }
> > >          else
> > >         {
> > >            storeobj.update(eventupdate,timestamp)
> > >         }
> > >      storeobj=storeobj.computeAnalytics();
> > >
> > >    store.put(objectkey,storeobj);
> > >   context.commit();
> > > }
> > > // Every 5 seconds
> > > punctuate(long timestamp)
> > > {
> > >  long startTime = System.currentTimeMillis();
> > > long totalTimeTakenToProcessRecords=0;
> > > long totalTimeTakenToStoreRecords=0;
> > > long counter=0;
> > > KeyValueIterator iter=this.visitStore.all();
> > > while (iter.hasNext()) {
> > > KeyValue<Key, AnalyticeObj> entry = iter.next();
> > >
> > >     if(AnalyticeObj.hasExpired(timestamp)
> > >          store.remove(entry.key)
> > >       else
> > >       {
> > >         long processStartTime=System.currentTimeMillis();
> > >          AnalyticeObj storeobj= entry.value.computeAnalytics(
> timestamp);
> > >
> > > totalTimeTakenToProcessRecords=totalTimeTakenToProcessRecords
> > > +(System.currentTimeMillis()-processStartTime);
> > >
> > >          long storeStartTime=System.currentTimeMillis();
> > >           store.put(entry.key,storeobj);
> > >
> > > totalTimeTakenToStoreRecords=totalTimeTakenToStoreRecords+(
> > > System.currentTimeMillis()-storeStartTime);
> > >        }
> > >    counter++;
> > > }
> > >      logger.info(" Time Metrics for punctuate  "
> > >                     " for TimeStamp :: " + "" + timestamp + " processed
> > > Records :: "
> > >                     + counter +" totalTimeTakenToProcessRecords ::
> > > "+totalTimeTakenToProcessRecords +" totalTimeTakenToStoreRecord ::
> > > "+totalTimeTakenToStoreRecords
> > >                     +"toal time Taken to retrieve Records :: "+
> > > (System.currentTimeMillis() -
> > > (startTime+totalTimeTakenToProcessRecords
> +totalTimeTakenToStoreRecords)
> > )+"
> > > Total time Taken :: " + (System.currentTimeMillis() - startTime));
> > > }
> > > }
> > >
> >
>

Re: Improving Kafka State Store performance

Posted by dev loper <sp...@gmail.com>.
@James,
I will enable the monitoring metrics and try out. But I seriously think the
performance
 degradation was due to rocks DB. As soon as I switched to in Memory State
Store all my problems disappeared.

@Sabarish,

 Only my kafka instances are on T2 Instance. My application instances are
on M4 large instance. Thank you very much for pointing out your experience
with rocksDB. I switched to InMemory State Store after your suggestion and
all my problems disappeared. Maybe if I switch to  SSD storage , I  might
get  able to get better performance from Rocks DB , Currently it was
running on EBS. But i don't think the current level of performance The
current experience with ROCKS DB was terrible it was degrading over the
period of time. If you haven't pointer out your experience with Rocks DB I
wouldn't have tried it out.





On Sun, Sep 17, 2017 at 3:25 PM, Sabarish Sasidharan <sabarish.spk@gmail.com
> wrote:

> T2 instances are credit based instances that do not provide constant
> throughout in cpu and io. You would want to reconsider using t2.
>
> That said we too face issues with scaling rocksdb. Although I don't
> remember the performance benchmark numbers but I do remember it was at
> least 3x slower when compared to hashmap storage. Every lookup, every write
> contributed to the degradation. Changelogging had negligible overhead btw.
>
> Regards
> Sab
>
> On 17 Sep 2017 8:52 am, "dev loper" <sp...@gmail.com> wrote:
>
> > Hi Bill.
> >
> > Thank you pointing out, But in actual code I am calling iter.close() in
> the
> > finally block if the iterator is not null. I don't see any issues when I
> am
> > running it on light traffic. As soon as I switch to production traffic I
> > start seeing these issues.
> >
> > Below I have provided additional details about our current application.
> If
> > you are looking for specific logs or details , please let me know. I will
> > get the details captured.
> >
> > In production environment I am receiving 10,000 messages per second.
> There
> > are 36 partitions  for the topic and there are around 2500 unique
> entities
> > per partition for which I have to maintain the state.
> >
> > Below I have mentioned the hardware configuration and number of instances
> > we are using for this solution. Please let me know if hardware is the
> > limiting factor here. We didn't go for higher configuration since the
> load
> > average on these instances were quite low and I could hardly see any CPU
> > spikes .
> >
> >
> > Kafka Machine Machine Details: - 2 Broker Instances with below
> > Configuration ,  (Current CPU Usage 2%- 8%)
> >
> >  Instance Type : AWS T2 Large
> >   Machine Configuration : 2 VCPU;s, 8gb Ram, Storage : EBS
> >
> > Kafka Streams Instance : 3 Kafka Streams  Application Instances (Current
> > CPU Usage 8%- 24%)
> >
> >     Instance Type : AWS M4 Large
> >     Machine Configuration : 2 VCPU;s, 8gb Ram, Storage : EBS (Dedicated
> EBS
> > bandwidth 450 mbps)
> >
> >
> >
> > On Sat, Sep 16, 2017 at 10:05 PM, Bill Bejeck <bi...@confluent.io> wrote:
> >
> > > Hi,
> > >
> > > It's hard to say exactly without a little more information.
> > >
> > > On a side note, I don't see where you are closing the KeyValueIterator
> in
> > > the code above. Not closing a KeyValueIterator on a Permanent State
> Store
> > > can cause a resource leak over time, so I'd add `iter.close()` right
> > before
> > > your `logger.info` call.  It might be worth retrying at that point.
> > >
> > > Thanks,
> > > Bill
> > >
> > > On Sat, Sep 16, 2017 at 9:14 AM, dev loper <sp...@gmail.com> wrote:
> > >
> > > > Hi Kafka Streams Users,
> > > >
> > > > I am trying to improve the performance of Kafka Streams State Store
> > > > Persistent Store. In our application we are using Kafka Streams
> > Processor
> > > > API  and using Persistent State Store.. My application when starts up
> > it
> > > > performing well but over a period of time the performance
> > deteriorated. I
> > > > am computing certain results in computeAnalytics method and this
> method
> > > is
> > > > not taking time at all. This method is being called within both
> process
> > > and
> > > > punctuate and I am storing the updated object back to store. Over the
> > > > period of time its taking huge time for completing the punctuate
> > process
> > > > and I could see majority of the time is spent in storing the records
> > and
> > > > Iterating the records. The record size is just 2500 per partition. I
> am
> > > not
> > > > where I am going wrong and how can I improve the performance.
> > > >
> > > > Below is one such sample log record.
> > > >
> > > > INFO  | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) -
> Time
> > > > Metrics for punctuate  for TimeStamp :: 1505564655878 processed
> Records
> > > ::
> > > > 2109 totalTimeTakenToProcessRecords :: 2 totalTimeTakenToStoreRecord
> ::
> > > > 27605toal time Taken to retrieve Records :: 12787 Total time Taken ::
> > > 40394
> > > >
> > > > Below I have given my pseudo code for my processor which exactly
> > > resembles
> > > > the code which I am using in my application.
> > > >
> > > > MyProcessor(){
> > > >
> > > >   process(Key objectkey, Update eventupdate){
> > > >    long timestamp=context.timestamp();
> > > >    AnalyticeObj storeobj=store.get(objectkey);
> > > >
> > > >    if( storeobj ===null)
> > > >          {
> > > >           storeobj=new  AnalyticeObj(objectkey,
> eventupdate,timestamp)
> > > >          }
> > > >          else
> > > >         {
> > > >            storeobj.update(eventupdate,timestamp)
> > > >         }
> > > >      storeobj=storeobj.computeAnalytics();
> > > >
> > > >    store.put(objectkey,storeobj);
> > > >   context.commit();
> > > > }
> > > > // Every 5 seconds
> > > > punctuate(long timestamp)
> > > > {
> > > >  long startTime = System.currentTimeMillis();
> > > > long totalTimeTakenToProcessRecords=0;
> > > > long totalTimeTakenToStoreRecords=0;
> > > > long counter=0;
> > > > KeyValueIterator iter=this.visitStore.all();
> > > > while (iter.hasNext()) {
> > > > KeyValue<Key, AnalyticeObj> entry = iter.next();
> > > >
> > > >     if(AnalyticeObj.hasExpired(timestamp)
> > > >          store.remove(entry.key)
> > > >       else
> > > >       {
> > > >         long processStartTime=System.currentTimeMillis();
> > > >          AnalyticeObj storeobj= entry.value.computeAnalytics(
> > timestamp);
> > > >
> > > > totalTimeTakenToProcessRecords=totalTimeTakenToProcessRecords
> > > > +(System.currentTimeMillis()-processStartTime);
> > > >
> > > >          long storeStartTime=System.currentTimeMillis();
> > > >           store.put(entry.key,storeobj);
> > > >
> > > > totalTimeTakenToStoreRecords=totalTimeTakenToStoreRecords+(
> > > > System.currentTimeMillis()-storeStartTime);
> > > >        }
> > > >    counter++;
> > > > }
> > > >      logger.info(" Time Metrics for punctuate  "
> > > >                     " for TimeStamp :: " + "" + timestamp + "
> processed
> > > > Records :: "
> > > >                     + counter +" totalTimeTakenToProcessRecords ::
> > > > "+totalTimeTakenToProcessRecords +" totalTimeTakenToStoreRecord ::
> > > > "+totalTimeTakenToStoreRecords
> > > >                     +"toal time Taken to retrieve Records :: "+
> > > > (System.currentTimeMillis() -
> > > > (startTime+totalTimeTakenToProcessRecords
> > +totalTimeTakenToStoreRecords)
> > > )+"
> > > > Total time Taken :: " + (System.currentTimeMillis() - startTime));
> > > > }
> > > > }
> > > >
> > >
> >
>

Re: Improving Kafka State Store performance

Posted by Sabarish Sasidharan <sa...@gmail.com>.
T2 instances are credit based instances that do not provide constant
throughout in cpu and io. You would want to reconsider using t2.

That said we too face issues with scaling rocksdb. Although I don't
remember the performance benchmark numbers but I do remember it was at
least 3x slower when compared to hashmap storage. Every lookup, every write
contributed to the degradation. Changelogging had negligible overhead btw.

Regards
Sab

On 17 Sep 2017 8:52 am, "dev loper" <sp...@gmail.com> wrote:

> Hi Bill.
>
> Thank you pointing out, But in actual code I am calling iter.close() in the
> finally block if the iterator is not null. I don't see any issues when I am
> running it on light traffic. As soon as I switch to production traffic I
> start seeing these issues.
>
> Below I have provided additional details about our current application. If
> you are looking for specific logs or details , please let me know. I will
> get the details captured.
>
> In production environment I am receiving 10,000 messages per second. There
> are 36 partitions  for the topic and there are around 2500 unique entities
> per partition for which I have to maintain the state.
>
> Below I have mentioned the hardware configuration and number of instances
> we are using for this solution. Please let me know if hardware is the
> limiting factor here. We didn't go for higher configuration since the load
> average on these instances were quite low and I could hardly see any CPU
> spikes .
>
>
> Kafka Machine Machine Details: - 2 Broker Instances with below
> Configuration ,  (Current CPU Usage 2%- 8%)
>
>  Instance Type : AWS T2 Large
>   Machine Configuration : 2 VCPU;s, 8gb Ram, Storage : EBS
>
> Kafka Streams Instance : 3 Kafka Streams  Application Instances (Current
> CPU Usage 8%- 24%)
>
>     Instance Type : AWS M4 Large
>     Machine Configuration : 2 VCPU;s, 8gb Ram, Storage : EBS (Dedicated EBS
> bandwidth 450 mbps)
>
>
>
> On Sat, Sep 16, 2017 at 10:05 PM, Bill Bejeck <bi...@confluent.io> wrote:
>
> > Hi,
> >
> > It's hard to say exactly without a little more information.
> >
> > On a side note, I don't see where you are closing the KeyValueIterator in
> > the code above. Not closing a KeyValueIterator on a Permanent State Store
> > can cause a resource leak over time, so I'd add `iter.close()` right
> before
> > your `logger.info` call.  It might be worth retrying at that point.
> >
> > Thanks,
> > Bill
> >
> > On Sat, Sep 16, 2017 at 9:14 AM, dev loper <sp...@gmail.com> wrote:
> >
> > > Hi Kafka Streams Users,
> > >
> > > I am trying to improve the performance of Kafka Streams State Store
> > > Persistent Store. In our application we are using Kafka Streams
> Processor
> > > API  and using Persistent State Store.. My application when starts up
> it
> > > performing well but over a period of time the performance
> deteriorated. I
> > > am computing certain results in computeAnalytics method and this method
> > is
> > > not taking time at all. This method is being called within both process
> > and
> > > punctuate and I am storing the updated object back to store. Over the
> > > period of time its taking huge time for completing the punctuate
> process
> > > and I could see majority of the time is spent in storing the records
> and
> > > Iterating the records. The record size is just 2500 per partition. I am
> > not
> > > where I am going wrong and how can I improve the performance.
> > >
> > > Below is one such sample log record.
> > >
> > > INFO  | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) - Time
> > > Metrics for punctuate  for TimeStamp :: 1505564655878 processed Records
> > ::
> > > 2109 totalTimeTakenToProcessRecords :: 2 totalTimeTakenToStoreRecord ::
> > > 27605toal time Taken to retrieve Records :: 12787 Total time Taken ::
> > 40394
> > >
> > > Below I have given my pseudo code for my processor which exactly
> > resembles
> > > the code which I am using in my application.
> > >
> > > MyProcessor(){
> > >
> > >   process(Key objectkey, Update eventupdate){
> > >    long timestamp=context.timestamp();
> > >    AnalyticeObj storeobj=store.get(objectkey);
> > >
> > >    if( storeobj ===null)
> > >          {
> > >           storeobj=new  AnalyticeObj(objectkey,eventupdate,timestamp)
> > >          }
> > >          else
> > >         {
> > >            storeobj.update(eventupdate,timestamp)
> > >         }
> > >      storeobj=storeobj.computeAnalytics();
> > >
> > >    store.put(objectkey,storeobj);
> > >   context.commit();
> > > }
> > > // Every 5 seconds
> > > punctuate(long timestamp)
> > > {
> > >  long startTime = System.currentTimeMillis();
> > > long totalTimeTakenToProcessRecords=0;
> > > long totalTimeTakenToStoreRecords=0;
> > > long counter=0;
> > > KeyValueIterator iter=this.visitStore.all();
> > > while (iter.hasNext()) {
> > > KeyValue<Key, AnalyticeObj> entry = iter.next();
> > >
> > >     if(AnalyticeObj.hasExpired(timestamp)
> > >          store.remove(entry.key)
> > >       else
> > >       {
> > >         long processStartTime=System.currentTimeMillis();
> > >          AnalyticeObj storeobj= entry.value.computeAnalytics(
> timestamp);
> > >
> > > totalTimeTakenToProcessRecords=totalTimeTakenToProcessRecords
> > > +(System.currentTimeMillis()-processStartTime);
> > >
> > >          long storeStartTime=System.currentTimeMillis();
> > >           store.put(entry.key,storeobj);
> > >
> > > totalTimeTakenToStoreRecords=totalTimeTakenToStoreRecords+(
> > > System.currentTimeMillis()-storeStartTime);
> > >        }
> > >    counter++;
> > > }
> > >      logger.info(" Time Metrics for punctuate  "
> > >                     " for TimeStamp :: " + "" + timestamp + " processed
> > > Records :: "
> > >                     + counter +" totalTimeTakenToProcessRecords ::
> > > "+totalTimeTakenToProcessRecords +" totalTimeTakenToStoreRecord ::
> > > "+totalTimeTakenToStoreRecords
> > >                     +"toal time Taken to retrieve Records :: "+
> > > (System.currentTimeMillis() -
> > > (startTime+totalTimeTakenToProcessRecords
> +totalTimeTakenToStoreRecords)
> > )+"
> > > Total time Taken :: " + (System.currentTimeMillis() - startTime));
> > > }
> > > }
> > >
> >
>

Re: Improving Kafka State Store performance

Posted by dev loper <sp...@gmail.com>.
Hi Bill.

Thank you pointing out, But in actual code I am calling iter.close() in the
finally block if the iterator is not null. I don't see any issues when I am
running it on light traffic. As soon as I switch to production traffic I
start seeing these issues.

Below I have provided additional details about our current application. If
you are looking for specific logs or details , please let me know. I will
get the details captured.

In production environment I am receiving 10,000 messages per second. There
are 36 partitions  for the topic and there are around 2500 unique entities
per partition for which I have to maintain the state.

Below I have mentioned the hardware configuration and number of instances
we are using for this solution. Please let me know if hardware is the
limiting factor here. We didn't go for higher configuration since the load
average on these instances were quite low and I could hardly see any CPU
spikes .


Kafka Machine Machine Details: - 2 Broker Instances with below
Configuration ,  (Current CPU Usage 2%- 8%)

 Instance Type : AWS T2 Large
  Machine Configuration : 2 VCPU;s, 8gb Ram, Storage : EBS

Kafka Streams Instance : 3 Kafka Streams  Application Instances (Current
CPU Usage 8%- 24%)

    Instance Type : AWS M4 Large
    Machine Configuration : 2 VCPU;s, 8gb Ram, Storage : EBS (Dedicated EBS
bandwidth 450 mbps)



On Sat, Sep 16, 2017 at 10:05 PM, Bill Bejeck <bi...@confluent.io> wrote:

> Hi,
>
> It's hard to say exactly without a little more information.
>
> On a side note, I don't see where you are closing the KeyValueIterator in
> the code above. Not closing a KeyValueIterator on a Permanent State Store
> can cause a resource leak over time, so I'd add `iter.close()` right before
> your `logger.info` call.  It might be worth retrying at that point.
>
> Thanks,
> Bill
>
> On Sat, Sep 16, 2017 at 9:14 AM, dev loper <sp...@gmail.com> wrote:
>
> > Hi Kafka Streams Users,
> >
> > I am trying to improve the performance of Kafka Streams State Store
> > Persistent Store. In our application we are using Kafka Streams Processor
> > API  and using Persistent State Store.. My application when starts up it
> > performing well but over a period of time the performance deteriorated. I
> > am computing certain results in computeAnalytics method and this method
> is
> > not taking time at all. This method is being called within both process
> and
> > punctuate and I am storing the updated object back to store. Over the
> > period of time its taking huge time for completing the punctuate process
> > and I could see majority of the time is spent in storing the records and
> > Iterating the records. The record size is just 2500 per partition. I am
> not
> > where I am going wrong and how can I improve the performance.
> >
> > Below is one such sample log record.
> >
> > INFO  | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) - Time
> > Metrics for punctuate  for TimeStamp :: 1505564655878 processed Records
> ::
> > 2109 totalTimeTakenToProcessRecords :: 2 totalTimeTakenToStoreRecord ::
> > 27605toal time Taken to retrieve Records :: 12787 Total time Taken ::
> 40394
> >
> > Below I have given my pseudo code for my processor which exactly
> resembles
> > the code which I am using in my application.
> >
> > MyProcessor(){
> >
> >   process(Key objectkey, Update eventupdate){
> >    long timestamp=context.timestamp();
> >    AnalyticeObj storeobj=store.get(objectkey);
> >
> >    if( storeobj ===null)
> >          {
> >           storeobj=new  AnalyticeObj(objectkey,eventupdate,timestamp)
> >          }
> >          else
> >         {
> >            storeobj.update(eventupdate,timestamp)
> >         }
> >      storeobj=storeobj.computeAnalytics();
> >
> >    store.put(objectkey,storeobj);
> >   context.commit();
> > }
> > // Every 5 seconds
> > punctuate(long timestamp)
> > {
> >  long startTime = System.currentTimeMillis();
> > long totalTimeTakenToProcessRecords=0;
> > long totalTimeTakenToStoreRecords=0;
> > long counter=0;
> > KeyValueIterator iter=this.visitStore.all();
> > while (iter.hasNext()) {
> > KeyValue<Key, AnalyticeObj> entry = iter.next();
> >
> >     if(AnalyticeObj.hasExpired(timestamp)
> >          store.remove(entry.key)
> >       else
> >       {
> >         long processStartTime=System.currentTimeMillis();
> >          AnalyticeObj storeobj= entry.value.computeAnalytics(timestamp);
> >
> > totalTimeTakenToProcessRecords=totalTimeTakenToProcessRecords
> > +(System.currentTimeMillis()-processStartTime);
> >
> >          long storeStartTime=System.currentTimeMillis();
> >           store.put(entry.key,storeobj);
> >
> > totalTimeTakenToStoreRecords=totalTimeTakenToStoreRecords+(
> > System.currentTimeMillis()-storeStartTime);
> >        }
> >    counter++;
> > }
> >      logger.info(" Time Metrics for punctuate  "
> >                     " for TimeStamp :: " + "" + timestamp + " processed
> > Records :: "
> >                     + counter +" totalTimeTakenToProcessRecords ::
> > "+totalTimeTakenToProcessRecords +" totalTimeTakenToStoreRecord ::
> > "+totalTimeTakenToStoreRecords
> >                     +"toal time Taken to retrieve Records :: "+
> > (System.currentTimeMillis() -
> > (startTime+totalTimeTakenToProcessRecords+totalTimeTakenToStoreRecords)
> )+"
> > Total time Taken :: " + (System.currentTimeMillis() - startTime));
> > }
> > }
> >
>

Re: Improving Kafka State Store performance

Posted by Bill Bejeck <bi...@confluent.io>.
Hi,

It's hard to say exactly without a little more information.

On a side note, I don't see where you are closing the KeyValueIterator in
the code above. Not closing a KeyValueIterator on a Permanent State Store
can cause a resource leak over time, so I'd add `iter.close()` right before
your `logger.info` call.  It might be worth retrying at that point.

Thanks,
Bill

On Sat, Sep 16, 2017 at 9:14 AM, dev loper <sp...@gmail.com> wrote:

> Hi Kafka Streams Users,
>
> I am trying to improve the performance of Kafka Streams State Store
> Persistent Store. In our application we are using Kafka Streams Processor
> API  and using Persistent State Store.. My application when starts up it
> performing well but over a period of time the performance deteriorated. I
> am computing certain results in computeAnalytics method and this method is
> not taking time at all. This method is being called within both process and
> punctuate and I am storing the updated object back to store. Over the
> period of time its taking huge time for completing the punctuate process
> and I could see majority of the time is spent in storing the records and
> Iterating the records. The record size is just 2500 per partition. I am not
> where I am going wrong and how can I improve the performance.
>
> Below is one such sample log record.
>
> INFO  | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) - Time
> Metrics for punctuate  for TimeStamp :: 1505564655878 processed Records ::
> 2109 totalTimeTakenToProcessRecords :: 2 totalTimeTakenToStoreRecord ::
> 27605toal time Taken to retrieve Records :: 12787 Total time Taken :: 40394
>
> Below I have given my pseudo code for my processor which exactly resembles
> the code which I am using in my application.
>
> MyProcessor(){
>
>   process(Key objectkey, Update eventupdate){
>    long timestamp=context.timestamp();
>    AnalyticeObj storeobj=store.get(objectkey);
>
>    if( storeobj ===null)
>          {
>           storeobj=new  AnalyticeObj(objectkey,eventupdate,timestamp)
>          }
>          else
>         {
>            storeobj.update(eventupdate,timestamp)
>         }
>      storeobj=storeobj.computeAnalytics();
>
>    store.put(objectkey,storeobj);
>   context.commit();
> }
> // Every 5 seconds
> punctuate(long timestamp)
> {
>  long startTime = System.currentTimeMillis();
> long totalTimeTakenToProcessRecords=0;
> long totalTimeTakenToStoreRecords=0;
> long counter=0;
> KeyValueIterator iter=this.visitStore.all();
> while (iter.hasNext()) {
> KeyValue<Key, AnalyticeObj> entry = iter.next();
>
>     if(AnalyticeObj.hasExpired(timestamp)
>          store.remove(entry.key)
>       else
>       {
>         long processStartTime=System.currentTimeMillis();
>          AnalyticeObj storeobj= entry.value.computeAnalytics(timestamp);
>
> totalTimeTakenToProcessRecords=totalTimeTakenToProcessRecords
> +(System.currentTimeMillis()-processStartTime);
>
>          long storeStartTime=System.currentTimeMillis();
>           store.put(entry.key,storeobj);
>
> totalTimeTakenToStoreRecords=totalTimeTakenToStoreRecords+(
> System.currentTimeMillis()-storeStartTime);
>        }
>    counter++;
> }
>      logger.info(" Time Metrics for punctuate  "
>                     " for TimeStamp :: " + "" + timestamp + " processed
> Records :: "
>                     + counter +" totalTimeTakenToProcessRecords ::
> "+totalTimeTakenToProcessRecords +" totalTimeTakenToStoreRecord ::
> "+totalTimeTakenToStoreRecords
>                     +"toal time Taken to retrieve Records :: "+
> (System.currentTimeMillis() -
> (startTime+totalTimeTakenToProcessRecords+totalTimeTakenToStoreRecords))+"
> Total time Taken :: " + (System.currentTimeMillis() - startTime));
> }
> }
>

Re: Improving Kafka State Store performance

Posted by James Cheng <wu...@gmail.com>.
In addition to the measurements that you are doing yourself, Kafka Streams also has its own metrics. They are exposed via JMX, if you have that enabled:

http://kafka.apache.org/documentation/#monitoring <http://kafka.apache.org/documentation/#monitoring>

If you set metrics.recording.level="debug", you can see a bunch of metrics around the state stores. Stuff like put-latency-avg, for example.

See http://kafka.apache.org/documentation/#kafka_streams_store_monitoring <http://kafka.apache.org/documentation/#kafka_streams_store_monitoring>

-James

> On Sep 16, 2017, at 6:14 AM, dev loper <sp...@gmail.com> wrote:
> 
> Hi Kafka Streams Users,
> 
> I am trying to improve the performance of Kafka Streams State Store
> Persistent Store. In our application we are using Kafka Streams Processor
> API  and using Persistent State Store.. My application when starts up it
> performing well but over a period of time the performance deteriorated. I
> am computing certain results in computeAnalytics method and this method is
> not taking time at all. This method is being called within both process and
> punctuate and I am storing the updated object back to store. Over the
> period of time its taking huge time for completing the punctuate process
> and I could see majority of the time is spent in storing the records and
> Iterating the records. The record size is just 2500 per partition. I am not
> where I am going wrong and how can I improve the performance.
> 
> Below is one such sample log record.
> 
> INFO  | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) - Time
> Metrics for punctuate  for TimeStamp :: 1505564655878 processed Records ::
> 2109 totalTimeTakenToProcessRecords :: 2 totalTimeTakenToStoreRecord ::
> 27605toal time Taken to retrieve Records :: 12787 Total time Taken :: 40394
> 
> Below I have given my pseudo code for my processor which exactly resembles
> the code which I am using in my application.
> 
> MyProcessor(){
> 
>  process(Key objectkey, Update eventupdate){
>   long timestamp=context.timestamp();
>   AnalyticeObj storeobj=store.get(objectkey);
> 
>   if( storeobj ===null)
>         {
>          storeobj=new  AnalyticeObj(objectkey,eventupdate,timestamp)
>         }
>         else
>        {
>           storeobj.update(eventupdate,timestamp)
>        }
>     storeobj=storeobj.computeAnalytics();
> 
>   store.put(objectkey,storeobj);
>  context.commit();
> }
> // Every 5 seconds
> punctuate(long timestamp)
> {
> long startTime = System.currentTimeMillis();
> long totalTimeTakenToProcessRecords=0;
> long totalTimeTakenToStoreRecords=0;
> long counter=0;
> KeyValueIterator iter=this.visitStore.all();
> while (iter.hasNext()) {
> KeyValue<Key, AnalyticeObj> entry = iter.next();
> 
>    if(AnalyticeObj.hasExpired(timestamp)
>         store.remove(entry.key)
>      else
>      {
>        long processStartTime=System.currentTimeMillis();
>         AnalyticeObj storeobj= entry.value.computeAnalytics(timestamp);
> 
> totalTimeTakenToProcessRecords=totalTimeTakenToProcessRecords+(System.currentTimeMillis()-processStartTime);
> 
>         long storeStartTime=System.currentTimeMillis();
>          store.put(entry.key,storeobj);
> 
> totalTimeTakenToStoreRecords=totalTimeTakenToStoreRecords+(System.currentTimeMillis()-storeStartTime);
>       }
>   counter++;
> }
>     logger.info(" Time Metrics for punctuate  "
>                    " for TimeStamp :: " + "" + timestamp + " processed
> Records :: "
>                    + counter +" totalTimeTakenToProcessRecords ::
> "+totalTimeTakenToProcessRecords +" totalTimeTakenToStoreRecord ::
> "+totalTimeTakenToStoreRecords
>                    +"toal time Taken to retrieve Records :: "+
> (System.currentTimeMillis() -
> (startTime+totalTimeTakenToProcessRecords+totalTimeTakenToStoreRecords))+"
> Total time Taken :: " + (System.currentTimeMillis() - startTime));
> }
> }