Posted to commits@samza.apache.org by "Chris Riccomini (JIRA)" <ji...@apache.org> on 2014/01/24 19:39:43 UTC

[jira] [Resolved] (SAMZA-135) LevelDbKeyValueStore leaks memory on putAll

     [ https://issues.apache.org/jira/browse/SAMZA-135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Riccomini resolved SAMZA-135.
-----------------------------------

    Resolution: Fixed

Merged and committed.

> LevelDbKeyValueStore leaks memory on putAll
> -------------------------------------------
>
>                 Key: SAMZA-135
>                 URL: https://issues.apache.org/jira/browse/SAMZA-135
>             Project: Samza
>          Issue Type: Bug
>          Components: kv
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>         Attachments: SAMZA-135.0.patch
>
>
> Calling store.all(); store.close(); in a tight loop shows that memory is being leaked.
> Digging into it a bit more: when we use a KeyValueStore with a cache and call store.all, the cache does:
> {code}
>   def all() = {
>     metrics.alls.inc
>     flush()
>     store.all()
>   }
> {code}
> In turn, flush() does:
> {code}
>   def flush() {
>     trace("Flushing.")
>     metrics.flushes.inc
>     // write out the contents of the dirty list oldest first
>     val batch = new java.util.ArrayList[Entry[K, V]](this.dirtyCount)
>     for (k <- this.dirty.reverse) {
>       val entry = this.cache.get(k)
>       entry.dirty = null // not dirty any more
>       batch.add(new Entry(k, entry.value))
>     }
>     store.putAll(batch)
>     store.flush
>     metrics.flushBatchSize.inc(batch.size)
>     // reset the dirty list
>     this.dirty = new mutable.DoubleLinkedList[K]()
>     this.dirtyCount = 0
>   }
> {code}
> The store.putAll call in this code invokes LevelDbKeyValueStore.putAll, which has:
> {code}
>   def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]) {
>     val batch = db.createWriteBatch()
>     val iter = entries.iterator
>     var wrote = 0
>     var deletes = 0
>     while (iter.hasNext) {
>       wrote += 1
>       val curr = iter.next()
>       if (curr.getValue == null) {
>         deletes += 1
>         batch.delete(curr.getKey)
>       } else {
>         val key = curr.getKey
>         val value = curr.getValue
>         metrics.bytesWritten.inc(key.size + value.size)
>         batch.put(key, value)
>       }
>     }
>     db.write(batch)
>     batch.close
>     metrics.puts.inc(wrote)
>     metrics.deletes.inc(deletes)
>   }
> {code}
> According to the docs at https://github.com/fusesource/leveldbjni, the batch needs to be closed!
> {code}
> WriteBatch batch = db.createWriteBatch();
> try {
>   batch.delete(bytes("Denver"));
>   batch.put(bytes("Tampa"), bytes("green"));
>   batch.put(bytes("London"), bytes("red"));
>   db.write(batch);
> } finally {
>   // Make sure you close the batch to avoid resource leaks.
>   batch.close();
> }
> {code}
> This should be a one-line fix.
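
The leveldbjni example quoted above closes the batch in a finally block, whereas the quoted putAll only reaches batch.close when db.write returns normally. Below is a minimal sketch of the close-in-finally shape, not the committed patch. FakeBatch and FakeDb are hypothetical stand-ins invented here so the example runs without the native library; only createWriteBatch, put, delete, write, and close mirror names that appear in the quoted code.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: FakeBatch and FakeDb are hypothetical stand-ins, not leveldbjni classes.
class BatchCloseSketch {
    /** Records operations and remembers whether close() was called. */
    static class FakeBatch implements AutoCloseable {
        final List<String> ops = new ArrayList<>();
        boolean closed = false;

        void put(String key, String value) { ops.add("put " + key + "=" + value); }
        void delete(String key) { ops.add("delete " + key); }

        @Override
        public void close() { closed = true; }
    }

    /** Fake store whose write() can be made to fail, to exercise the error path. */
    static class FakeDb {
        final boolean failOnWrite;
        FakeBatch lastBatch = null; // kept so callers can inspect the batch afterwards
        int written = 0;

        FakeDb(boolean failOnWrite) { this.failOnWrite = failOnWrite; }

        FakeBatch createWriteBatch() {
            lastBatch = new FakeBatch();
            return lastBatch;
        }

        void write(FakeBatch batch) {
            if (failOnWrite) throw new IllegalStateException("simulated write failure");
            written += batch.ops.size();
        }
    }

    /** Each entry is {key, value}; a null value means delete, as in the quoted putAll. */
    static void putAll(FakeDb db, List<String[]> entries) {
        FakeBatch batch = db.createWriteBatch();
        try {
            for (String[] entry : entries) {
                if (entry[1] == null) {
                    batch.delete(entry[0]);
                } else {
                    batch.put(entry[0], entry[1]);
                }
            }
            db.write(batch);
        } finally {
            // Runs on both the normal and the exceptional path.
            batch.close();
        }
    }

    public static void main(String[] args) {
        List<String[]> entries = new ArrayList<>();
        entries.add(new String[] {"Tampa", "green"});
        entries.add(new String[] {"Denver", null});

        FakeDb ok = new FakeDb(false);
        putAll(ok, entries);
        System.out.println("closed after successful write: " + ok.lastBatch.closed);

        FakeDb failing = new FakeDb(true);
        try {
            putAll(failing, entries);
        } catch (IllegalStateException e) {
            System.out.println("closed after failed write: " + failing.lastBatch.closed);
        }
    }
}
```

In the real store the same shape would wrap the existing while loop and db.write call in try, with batch.close moved into finally; the metrics updates could stay after the block.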



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)