Posted to commits@samza.apache.org by "Chris Riccomini (JIRA)" <ji...@apache.org> on 2014/01/24 19:39:43 UTC
[jira] [Resolved] (SAMZA-135) LevelDbKeyValueStore leaks memory on putAll
[ https://issues.apache.org/jira/browse/SAMZA-135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chris Riccomini resolved SAMZA-135.
-----------------------------------
Resolution: Fixed
Merged and committed.
> LevelDbKeyValueStore leaks memory on putAll
> -------------------------------------------
>
> Key: SAMZA-135
> URL: https://issues.apache.org/jira/browse/SAMZA-135
> Project: Samza
> Issue Type: Bug
> Components: kv
> Affects Versions: 0.6.0
> Reporter: Chris Riccomini
> Assignee: Chris Riccomini
> Attachments: SAMZA-135.0.patch
>
>
> Calling store.all(); store.close(); in a tight loop shows that memory is being leaked.
> Digging into it a bit more: when we use a KeyValueStore with a cache and call store.all, the cache does:
> {code}
> def all() = {
>   metrics.alls.inc
>   flush()
>   store.all()
> }
> {code}
> In turn, flush() does:
> {code}
> def flush() {
>   trace("Flushing.")
>   metrics.flushes.inc
>   // write out the contents of the dirty list oldest first
>   val batch = new java.util.ArrayList[Entry[K, V]](this.dirtyCount)
>   for (k <- this.dirty.reverse) {
>     val entry = this.cache.get(k)
>     entry.dirty = null // not dirty any more
>     batch.add(new Entry(k, entry.value))
>   }
>   store.putAll(batch)
>   store.flush
>   metrics.flushBatchSize.inc(batch.size)
>   // reset the dirty list
>   this.dirty = new mutable.DoubleLinkedList[K]()
>   this.dirtyCount = 0
> }
> {code}
> The store.putAll in this code calls LevelDbKeyValueStore.putAll, which has:
> {code}
> def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]) {
>   val batch = db.createWriteBatch()
>   val iter = entries.iterator
>   var wrote = 0
>   var deletes = 0
>   while (iter.hasNext) {
>     wrote += 1
>     val curr = iter.next()
>     if (curr.getValue == null) {
>       deletes += 1
>       batch.delete(curr.getKey)
>     } else {
>       val key = curr.getKey
>       val value = curr.getValue
>       metrics.bytesWritten.inc(key.size + value.size)
>       batch.put(key, value)
>     }
>   }
>   db.write(batch)
>   batch.close
>   metrics.puts.inc(wrote)
>   metrics.deletes.inc(deletes)
> }
> {code}
> According to the docs on https://github.com/fusesource/leveldbjni, the batch needs to be closed!
> {code}
> WriteBatch batch = db.createWriteBatch();
> try {
>   batch.delete(bytes("Denver"));
>   batch.put(bytes("Tampa"), bytes("green"));
>   batch.put(bytes("London"), bytes("red"));
>   db.write(batch);
> } finally {
>   // Make sure you close the batch to avoid resource leaks.
>   batch.close();
> }
> {code}
> This should be a one-line fix.
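
The leveldbjni snippet quoted above closes the batch in a finally block, while the quoted putAll calls batch.close right after db.write. The following is a minimal sketch of why that placement can matter. It does not use leveldbjni at all: FakeBatch, write, and both putAll variants are hypothetical stand-ins invented for illustration, and the open counter merely simulates a native resource that stays allocated until close() is called.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchCloseSketch {

    /** Hypothetical stand-in for a native-backed write batch (not the leveldbjni class). */
    static final class FakeBatch {
        static int open = 0; // batches created but not yet closed
        final List<String> ops = new ArrayList<>();

        FakeBatch() { open++; }

        void put(String key, String value) { ops.add("put " + key + "=" + value); }

        void delete(String key) { ops.add("delete " + key); }

        void close() { open--; }
    }

    /** Hypothetical stand-in for db.write(batch); fails on request. */
    static void write(FakeBatch batch, boolean fail) {
        if (fail) {
            throw new IllegalStateException("simulated write failure");
        }
    }

    /** A null value means delete, mirroring the convention in the quoted putAll. */
    static void fill(FakeBatch batch, List<String[]> entries) {
        for (String[] e : entries) {
            if (e[1] == null) {
                batch.delete(e[0]);
            } else {
                batch.put(e[0], e[1]);
            }
        }
    }

    /** Closes only on the success path: a failed write skips close(). */
    static void putAllCloseAfterWrite(List<String[]> entries, boolean fail) {
        FakeBatch batch = new FakeBatch();
        fill(batch, entries);
        write(batch, fail);
        batch.close();
    }

    /** Closes on every path, as in the quoted leveldbjni example. */
    static void putAllTryFinally(List<String[]> entries, boolean fail) {
        FakeBatch batch = new FakeBatch();
        try {
            fill(batch, entries);
            write(batch, fail);
        } finally {
            batch.close();
        }
    }

    public static void main(String[] args) {
        List<String[]> entries = new ArrayList<>();
        entries.add(new String[] {"Tampa", "green"});
        entries.add(new String[] {"Denver", null});

        try {
            putAllCloseAfterWrite(entries, true);
        } catch (IllegalStateException expected) {
            // the simulated failure is expected here
        }
        System.out.println("open batches, close after write: " + FakeBatch.open);

        FakeBatch.open = 0;
        try {
            putAllTryFinally(entries, true);
        } catch (IllegalStateException expected) {
            // the simulated failure is expected here
        }
        System.out.println("open batches, try/finally: " + FakeBatch.open);
    }
}
```

Running main with a simulated write failure prints 1 open batch for the close-after-write variant and 0 for the try/finally variant. When the write succeeds, both variants close the batch.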