Posted to users@kafka.apache.org by Sam Lendle <sl...@pandora.com> on 2018/06/22 19:11:20 UTC

Kafka Streams Session store fetch latency very high with caching turned on

I am using a session store in a Kafka Streams application. With caching turned on, average fetch latency was very high, about 200 ms after running for about 1 hour. With caching turned off, it was about 100 μs. We seem to be running fine without caching, but I am very curious as to why caching performance is so bad in our case. Any insight into what might be going on would be helpful.


Setup/config

  *   I'm using a custom transformer, but the implementation is almost identical to this section of KStreamSessionWindowAggregate: https://github.com/apache/kafka/blob/fdcf75ea326b8e07d8d7c4f5cc14f1e70451bcd4/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java#L94-L113. The main difference is that I'm forwarding something other than the updated session downstream (a rough sketch of the pattern follows this list).
  *   Logging is turned off, so updates are not pushed to a changelog topic. The store starts empty whenever Streams is initialized.
  *   I don't think I'm setting any potentially related configs. The RocksDB config is the default.
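
Very roughly, the pattern is something like the following (a sketch against recent Streams APIs, not my actual code; the store name, serdes, types, gap, and forwarded value are all placeholders):

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.kstream.Windowed;
    import org.apache.kafka.streams.kstream.internals.SessionWindow;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.SessionStore;

    public class SessionMergingTransformer implements Transformer<String, Long, KeyValue<String, Long>> {

        private static final long GAP_MS = 5 * 60 * 1000L;         // inactivity gap (placeholder)
        private static final String STORE_NAME = "session-store";  // placeholder

        private ProcessorContext context;
        private SessionStore<String, Long> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
            this.context = context;
            this.store = (SessionStore<String, Long>) context.getStateStore(STORE_NAME);
        }

        @Override
        public KeyValue<String, Long> transform(final String key, final Long value) {
            final long timestamp = context.timestamp();
            long start = timestamp;
            long end = timestamp;
            long agg = value;

            // Find sessions for this key that overlap the inactivity gap, fold them into
            // the new session, and remove the merged ones -- the same flow as the linked
            // section of KStreamSessionWindowAggregate.
            try (final KeyValueIterator<Windowed<String>, Long> it =
                         store.findSessions(key, timestamp - GAP_MS, timestamp + GAP_MS)) {
                while (it.hasNext()) {
                    final KeyValue<Windowed<String>, Long> session = it.next();
                    start = Math.min(start, session.key.window().start());
                    end = Math.max(end, session.key.window().end());
                    agg += session.value;
                    store.remove(session.key);
                }
            }

            store.put(new Windowed<>(key, new SessionWindow(start, end)), agg);

            // Difference from KStreamSessionWindowAggregate: forward something other than
            // the updated session itself (here just the key and the merged aggregate).
            return KeyValue.pair(key, agg);
        }

        @Override
        public void close() { }
    }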

We're receiving about 1000 messages/second on a topic with five partitions. With caching turned on, this custom transformer is the bottleneck and the processing rate is much lower, 100-200 ops per second. With caching turned off, the volume is no problem. There are about 500k unique keys per hour.

Using a sampling profiler I saw that most time was spent in TreeMap operations. Unfortunately I don't have a copy of the profile data anymore, but I think the map in question is the `cache` field in the NamedCache class.

If I look at a plot of fetch latency vs. time since starting, it looks to me like latency is about O(log(time)). I think what's going on is that the size of the map is increasing roughly linearly with time, particularly for the first few minutes that Streams is running, because almost all keys will be unique. So the latency is almost entirely spent in TreeMap#get.

Questions:
1) Does my theory make sense?
2) Could the issue be related to the fact that I'm using a state store with the Transformer/Processor API vs the DSL? I know that caching is turned on by default for state stores in the DSL but not in the Processor API, but I don't understand why.
3) My understanding is that Streams-side state store caching is an optimization to reduce the number of writes to the underlying RocksDB store. In that case, because I have so many unique keys, and the same keys usually show up a few minutes apart, it makes sense that caching wouldn't do much for me. Is that correct?
4) Given that things seem to work fine with caching turned off, could there be any advantage to having it on and configured differently? If so, what configuration changes should I try?

If there's any additional context I can provide please let me know.

Thanks,
Sam




Re: Kafka Streams Session store fetch latency very high with caching turned on

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Sam,

That is an interesting find. My reasoning is similar to yours: since you
have 1K / sec input traffic, that means 3600K updates / hour. Since you
mention there are about 500K unique keys / hour, each key will be updated
roughly 7 times per hour. Assuming the traffic is evenly distributed and
not skewed, and your cache size is not large (by default it is only 50 MB),
then the cache may not help much.
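
For reference, the record cache size is set via StreamsConfig and is shared
across all stream threads of an instance; a minimal sketch with placeholder
values would be:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class CacheConfigExample {
        public static Properties streamsProps() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "session-fetch-app");  // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
            // Total memory for record caches, shared across all stream threads of this instance.
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 100 * 1024 * 1024L);
            // Caches are flushed at least on every commit; a longer interval means fewer flushes.
            props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000L);
            return props;
        }
    }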

About the caching space, we made some optimizations along with the KIP-155
implementation (https://github.com/apache/kafka/pull/3027) to reduce the
search space inside the cache to only the corresponding segments of the
underlying store, for the same reason: `TreeMap#get()` is less efficient
with a large key space. For session windows there is no fixed window length
and the segment interval depends purely on the retention period, so my
suspicion is that if your retention period is set to be small, the range of
`sessionStore#findSessions()` will still span almost all segments, which
will not help reduce the search key space.

So for your case, I think disabling caching for that session store is a
good idea. At the same time, we should consider further improving our
caching implementation to use something better than a TreeMap.
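
For example, with the Processor API you could register the session store
with caching disabled roughly like this (a sketch only; the store name,
serdes, and retention are placeholders):

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.state.SessionStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    public class SessionStoreWiring {

        // Builds a persistent session store with caching (and logging) disabled and
        // registers it on the topology; the custom transformer is then attached with
        // stream.transform(yourTransformerSupplier, "session-store").
        public static void addSessionStore(final StreamsBuilder builder) {
            final StoreBuilder<SessionStore<String, Long>> storeBuilder =
                Stores.sessionStoreBuilder(
                        Stores.persistentSessionStore("session-store", TimeUnit.HOURS.toMillis(1)), // retention: placeholder
                        Serdes.String(),
                        Serdes.Long())
                    .withCachingDisabled()   // bypass the NamedCache / TreeMap layer entirely
                    .withLoggingDisabled();  // no changelog topic, matching the setup described above

            builder.addStateStore(storeBuilder);
        }
    }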



Guozhang




-- 
-- Guozhang

Re: Kafka Streams Session store fetch latency very high with caching turned on

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Sam,

Thanks for your email. This is a very interesting find. I did not double
check the code, but your reasoning makes sense to me. Note that caching
was _not_ introduced to reduce the writes to RocksDB, but to reduce the
writes to the changelog topic and to reduce the number of records sent
downstream.

Because you don't want a fault-tolerant store (you have logging disabled),
I see no reason why disabling caching would be a bad idea for your use
case.

From a performance point of view, there should be no difference between
the DSL and the Processor API. Note that the DSL sits on top of the
Processor API, and at runtime we don't even know whether the DSL was used
or not. Caching is enabled by default to reduce the downstream load -- we
have had many discussions about whether this is the best default behavior.
The latest conclusion was that it is... :)
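
Just for illustration, opting out of that default in the DSL would look
roughly like this (a sketch using a plain reduce with placeholder names and
serdes, not your transformer logic):

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.SessionWindows;
    import org.apache.kafka.streams.state.SessionStore;

    public class DslSessionCachingExample {
        public static void build(final StreamsBuilder builder) {
            builder.<String, Long>stream("input-topic")                        // placeholder topic
                .groupByKey()
                .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5))) // placeholder gap
                .reduce(Long::sum,
                        Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("session-store") // placeholder
                            .withCachingDisabled()    // opt out of the DSL default
                            .withLoggingDisabled());  // no changelog, as in your setup
        }
    }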



-Matthias


