Posted to commits@samza.apache.org by "Chris Riccomini (JIRA)" <ji...@apache.org> on 2014/10/01 19:19:34 UTC

[jira] [Commented] (SAMZA-424) Add a Cache state API to the Samza container

    [ https://issues.apache.org/jira/browse/SAMZA-424?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14155140#comment-14155140 ] 

Chris Riccomini commented on SAMZA-424:
---------------------------------------

The KeyValueStorageEngine uses these configs to assemble a KeyValueStore in the following layers:

{noformat}
KeyValueStorageEngine
NullSafeKeyValueStore
CachedStore
SerializedKeyValueStore
LoggedStore
<KeyValueStore implementation>
{noformat}
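To make the layering concrete, here is a minimal sketch of the decorator pattern it implies, where each layer just wraps the one below it. It does not use Samza's real classes. SimpleStore, MemoryStore and the *Layer classes are hypothetical stand-ins that support only get/put.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified stand-in for Samza's KeyValueStore (get/put only).
interface SimpleStore<K, V> {
  V get(K key);
  void put(K key, V value);
}

// Base of the stack: plays the role of <KeyValueStore implementation>.
class MemoryStore<K, V> implements SimpleStore<K, V> {
  private final Map<K, V> data = new HashMap<>();
  public V get(K key) { return data.get(key); }
  public void put(K key, V value) { data.put(key, value); }
}

// Plays the role of LoggedStore: records each written key, then forwards the write.
class LoggedLayer<K, V> implements SimpleStore<K, V> {
  private final SimpleStore<K, V> inner;
  final List<K> changelog = new ArrayList<>();
  LoggedLayer(SimpleStore<K, V> inner) { this.inner = inner; }
  public V get(K key) { return inner.get(key); }
  public void put(K key, V value) { changelog.add(key); inner.put(key, value); }
}

// Plays the role of SerializedKeyValueStore: String values above, UTF-8 bytes below.
class SerializedLayer implements SimpleStore<String, String> {
  private final SimpleStore<String, byte[]> inner;
  SerializedLayer(SimpleStore<String, byte[]> inner) { this.inner = inner; }
  public String get(String key) {
    byte[] bytes = inner.get(key);
    return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
  }
  public void put(String key, String value) {
    inner.put(key, value.getBytes(StandardCharsets.UTF_8));
  }
}

// Plays the role of CachedStore: a bounded LRU map of deserialized objects.
// Writes go straight through here; the real CachedStore batches them.
class CachedLayer<K, V> implements SimpleStore<K, V> {
  private final SimpleStore<K, V> inner;
  private final Map<K, V> cache;
  CachedLayer(SimpleStore<K, V> inner, int capacity) {
    this.inner = inner;
    this.cache = new LinkedHashMap<K, V>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > capacity; // drop the least recently used entry
      }
    };
  }
  public V get(K key) {
    V value = cache.get(key);
    if (value == null) {          // cache miss: read through to the next layer
      value = inner.get(key);
      if (value != null) cache.put(key, value);
    }
    return value;
  }
  public void put(K key, V value) { cache.put(key, value); inner.put(key, value); }
}

// Plays the role of NullSafeKeyValueStore: fails fast on null keys or values.
class NullSafeLayer<K, V> implements SimpleStore<K, V> {
  private final SimpleStore<K, V> inner;
  NullSafeLayer(SimpleStore<K, V> inner) { this.inner = inner; }
  public V get(K key) { return inner.get(Objects.requireNonNull(key, "key")); }
  public void put(K key, V value) {
    inner.put(Objects.requireNonNull(key, "key"), Objects.requireNonNull(value, "value"));
  }
}
```

Assembled outermost first, the stack is new NullSafeLayer<>(new CachedLayer<>(new SerializedLayer(new LoggedLayer<>(base)), size)), which mirrors the config-driven order above.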

Thus far, we've been talking about implementing a new KeyValueStore that handles caching (at the base of the stack above). What if we instead update the existing CachedStore to handle different eviction policies? Right now, I think it only does LRU. We could add TTL-based and LFU policies to it. This is interesting to me because it seems we already have the basic building blocks for a read-through, write-behind cache: you would use the CachedStore and implement a KeyValueStore that talks to the remote store. Likewise, if you enabled the CachedStore but didn't define any underlying KeyValueStore implementation, you'd have a local in-memory cache with no remote persistence.
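Here is a rough sketch of what pluggable eviction could look like. It assumes a hypothetical EvictionPolicy interface that the cache layer consults on every read and write. LruPolicy, TtlPolicy and PolicyCache are illustrative only, not existing Samza classes. The clock is injected as a LongSupplier so that TTL expiry can be tested deterministically.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical hook a cache layer could delegate eviction decisions to.
interface EvictionPolicy<K> {
  void onRead(K key, long nowMs);
  void onWrite(K key, long nowMs);
  void forget(K key);
  K victim();                           // next key to evict when over capacity
  boolean isExpired(K key, long nowMs); // treat the entry as absent if true
}

// LRU: an access-ordered LinkedHashMap keeps the least recently used key first.
class LruPolicy<K> implements EvictionPolicy<K> {
  private final LinkedHashMap<K, Boolean> order = new LinkedHashMap<>(16, 0.75f, true);
  public void onRead(K key, long nowMs) { order.get(key); } // get() moves key to the end
  public void onWrite(K key, long nowMs) { order.put(key, Boolean.TRUE); }
  public void forget(K key) { order.remove(key); }
  public K victim() { return order.isEmpty() ? null : order.keySet().iterator().next(); }
  public boolean isExpired(K key, long nowMs) { return false; }
}

// TTL: entries expire ttlMs after their last write; reads do not extend them.
class TtlPolicy<K> implements EvictionPolicy<K> {
  private final long ttlMs;
  private final LinkedHashMap<K, Long> writtenAt = new LinkedHashMap<>(); // oldest write first
  TtlPolicy(long ttlMs) { this.ttlMs = ttlMs; }
  public void onRead(K key, long nowMs) { }
  public void onWrite(K key, long nowMs) {
    writtenAt.remove(key);              // re-insert so a rewrite moves to the end
    writtenAt.put(key, nowMs);
  }
  public void forget(K key) { writtenAt.remove(key); }
  public K victim() { return writtenAt.isEmpty() ? null : writtenAt.keySet().iterator().next(); }
  public boolean isExpired(K key, long nowMs) {
    Long t = writtenAt.get(key);
    return t != null && nowMs - t >= ttlMs;
  }
}

// A bounded cache whose eviction behaviour changes by passing a different policy.
class PolicyCache<K, V> {
  private final Map<K, V> data = new HashMap<>();
  private final EvictionPolicy<K> policy;
  private final int maxSize;
  private final LongSupplier clock;

  PolicyCache(EvictionPolicy<K> policy, int maxSize, LongSupplier clock) {
    this.policy = policy;
    this.maxSize = maxSize;
    this.clock = clock;
  }

  V get(K key) {
    if (!data.containsKey(key)) return null;
    long now = clock.getAsLong();
    if (policy.isExpired(key, now)) {   // lazily drop expired entries on read
      data.remove(key);
      policy.forget(key);
      return null;
    }
    policy.onRead(key, now);
    return data.get(key);
  }

  void put(K key, V value) {
    data.put(key, value);
    policy.onWrite(key, clock.getAsLong());
    while (data.size() > maxSize) {     // evict until back within the size bound
      K victim = policy.victim();
      data.remove(victim);
      policy.forget(victim);
    }
  }
}
```

An LFU policy would be one more implementation of the same interface that tracks per-key access counts.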

bq. I thought about keeping delete, range and all inside Cache since they're all technically applicable. But then it turns out that the Cache and KeyValueStore are exactly the same. In that case, we're back to the same problem that people use 'KeyValueStore' in their code (with some additional config params) while they really mean a Cache.

I don't think we should remove useful operations just to keep two distinct interfaces. I agree that all/range/delete would be useful for caches, so my vote is to make them available. As you said, this would mean that there is only a KeyValueStore interface.

bq. If that doesn't sound like a big deal, then we don't have to change anything.

Thinking out loud. One rudimentary way to solve this is to use proper variable names and store names:

{code}
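KeyValueStore<String, Integer> store = (KeyValueStore<String, Integer>) context.getStore("page-key-counts");
KeyValueStore<String, Integer> cache = (KeyValueStore<String, Integer>) context.getStore("my-cache-thing");
// Same interface; only the names signal durable state vs. cache.
store.put(pageKey.toString(), currentCount + 1);
cache.put("foo", 123);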
{code}

This relies on the developer, though.

bq. In that case, the KeyValueStore will just have an isShared property (either true or false) and optional configs for expiration / eviction.

I think the isShared property should actually live at the framework level, similar to changelog. All stores, whether they're KV, Lucene, etc., should be able to be shared.
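Here is a sketch of treating sharing as a framework-level flag. It assumes a hypothetical StoreRegistry in the container. Because the registry only deals in factories and opaque instances, it doesn't care whether the store is KV, Lucene, or anything else. It ignores thread safety.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical container-side registry. isShared is decided per store by the
// framework, so it works the same way for any store type (KV, Lucene, ...).
class StoreRegistry {
  private final Map<String, Object> containerStores = new HashMap<>();
  private final Map<List<String>, Object> taskStores = new HashMap<>();

  @SuppressWarnings("unchecked")
  <S> S getStore(String taskName, String storeName, boolean isShared, Supplier<S> factory) {
    if (isShared) {
      // One instance per container, created by whichever task asks first.
      return (S) containerStores.computeIfAbsent(storeName, name -> factory.get());
    }
    // One instance per (task, store) pair, as with today's stores.
    return (S) taskStores.computeIfAbsent(Arrays.asList(taskName, storeName), k -> factory.get());
  }
}
```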

bq. My fear of merging this is that we will end up with too many config params for the same thing. 

I agree. It feels like we're coding through config here: we have a bunch of pieces that can be assembled in various ways, and we're defining that assembly in config. Taking a step back, here's my list of the current configurations for KeyValueStores:

* The underlying store to use (memory, leveldb, rocksdb)
* Changelog enabled (LoggedStore)
* Caching enabled (CachedStore)
* Serde (SerializedKeyValueStore)
* Batching (CachedStore config)
* Config related to the underlying store (e.g. buffer sizes, etc)

And the proposed new configurations:

* Eviction policy (LRU, LFU, none)
* Eviction policy-related config (TTL, size, etc)
* Container-level sharing enabled

Here's a real-world configuration example of a KeyValueStore with a custom serde in Samza 0.7:

{code}
    <property name="stores.grandfather-state-store.factory" value="org.apache.samza.storage.kv.KeyValueStorageEngineFactory" />
    <property name="stores.grandfather-state-store.changelog" value="kafka.aggregator.grandfather-state-store" />
    <property name="stores.grandfather-state-store.key.serde" value="string" />
    <property name="stores.grandfather-state-store.msg.serde" value="aggregator-state-serde" />
{code}

In this case, we mostly default to the correct settings. We could follow the same approach for caching. [~martinkl] has generally been pushing for a use-case-based approach, which I agree with. We could introduce something like a "type" field that presets a bunch of defaults.

This would set up a CachedStore with no underlying store, no changelog, and some default cache size.

{code}
    <property name="stores.simple-cache.type" value="cache" />
{code}

This could set up a CachedStore using a TTL-based policy with a durable changelog.

{code}
    <property name="stores.ttl-cache.type" value="ttl-cache" />
    <property name="stores.ttl-cache.changelog" value="kafka.aggregator.grandfather-state-store" />
{code}

Effectively, the "type" just changes which defaults are used for configuration.
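As a rough illustration of "type" as a bundle of defaults, the sketch below resolves a store's config by layering explicit settings over a preset. The preset names "cache" and "ttl-cache" come from the examples above. StoreTypePresets and the sub-keys (underlying.store, cache.policy, cache.size, cache.ttl.ms) and their values are hypothetical, not existing Samza config.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical preset table for a "stores.<name>.type" key; the sub-keys and
// default values below are illustrative only.
final class StoreTypePresets {
  private static final Map<String, Map<String, String>> PRESETS = new HashMap<>();
  static {
    Map<String, String> cache = new HashMap<>();
    cache.put("underlying.store", "none"); // in-memory only, no remote persistence
    cache.put("cache.policy", "lru");
    cache.put("cache.size", "1000");
    PRESETS.put("cache", cache);            // note: no changelog default

    Map<String, String> ttlCache = new HashMap<>(cache);
    ttlCache.put("cache.policy", "ttl");
    ttlCache.put("cache.ttl.ms", "60000");
    PRESETS.put("ttl-cache", ttlCache);
  }

  // Returns the effective stores.<name>.* config: preset defaults first,
  // then any explicitly configured key for that store overrides them.
  static Map<String, String> resolve(String storeName, Map<String, String> config) {
    String prefix = "stores." + storeName + ".";
    String type = config.get(prefix + "type");
    Map<String, String> preset =
        type == null ? Collections.<String, String>emptyMap() : PRESETS.get(type);
    if (preset == null) {
      throw new IllegalArgumentException("Unknown store type: " + type);
    }
    Map<String, String> resolved = new HashMap<>();
    preset.forEach((key, value) -> resolved.put(prefix + key, value));
    config.forEach((key, value) -> {
      if (key.startsWith(prefix)) resolved.put(key, value);
    });
    return resolved;
  }

  private StoreTypePresets() {}
}
```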

Another approach would be to do store wiring in code. We could move away from config-based wiring for stores and add a TaskContext.registerStore(StorageEngine) method instead. I haven't thought this idea through in much detail, but it seems more developer-friendly. I suppose this kind of implementation would look something like this:

{code}
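// Proposed API sketch: KeyValueStorageEngineBuilder, LRUCache and
// TaskContext.registerStore are proposals, not existing APIs.
KeyValueStorageEngine store = null;

public void init(Config config, TaskContext context) {
  int cacheSize = config.getInt("my.cache.size.config");
  int batchSize = config.getInt("my.batch.size.config");

  // Wire the store layers in code instead of through stores.* config.
  store = new KeyValueStorageEngineBuilder()
    .setCachePolicy(new LRUCache(cacheSize))
    .setBatchSize(batchSize)
    .setStore(new LevelDbKeyValueStore())
    .setChangelog(new SystemStream("kafka", "my-changelog-stream"))
    .build();

  // Hand the assembled engine to the framework (restore, flush, etc.).
  context.registerStore(store);
}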
{code}

This would certainly affect how shared stores (SAMZA-402) are handled (which task creates the shared store? Is there some other code that does this?). It's also unclear how the logged store would get set up. Stores would also not be restored until *after* the StreamTask.init method, whereas now I think they're restored before it. It's not backwards compatible either. If we thought this through, though, it seems like it might be a better way to handle config/wiring for stores.

> Add a Cache state API to the Samza container
> --------------------------------------------
>
>                 Key: SAMZA-424
>                 URL: https://issues.apache.org/jira/browse/SAMZA-424
>             Project: Samza
>          Issue Type: New Feature
>          Components: container
>            Reporter: Chinmay Soman
>            Assignee: Chinmay Soman
>         Attachments: SAMZA-424-Cache-API_0.pdf
>
>
> There are cases when the user code needs access to a 'cache' which can be used to store custom data. This cache is different from the KeyValue store in the following ways:
> * At the very least, needs to support LRU (Least Recently Used) and TTL (Time To Live) eviction strategies
> * May not support all() and range() operations (since this wreaks havoc with the eviction operation)
> * Needs to exist at a per task or a per container level.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)