Posted to dev@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2015/09/28 21:36:04 UTC

[jira] [Commented] (KAFKA-2594) Add a key-value store that is a fixed-capacity in-memory LRU cache

    [ https://issues.apache.org/jira/browse/KAFKA-2594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14933849#comment-14933849 ] 

ASF GitHub Bot commented on KAFKA-2594:
---------------------------------------

GitHub user rhauch opened a pull request:

    https://github.com/apache/kafka/pull/256

    KAFKA-2594 Added InMemoryLRUCacheStore

    Added a new `KeyValueStore` implementation called `InMemoryLRUCacheStore` that keeps at most a fixed number of entries in memory. When the size exceeds the capacity, the least recently used entry is removed from both the store and the backing topic. Also added unit tests for this new store and for the existing `InMemoryKeyValueStore` and `RocksDBKeyValueStore` implementations. A new `KeyValueStoreTestDriver` class simplifies all of these tests, and can be used by other libraries to help test their own custom implementations.
    
    This PR depends upon [KAFKA-2593](https://issues.apache.org/jira/browse/KAFKA-2593) and its PR at https://github.com/apache/kafka/pull/255. Once that PR is merged, I can rebase this PR if desired.
    
    Two issues were uncovered when creating these new unit tests, and both are also addressed as separate (small) commits in this PR:
    * The `RocksDBKeyValueStore` initialization was not creating the file system directory if missing.
    * `MeteredKeyValueStore` was casting to `ProcessorContextImpl` to access the `RecordCollector`, which prevented using `MeteredKeyValueStore` implementations in tests where something other than `ProcessorContextImpl` was used. The fix was to introduce a `RecordCollector.Supplier` interface to define this `recordCollector()` method, and change `ProcessorContextImpl` and `MockProcessorContext` to both implement this interface. Now, `MeteredKeyValueStore` can cast to the new interface to access the record collector rather than to a single concrete implementation, making it possible to use any and all current stores inside unit tests.
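
The cast-to-interface fix described in the second bullet can be sketched roughly as follows. This is a simplified illustration, not the code from the PR: only the names `RecordCollector.Supplier`, `recordCollector()`, `ProcessorContextImpl` and `MockProcessorContext` come from the description above. The `send` signature, `RecordingCollector`, `MeteredStore` and the `"changelog"` topic name are assumptions made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the Kafka Streams types named in the PR description.
final class SupplierCastSketch {

    interface ProcessorContext { }

    interface RecordCollector {
        // Hypothetical, simplified signature (the real collector is richer).
        void send(String topic, String key, String value);

        // Narrow interface that exposes only the collector accessor.
        interface Supplier {
            RecordCollector recordCollector();
        }
    }

    // Hypothetical collector that just remembers what was sent.
    static final class RecordingCollector implements RecordCollector {
        final List<String> sent = new ArrayList<>();

        @Override
        public void send(String topic, String key, String value) {
            sent.add(topic + ":" + key + "=" + value);
        }
    }

    // Both the production context and the test context implement the supplier.
    static final class ProcessorContextImpl implements ProcessorContext, RecordCollector.Supplier {
        private final RecordCollector collector;
        ProcessorContextImpl(RecordCollector collector) { this.collector = collector; }
        @Override public RecordCollector recordCollector() { return collector; }
    }

    static final class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
        private final RecordCollector collector;
        MockProcessorContext(RecordCollector collector) { this.collector = collector; }
        @Override public RecordCollector recordCollector() { return collector; }
    }

    // Hypothetical stand-in for a metered store that logs changes.
    static final class MeteredStore {
        private final RecordCollector collector;

        MeteredStore(ProcessorContext context) {
            // Cast to the narrow interface, not to one concrete context class.
            this.collector = ((RecordCollector.Supplier) context).recordCollector();
        }

        void put(String key, String value) {
            collector.send("changelog", key, value); // topic name is made up
        }
    }
}
```

With this shape, a test can hand the store a `MockProcessorContext` and inspect the collector afterwards, which a cast to `ProcessorContextImpl` would have rejected with a `ClassCastException`.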

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rhauch/kafka kafka-2594

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/256.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #256
    
----
commit d7e25f55dcbac1b64cfb0a8f642ce55078bd7d03
Author: Randall Hauch <rh...@gmail.com>
Date:   2015-09-28T18:34:51Z

    KAFKA-2593 Key value stores can use custom serializers and deserializers
    
    Add support for the key value stores to use specified serializers and deserializers (aka, "serdes"). Prior to this change, the stores were limited to only the default serdes specified in the topology's configuration and exposed to the processors via the ProcessorContext.
    
    Now, InMemoryKeyValueStore and RocksDBKeyValueStore are used in similar ways: both are parameterized on the key and value types, and both offer a similar set of static factory methods. The static factory methods either take explicit key and value serdes, take key and value class types so the serdes can be inferred (only for the built-in serdes for string, integer, long, and byte array types), or use the default serdes on the ProcessorContext.

commit 3b903d31503fcce7cf2b9607b68893a02425dacf
Author: Randall Hauch <rh...@gmail.com>
Date:   2015-09-28T19:11:55Z

    KAFKA-2594: Corrected RocksDBKeyValueStore to properly create directory if missing

commit 33e4fd2b8bb56783d62f55e223757e8c479212ed
Author: Randall Hauch <rh...@gmail.com>
Date:   2015-09-28T19:15:24Z

    KAFKA-2594: MeteredKeyValueStore no longer casts to ProcessorContextImpl
    
    The cast was used to access ProcessorContextImpl.recordCollector(), and it prevented using MeteredKeyValueStore implementations in tests where something other than ProcessorContextImpl was used. Introduced a RecordCollector.Supplier interface to define this `recordCollector()` method, and changed ProcessorContextImpl and MockProcessorContext to both implement this interface. Now, MeteredKeyValueStore can cast to the interface to access the record collector rather than to a single concrete implementation, making it possible to use the stores inside unit tests.

commit ff5e779fa1d6955bf0bf633191edd9d7a7cb9a9c
Author: Randall Hauch <rh...@gmail.com>
Date:   2015-09-28T19:26:16Z

    KAFKA-2594 Added InMemoryLRUCacheStore
    
    Added a new KeyValueStore implementation that keeps at most a fixed number of entries in memory. When the size exceeds the capacity, the least recently used entry is removed from both the store and the backing topic. Also added unit tests for this new store and for the existing InMemoryKeyValueStore and RocksDBKeyValueStore implementations. A new KeyValueStoreTestDriver class simplifies all of these tests.

----


> Add a key-value store that is a fixed-capacity in-memory LRU cache 
> -------------------------------------------------------------------
>
>                 Key: KAFKA-2594
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2594
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: kafka streams
>            Reporter: Randall Hauch
>            Assignee: Randall Hauch
>             Fix For: 0.9.0.0
>
>
> The current {{KeyValueStore}} implementations are not limited in size, and thus are less useful for some use cases. This subtask will add a simple key-value store that keeps in memory at most a fixed number of recently read or written entries. When the cache has reached its capacity and a new entry is to be added, the least recently used entry will be automatically purged from the cache. This key-value store will extend {{MeteredKeyValueStore}} for monitoring and recording of changes to a backing topic, enabling recovery of the cache contents from the replicated state.
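
The eviction behavior described above can be sketched with `java.util.LinkedHashMap` in access order. This is only an illustration of the technique, not the `InMemoryLRUCacheStore` code from the PR: `LruStoreSketch` and `EvictionListener` are hypothetical names, and the listener merely stands in for whatever the real store does to remove the evicted key from the backing topic.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal fixed-capacity LRU map; a sketch, not the store added by the PR.
final class LruStoreSketch<K, V> {

    // Hypothetical hook, e.g. for recording a delete on a backing topic.
    interface EvictionListener<K> {
        void evicted(K key);
    }

    private final Map<K, V> entries;

    LruStoreSketch(final int capacity, final EvictionListener<K> listener) {
        // accessOrder = true: iteration order runs from least to most recently used,
        // and both get() and put() count as a use.
        this.entries = new LinkedHashMap<K, V>(capacity + 1, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                // Called after each insertion; evict once the capacity is exceeded.
                if (size() > capacity) {
                    listener.evicted(eldest.getKey());
                    return true;
                }
                return false;
            }
        };
    }

    V get(K key) {
        return entries.get(key);
    }

    void put(K key, V value) {
        entries.put(key, value);
    }

    V delete(K key) {
        return entries.remove(key);
    }

    int size() {
        return entries.size();
    }
}
```

For example, with a capacity of 2, putting `a` and `b`, reading `a`, and then putting `c` evicts `b`, because `a` was used more recently. The sketch is not thread-safe; a caller that shares it across threads would need its own synchronization.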



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)