You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/12/18 21:06:07 UTC
samza git commit: SAMZA-2018: State restore improvements using RocksDB writebatch API
Repository: samza
Updated Branches:
refs/heads/master 6a75503d7 -> 84d144c51
SAMZA-2018: State restore improvements using RocksDB writebatch API
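This PR changes RocksDbKeyValueStore.putAll to use the RocksDB WriteBatch API: puts and deletes are collected in a WriteBatch and applied with a single db.write call, instead of issuing a separate db.put or db.delete for each entry.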
Author: Ray Matharu <rm...@linkedin.com>
Reviewers: Jacob Maes <jm...@apache.org>, Prateek Maheshwari <pm...@apache.org>
Closes #864 from rmatharu/writebatch
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/84d144c5
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/84d144c5
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/84d144c5
Branch: refs/heads/master
Commit: 84d144c5120bf8bae8dc02ba7a1de1c68bed6418
Parents: 6a75503
Author: Ray Matharu <rm...@linkedin.com>
Authored: Tue Dec 18 13:06:04 2018 -0800
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Tue Dec 18 13:06:04 2018 -0800
----------------------------------------------------------------------
.../org/apache/samza/storage/kv/RocksDbKeyValueStore.scala | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/84d144c5/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index b7baede..c5a89d9 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -172,25 +172,27 @@ class RocksDbKeyValueStore(
}
}
- // Write batch from RocksDB API is not used currently because of: https://github.com/facebook/rocksdb/issues/262
def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]): Unit = ifOpen {
metrics.putAlls.inc()
val iter = entries.iterator
var wrote = 0
var deletes = 0
+ val writeBatch = new WriteBatch()
while (iter.hasNext) {
val curr = iter.next()
if (curr.getValue == null) {
deletes += 1
- db.delete(writeOptions, curr.getKey)
+ writeBatch.remove(curr.getKey)
} else {
wrote += 1
val key = curr.getKey
val value = curr.getValue
metrics.bytesWritten.inc(key.length + value.length)
- db.put(writeOptions, key, value)
+ writeBatch.put(key, value)
}
}
+ db.write(writeOptions, writeBatch)
+ writeBatch.close()
metrics.puts.inc(wrote)
metrics.deletes.inc(deletes)
}