You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2017/10/19 18:36:00 UTC
samza git commit: Minor fixes to KeyValueStore and RocksDBKeyValueStore
Repository: samza
Updated Branches:
refs/heads/master 62904334b -> 958edc42f
Minor fixes to KeyValueStore and RocksDBKeyValueStore
1. Replaced extension class in KeyValueStore with default methods.
2. Fixed formatting in RocksDbKeyValueStore#openDB.
3. Now preserves the original RocksDBException as the cause of the SamzaException thrown on errors opening the db. Other minor log message cleanup.
Author: Prateek Maheshwari <pm...@linkedin.com>
Reviewers: Jacob Maes <jm...@apache.org>
Closes #332 from prateekm/store-fixes
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/958edc42
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/958edc42
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/958edc42
Branch: refs/heads/master
Commit: 958edc42f59a1891e14276251462236abd2af5b9
Parents: 6290433
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Thu Oct 19 11:36:02 2017 -0700
Committer: Prateek Maheshwari <pm...@linkedin.com>
Committed: Thu Oct 19 11:36:02 2017 -0700
----------------------------------------------------------------------
.../apache/samza/storage/kv/KeyValueStore.java | 69 ++++------------
.../kv/inmemory/InMemoryKeyValueStore.scala | 8 --
.../samza/storage/kv/RocksDbKeyValueStore.scala | 86 ++++++++------------
.../apache/samza/storage/kv/CachedStore.scala | 2 +-
.../samza/storage/kv/MockKeyValueStore.scala | 8 --
5 files changed, 51 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/958edc42/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
index b1fea7b..18a89ec 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
@@ -46,7 +46,19 @@ public interface KeyValueStore<K, V> {
* @return a map of the keys that were found and their respective values.
* @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
*/
- Map<K, V> getAll(List<K> keys);
+ default Map<K, V> getAll(List<K> keys) {
+ Map<K, V> map = new HashMap<>(keys.size());
+
+ for (K key : keys) {
+ V value = get(key);
+
+ if (value != null) {
+ map.put(key, value);
+ }
+ }
+
+ return map;
+ }
/**
* Updates the mapping of the specified key-value pair; Associates the specified {@code key} with the specified {@code value}.
@@ -79,7 +91,11 @@ public interface KeyValueStore<K, V> {
* @param keys the keys for which the mappings are to be deleted.
* @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
*/
- void deleteAll(List<K> keys);
+ default void deleteAll(List<K> keys) {
+ for (K key : keys) {
+ delete(key);
+ }
+ }
/**
* Returns an iterator for a sorted range of entries specified by [{@code from}, {@code to}).
@@ -111,53 +127,4 @@ public interface KeyValueStore<K, V> {
* Flushes this key-value store, if applicable.
*/
void flush();
-
- /**
- * Represents an extension for classes that implement {@link KeyValueStore}.
- */
- // TODO replace with default interface methods when we can use Java 8 features.
- class Extension {
- private Extension() {
- // This class cannot be instantiated
- }
-
- /**
- * Gets the values with which the specified {@code keys} are associated.
- *
- * @param store the key-value store for which this operation is to be performed.
- * @param keys the keys with which the associated values are to be fetched.
- * @param <K> the type of keys maintained by the specified {@code store}.
- * @param <V> the type of values maintained by the specified {@code store}.
- * @return a map of the keys that were found and their respective values.
- * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
- */
- public static <K, V> Map<K, V> getAll(final KeyValueStore<K, V> store, final List<K> keys) {
- final Map<K, V> map = new HashMap<>(keys.size());
-
- for (final K key : keys) {
- final V value = store.get(key);
-
- if (value != null) {
- map.put(key, value);
- }
- }
-
- return map;
- }
-
- /**
- * Deletes the mappings for the specified {@code keys} from this key-value store (if such mappings exist).
- *
- * @param store the key-value store for which this operation is to be performed.
- * @param keys the keys for which the mappings are to be deleted.
- * @param <K> the type of keys maintained by the specified {@code store}.
- * @param <V> the type of values maintained by the specified {@code store}.
- * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
- */
- public static <K, V> void deleteAll(final KeyValueStore<K, V> store, final List<K> keys) {
- for (final K key : keys) {
- store.delete(key);
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/958edc42/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
index 4c245b6..7b83163 100644
--- a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
+++ b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
@@ -81,10 +81,6 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor
put(key, null)
}
- override def deleteAll(keys: java.util.List[Array[Byte]]) = {
- KeyValueStore.Extension.deleteAll(this, keys)
- }
-
override def putAll(entries: util.List[Entry[Array[Byte], Array[Byte]]]): Unit = {
// TreeMap's putAll requires a map, so we'd need to iterate over all the entries anyway
// to use it, in order to putAll here. Therefore, just iterate here.
@@ -116,8 +112,4 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor
}
found
}
-
- override def getAll(keys: java.util.List[Array[Byte]]): java.util.Map[Array[Byte], Array[Byte]] = {
- KeyValueStore.Extension.getAll(this, keys);
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/958edc42/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index 023e4a8..135cff9 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -20,89 +20,71 @@
package org.apache.samza.storage.kv
import java.io.File
+import java.util.concurrent.TimeUnit
+
import org.apache.samza.SamzaException
-import org.apache.samza.util.{ LexicographicComparator, Logging }
import org.apache.samza.config.Config
-import org.rocksdb._
-import org.rocksdb.TtlDB
+import org.apache.samza.util.{LexicographicComparator, Logging}
+import org.rocksdb.{TtlDB, _}
object RocksDbKeyValueStore extends Logging {
- def openDB(dir: File, options: Options, storeConfig: Config, isLoggedStore: Boolean, storeName: String, metrics: KeyValueStoreMetrics): RocksDB = {
+ def openDB(dir: File, options: Options, storeConfig: Config, isLoggedStore: Boolean,
+ storeName: String, metrics: KeyValueStoreMetrics): RocksDB = {
var ttl = 0L
var useTTL = false
- if (storeConfig.containsKey("rocksdb.ttl.ms"))
- {
- try
- {
+ if (storeConfig.containsKey("rocksdb.ttl.ms")) {
+ try {
ttl = storeConfig.getLong("rocksdb.ttl.ms")
- // RocksDB accepts TTL in seconds, convert ms to seconds
- if(ttl > 0) {
- if (ttl < 1000)
- {
- warn("The ttl values requested for %s is %d, which is less than 1000 (minimum), using 1000 instead",
- storeName,
- ttl)
+ if (ttl > 0) {
+ if (ttl < 1000) {
+ warn("The ttl value requested for %s is %d which is less than 1000 (minimum). " +
+ "Using 1000 ms instead.", storeName, ttl)
ttl = 1000
}
- ttl = ttl / 1000
- }
- else {
- warn("Non-positive TTL for RocksDB implies infinite TTL for the data. More Info -https://github.com/facebook/rocksdb/wiki/Time-to-Live")
+ ttl = TimeUnit.MILLISECONDS.toSeconds(ttl)
+ } else {
+ warn("Non-positive TTL for RocksDB implies infinite TTL for the data. " +
+ "More Info - https://github.com/facebook/rocksdb/wiki/Time-to-Live")
}
useTTL = true
- if (isLoggedStore)
- {
- warn("%s is a TTL based store, changelog is not supported for TTL based stores, use at your own discretion" format storeName)
+ if (isLoggedStore) {
+ warn("%s is a TTL based store. Changelog is not supported for TTL based stores. " +
+ "Use at your own discretion." format storeName)
}
+ } catch {
+ case nfe: NumberFormatException =>
+ throw new SamzaException("rocksdb.ttl.ms configuration value %s for store %s is not a number."
+ format (storeConfig.get("rocksdb.ttl.ms"), storeName), nfe)
}
- catch
- {
- case nfe: NumberFormatException => throw new SamzaException("rocksdb.ttl.ms configuration is not a number, " + "value found %s" format storeConfig.get(
- "rocksdb.ttl.ms"))
- }
}
- try
- {
+ try {
val rocksDb =
- if (useTTL)
- {
+ if (useTTL) {
info("Opening RocksDB store with TTL value: %s" format ttl)
TtlDB.open(options, dir.toString, ttl.toInt, false)
- }
- else
- {
+ } else {
RocksDB.open(options, dir.toString)
}
- if (storeConfig.containsKey("rocksdb.metrics.list"))
- {
+ if (storeConfig.containsKey("rocksdb.metrics.list")) {
storeConfig
.get("rocksdb.metrics.list")
.split(",")
.map(property => property.trim)
- .foreach(property =>
- metrics.newGauge(property, () => rocksDb.getProperty(property))
- )
+ .foreach(property => metrics.newGauge(property, () => rocksDb.getProperty(property)))
}
rocksDb
+ } catch {
+ case rocksDBException: RocksDBException =>
+ throw new SamzaException("Error opening RocksDB store %s at location %s" format (storeName, dir.toString),
+ rocksDBException)
}
- catch
- {
- case rocksDBException: RocksDBException =>
- {
- throw new SamzaException(
- "Error opening RocksDB store %s at location %s, received the following exception from RocksDB %s".format(
- storeName,
- dir.toString,
- rocksDBException))
- }
- }
}
}
@@ -187,10 +169,6 @@ class RocksDbKeyValueStore(
put(key, null)
}
- def deleteAll(keys: java.util.List[Array[Byte]]) = {
- KeyValueStore.Extension.deleteAll(this, keys)
- }
-
def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = {
metrics.ranges.inc
require(from != null && to != null, "Null bound not allowed.")
http://git-wip-us.apache.org/repos/asf/samza/blob/958edc42/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index 44f96b4..d40999a 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -244,7 +244,7 @@ class CachedStore[K, V](
override def deleteAll(keys: java.util.List[K]) = {
lock.synchronized({
- KeyValueStore.Extension.deleteAll(this, keys)
+ super.deleteAll(keys)
})
}
http://git-wip-us.apache.org/repos/asf/samza/blob/958edc42/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
index f57b275..f66dc04 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
@@ -69,12 +69,4 @@ class MockKeyValueStore extends KeyValueStore[String, String] {
override def flush() {} // no-op
override def close() { kvMap.clear() }
-
- override def deleteAll(keys: java.util.List[String]) {
- KeyValueStore.Extension.deleteAll(this, keys)
- }
-
- override def getAll(keys: java.util.List[String]): java.util.Map[String, String] = {
- KeyValueStore.Extension.getAll(this, keys)
- }
}
\ No newline at end of file