Posted to jira@kafka.apache.org by "Guozhang Wang (Jira)" <ji...@apache.org> on 2021/09/09 19:31:00 UTC

[jira] [Created] (KAFKA-13286) Revisit Streams State Store and Serde Implementation

Guozhang Wang created KAFKA-13286:
-------------------------------------

             Summary: Revisit Streams State Store and Serde Implementation
                 Key: KAFKA-13286
                 URL: https://issues.apache.org/jira/browse/KAFKA-13286
             Project: Kafka
          Issue Type: Improvement
          Components: streams
            Reporter: Guozhang Wang


Kafka Streams state stores are built in hierarchical layers, metered -> cached -> logged -> [convert] -> raw stores (RocksDB, in-memory), and they rely on the built-in Serde libraries for serialization and deserialization. There are several inefficiencies in the current design:

* The API only supports serde using byte arrays. This means we generate a lot of garbage and spend unnecessary time copying bytes, especially when working with windowed state stores that rely on composite keys. In many places in the code we have to extract parts of the composite key to deserialize either the timestamp or the message key from the state store key (e.g. the methods in WindowStoreUtils).
* The serde operation can happen at multiple layers of the state store hierarchy, which means we make extra byte array copies as we move through the layers. For example, we do serde in the metered layer, then again in the cached layer with the cache functions, and also in the logged stores to generate the key/value bytes to send to Kafka.

To improve on this, we can consider supporting serde into/from ByteBuffers, which would allow us to reuse the underlying byte arrays and just pass around slices of the underlying buffers to avoid the unnecessary copying.
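
As a rough illustration of the slicing idea, the sketch below reads the timestamp and the key out of a composite key without copying any bytes. The class {{CompositeKeyViews}} and the <key bytes, 8-byte timestamp> layout are assumptions made for this example only; they are not the actual layout used by the window stores.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Kafka Streams.
// Assumed layout for illustration: <key bytes><8-byte timestamp>.
final class CompositeKeyViews {
    static final int TIMESTAMP_SIZE = Long.BYTES;

    // Reads the trailing timestamp in place with an absolute get:
    // no bytes are copied and the buffer's position is left unchanged.
    static long timestamp(ByteBuffer compositeKey) {
        return compositeKey.getLong(compositeKey.limit() - TIMESTAMP_SIZE);
    }

    // Returns a view over the leading key bytes. The view shares the
    // backing memory with compositeKey, so no byte array is allocated.
    static ByteBuffer keyView(ByteBuffer compositeKey) {
        ByteBuffer view = compositeKey.duplicate();
        view.limit(compositeKey.limit() - TIMESTAMP_SIZE);
        return view.slice();
    }
}
```

Because the view returned by {{keyView}} shares memory with the original buffer, it is only valid for as long as the underlying buffer is not reused for something else.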

1) More specifically, e.g. the serialize interface could be refactored to:

{code}
ByteBuffer serialize(String topic, T data, ByteBuffer buffer);
{code}

Here the serialized bytes would be appended to the ByteBuffer. When a series of serialize functions is called along the state store hierarchy, we then just need to make sure that whatever should be appended first to the ByteBuffer is serialized first. E.g. if the serialized byte format of a WindowSchema is <timestamp, boolean, key>,

then we would need to call serialize as in (topic argument omitted for brevity):

{code}
serialize(key, serialize(leftRightBoolean, serialize(timestamp, buffer))); 
{code}
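
A minimal sketch of how such chained appends could look, assuming a hypothetical {{AppendingSerializer}} interface that mirrors the proposed signature (it is not an existing Kafka API). The three serializers and the {{WindowKeySketch}} class are illustrative names only.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical interface mirroring the proposed signature; not an existing Kafka API.
interface AppendingSerializer<T> {
    // Appends the serialized form of data at the buffer's current position
    // and returns the same buffer so that calls can be nested.
    ByteBuffer serialize(String topic, T data, ByteBuffer buffer);
}

final class WindowKeySketch {
    static final AppendingSerializer<Long> TIMESTAMP =
        (topic, ts, buf) -> buf.putLong(ts);
    static final AppendingSerializer<Boolean> LEFT_RIGHT =
        (topic, isLeft, buf) -> buf.put((byte) (isLeft ? 1 : 0));
    // For simplicity this still creates a temporary byte[] for the string;
    // a real implementation could encode directly into the buffer.
    static final AppendingSerializer<String> KEY =
        (topic, key, buf) -> buf.put(key.getBytes(StandardCharsets.UTF_8));

    // Produces <timestamp, boolean, key>: the innermost call runs first,
    // so the timestamp is appended first and the key last.
    static ByteBuffer serialize(String topic, long timestamp, boolean isLeft,
                                String key, ByteBuffer buffer) {
        return KEY.serialize(topic, key,
            LEFT_RIGHT.serialize(topic, isLeft,
                TIMESTAMP.serialize(topic, timestamp, buffer)));
    }
}
```

The caller has to supply a buffer with enough remaining capacity; otherwise the puts above throw a {{BufferOverflowException}}.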

2) In addition, we can consider having a pool of ByteBuffers representing a set of byte arrays that can be re-used. This can be captured as an intelligent {{ByteBufferSupplier}}, which provides:

{code}
ByteBuffer ByteBufferSupplier#allocate(long size)
{code}

Its implementation can choose to either create new byte arrays or re-use existing ones in the pool. The gotcha, though, is that we usually do not know the serialized byte length of raw keys in advance (think: in practice the keys would be in JSON/Avro etc.), and hence would not know what {{size}} to pass in for serialization. We may need to be conservative, or resort to trial and error.

Of course, callers would then be responsible for returning the used ByteBuffer to the Supplier via

{code}
ByteBufferSupplier#deallocate(ByteBuffer buffer)
{code}
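
One possible shape for such a supplier is sketched below, purely as an illustration. The class name {{PooledByteBufferSupplier}} and the {{ensureRemaining}} helper are assumptions, the sketch takes an {{int}} size because {{ByteBuffer.allocate}} does, and it is not thread-safe. {{ensureRemaining}} shows one way to handle the unknown-size case by growing on demand.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Hypothetical pool for illustration; only allocate/deallocate are named in the proposal.
// Not thread-safe.
final class PooledByteBufferSupplier {
    private final Deque<ByteBuffer> pool = new ArrayDeque<>();

    // Returns a cleared buffer with at least `size` bytes of capacity,
    // re-using a pooled buffer when one is large enough.
    ByteBuffer allocate(int size) {
        for (Iterator<ByteBuffer> it = pool.iterator(); it.hasNext(); ) {
            ByteBuffer candidate = it.next();
            if (candidate.capacity() >= size) {
                it.remove();
                candidate.clear();
                return candidate;
            }
        }
        return ByteBuffer.allocate(size);
    }

    // Returns a buffer to the pool; the caller must not use it afterwards.
    void deallocate(ByteBuffer buffer) {
        pool.addLast(buffer);
    }

    // Trial-and-error helper: if fewer than `needed` bytes remain, moves the
    // bytes written so far into a larger buffer and recycles the old one.
    ByteBuffer ensureRemaining(ByteBuffer buffer, int needed) {
        if (buffer.remaining() >= needed) {
            return buffer;
        }
        int newCapacity = Math.max(buffer.capacity() * 2, buffer.position() + needed);
        ByteBuffer bigger = allocate(newCapacity);
        buffer.flip();
        bigger.put(buffer);
        deallocate(buffer);
        return bigger;
    }
}
```

Growing still costs one copy, so a conservative initial size keeps that path rare.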

3) With RocksDB's direct byte-buffer support (KAFKA-9168), we can optionally also allocate them from RocksDB directly so that using them for puts/gets would avoid copying byte arrays across the JNI boundary, which is more efficient. The Supplier would then need to be careful to deallocate these direct byte-buffers, since they would not be GC'ed by the JVM.



