You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/12/18 21:06:07 UTC

samza git commit: SAMZA-2018: State restore improvements using RocksDB writebatch API

Repository: samza
Updated Branches:
  refs/heads/master 6a75503d7 -> 84d144c51


SAMZA-2018: State restore improvements using RocksDB writebatch API

This PR enables the RocksDbKeyValueStore to use the writeBatch API.

Author: Ray Matharu <rm...@linkedin.com>

Reviewers: Jacob Maes <jm...@apache.org>, Prateek Maheshwari <pm...@apache.org>

Closes #864 from rmatharu/writebatch


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/84d144c5
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/84d144c5
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/84d144c5

Branch: refs/heads/master
Commit: 84d144c5120bf8bae8dc02ba7a1de1c68bed6418
Parents: 6a75503
Author: Ray Matharu <rm...@linkedin.com>
Authored: Tue Dec 18 13:06:04 2018 -0800
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Tue Dec 18 13:06:04 2018 -0800

----------------------------------------------------------------------
 .../org/apache/samza/storage/kv/RocksDbKeyValueStore.scala   | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/84d144c5/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index b7baede..c5a89d9 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -172,25 +172,27 @@ class RocksDbKeyValueStore(
     }
   }
 
-  // Write batch from RocksDB API is not used currently because of: https://github.com/facebook/rocksdb/issues/262
   def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]): Unit = ifOpen {
     metrics.putAlls.inc()
     val iter = entries.iterator
     var wrote = 0
     var deletes = 0
+    val writeBatch = new WriteBatch()
     while (iter.hasNext) {
       val curr = iter.next()
       if (curr.getValue == null) {
         deletes += 1
-        db.delete(writeOptions, curr.getKey)
+        writeBatch.remove(curr.getKey)
       } else {
         wrote += 1
         val key = curr.getKey
         val value = curr.getValue
         metrics.bytesWritten.inc(key.length + value.length)
-        db.put(writeOptions, key, value)
+        writeBatch.put(key, value)
       }
     }
+    db.write(writeOptions, writeBatch)
+    writeBatch.close()
     metrics.puts.inc(wrote)
     metrics.deletes.inc(deletes)
   }