You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2017/10/19 18:36:00 UTC

samza git commit: Minor fixes to KeyValueStore and RocksDBKeyValueStore

Repository: samza
Updated Branches:
  refs/heads/master 62904334b -> 958edc42f


Minor fixes to KeyValueStore and RocksDBKeyValueStore

1. Replaced extension class in KeyValueStore with default methods.
2. Fixed formatting in RocksDBKeyValueStore#openDB.
3. Now logs original RocksDBException on errors opening the db. Other minor log message cleanup.

Author: Prateek Maheshwari <pm...@linkedin.com>

Reviewers: Jacob Maes <jm...@apache.org>

Closes #332 from prateekm/store-fixes


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/958edc42
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/958edc42
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/958edc42

Branch: refs/heads/master
Commit: 958edc42f59a1891e14276251462236abd2af5b9
Parents: 6290433
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Thu Oct 19 11:36:02 2017 -0700
Committer: Prateek Maheshwari <pm...@linkedin.com>
Committed: Thu Oct 19 11:36:02 2017 -0700

----------------------------------------------------------------------
 .../apache/samza/storage/kv/KeyValueStore.java  | 69 ++++------------
 .../kv/inmemory/InMemoryKeyValueStore.scala     |  8 --
 .../samza/storage/kv/RocksDbKeyValueStore.scala | 86 ++++++++------------
 .../apache/samza/storage/kv/CachedStore.scala   |  2 +-
 .../samza/storage/kv/MockKeyValueStore.scala    |  8 --
 5 files changed, 51 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/958edc42/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
index b1fea7b..18a89ec 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
@@ -46,7 +46,19 @@ public interface KeyValueStore<K, V> {
    * @return a map of the keys that were found and their respective values.
    * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
    */
-  Map<K, V> getAll(List<K> keys);
+  default Map<K, V> getAll(List<K> keys) {
+    Map<K, V> map = new HashMap<>(keys.size());
+
+    for (K key : keys) {
+      V value = get(key);
+
+      if (value != null) {
+        map.put(key, value);
+      }
+    }
+
+    return map;
+  }
 
   /**
    * Updates the mapping of the specified key-value pair; Associates the specified {@code key} with the specified {@code value}.
@@ -79,7 +91,11 @@ public interface KeyValueStore<K, V> {
    * @param keys the keys for which the mappings are to be deleted.
    * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
    */
-  void deleteAll(List<K> keys);
+  default void deleteAll(List<K> keys) {
+    for (K key : keys) {
+      delete(key);
+    }
+  }
 
   /**
    * Returns an iterator for a sorted range of entries specified by [{@code from}, {@code to}).
@@ -111,53 +127,4 @@ public interface KeyValueStore<K, V> {
    * Flushes this key-value store, if applicable.
    */
   void flush();
-
-  /**
-   * Represents an extension for classes that implement {@link KeyValueStore}.
-   */
-  // TODO replace with default interface methods when we can use Java 8 features.
-  class Extension {
-    private Extension() {
-      // This class cannot be instantiated
-    }
-
-    /**
-     * Gets the values with which the specified {@code keys} are associated.
-     *
-     * @param store the key-value store for which this operation is to be performed.
-     * @param keys the keys with which the associated values are to be fetched.
-     * @param <K> the type of keys maintained by the specified {@code store}.
-     * @param <V> the type of values maintained by the specified {@code store}.
-     * @return a map of the keys that were found and their respective values.
-     * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
-     */
-    public static <K, V> Map<K, V> getAll(final KeyValueStore<K, V> store, final List<K> keys) {
-      final Map<K, V> map = new HashMap<>(keys.size());
-
-      for (final K key : keys) {
-        final V value = store.get(key);
-
-        if (value != null) {
-          map.put(key, value);
-        }
-      }
-
-      return map;
-    }
-
-    /**
-     * Deletes the mappings for the specified {@code keys} from this key-value store (if such mappings exist).
-     *
-     * @param store the key-value store for which this operation is to be performed.
-     * @param keys the keys for which the mappings are to be deleted.
-     * @param <K> the type of keys maintained by the specified {@code store}.
-     * @param <V> the type of values maintained by the specified {@code store}.
-     * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
-     */
-    public static <K, V> void deleteAll(final KeyValueStore<K, V> store, final List<K> keys) {
-      for (final K key : keys) {
-        store.delete(key);
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/958edc42/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
index 4c245b6..7b83163 100644
--- a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
+++ b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
@@ -81,10 +81,6 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor
     put(key, null)
   }
 
-  override def deleteAll(keys: java.util.List[Array[Byte]]) = {
-    KeyValueStore.Extension.deleteAll(this, keys)
-  }
-
   override def putAll(entries: util.List[Entry[Array[Byte], Array[Byte]]]): Unit = {
     // TreeMap's putAll requires a map, so we'd need to iterate over all the entries anyway
     // to use it, in order to putAll here.  Therefore, just iterate here.
@@ -116,8 +112,4 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor
     }
     found
   }
-
-  override def getAll(keys: java.util.List[Array[Byte]]): java.util.Map[Array[Byte], Array[Byte]] = {
-    KeyValueStore.Extension.getAll(this, keys);
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/958edc42/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index 023e4a8..135cff9 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -20,89 +20,71 @@
 package org.apache.samza.storage.kv
 
 import java.io.File
+import java.util.concurrent.TimeUnit
+
 import org.apache.samza.SamzaException
-import org.apache.samza.util.{ LexicographicComparator, Logging }
 import org.apache.samza.config.Config
-import org.rocksdb._
-import org.rocksdb.TtlDB
+import org.apache.samza.util.{LexicographicComparator, Logging}
+import org.rocksdb.{TtlDB, _}
 
 object RocksDbKeyValueStore extends Logging {
 
-  def openDB(dir: File, options: Options, storeConfig: Config, isLoggedStore: Boolean, storeName: String, metrics: KeyValueStoreMetrics): RocksDB = {
+  def openDB(dir: File, options: Options, storeConfig: Config, isLoggedStore: Boolean,
+             storeName: String, metrics: KeyValueStoreMetrics): RocksDB = {
     var ttl = 0L
     var useTTL = false
 
-    if (storeConfig.containsKey("rocksdb.ttl.ms"))
-    {
-      try
-      {
+    if (storeConfig.containsKey("rocksdb.ttl.ms")) {
+      try {
         ttl = storeConfig.getLong("rocksdb.ttl.ms")
 
-        // RocksDB accepts TTL in seconds, convert ms to seconds
-        if(ttl > 0) {
-          if (ttl < 1000)
-          {
-            warn("The ttl values requested for %s is %d, which is less than 1000 (minimum), using 1000 instead",
-              storeName,
-              ttl)
+        if (ttl > 0) {
+          if (ttl < 1000) {
+            warn("The ttl value requested for %s is %d which is less than 1000 (minimum). " +
+              "Using 1000 ms instead.", storeName, ttl)
             ttl = 1000
           }
-          ttl = ttl / 1000
-        }
-        else {
-          warn("Non-positive TTL for RocksDB implies infinite TTL for the data. More Info -https://github.com/facebook/rocksdb/wiki/Time-to-Live")
+          ttl = TimeUnit.MILLISECONDS.toSeconds(ttl)
+        } else {
+          warn("Non-positive TTL for RocksDB implies infinite TTL for the data. " +
+            "More Info - https://github.com/facebook/rocksdb/wiki/Time-to-Live")
         }
 
         useTTL = true
-        if (isLoggedStore)
-        {
-          warn("%s is a TTL based store, changelog is not supported for TTL based stores, use at your own discretion" format storeName)
+        if (isLoggedStore) {
+          warn("%s is a TTL based store. Changelog is not supported for TTL based stores. " +
+            "Use at your own discretion." format storeName)
         }
+      } catch {
+        case nfe: NumberFormatException =>
+          throw new SamzaException("rocksdb.ttl.ms configuration value %s for store %s is not a number."
+            format (storeConfig.get("rocksdb.ttl.ms"), storeName), nfe)
       }
-      catch
-        {
-          case nfe: NumberFormatException => throw new SamzaException("rocksdb.ttl.ms configuration is not a number, " + "value found %s" format storeConfig.get(
-            "rocksdb.ttl.ms"))
-        }
     }
 
-    try
-    {
+    try {
       val rocksDb =
-        if (useTTL)
-        {
+        if (useTTL) {
           info("Opening RocksDB store with TTL value: %s" format ttl)
           TtlDB.open(options, dir.toString, ttl.toInt, false)
-        }
-        else
-        {
+        } else {
           RocksDB.open(options, dir.toString)
         }
 
-      if (storeConfig.containsKey("rocksdb.metrics.list"))
-      {
+      if (storeConfig.containsKey("rocksdb.metrics.list")) {
         storeConfig
           .get("rocksdb.metrics.list")
           .split(",")
           .map(property => property.trim)
-          .foreach(property =>
-            metrics.newGauge(property, () => rocksDb.getProperty(property))
-          )
+          .foreach(property => metrics.newGauge(property, () => rocksDb.getProperty(property)))
       }
 
       rocksDb
+    } catch {
+      case rocksDBException: RocksDBException =>
+        throw new SamzaException("Error opening RocksDB store %s at location %s" format (storeName, dir.toString),
+          rocksDBException)
     }
-    catch
-      {
-        case rocksDBException: RocksDBException =>
-        {
-          throw new SamzaException(
-            "Error opening RocksDB store %s at location %s, received the following exception from RocksDB %s".format(
-              storeName,
-              dir.toString,
-              rocksDBException))
-        }
-      }
   }
 }
 
@@ -187,10 +169,6 @@ class RocksDbKeyValueStore(
     put(key, null)
   }
 
-  def deleteAll(keys: java.util.List[Array[Byte]]) = {
-    KeyValueStore.Extension.deleteAll(this, keys)
-  }
-
   def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = {
     metrics.ranges.inc
     require(from != null && to != null, "Null bound not allowed.")

http://git-wip-us.apache.org/repos/asf/samza/blob/958edc42/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index 44f96b4..d40999a 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -244,7 +244,7 @@ class CachedStore[K, V](
 
   override def deleteAll(keys: java.util.List[K]) = {
     lock.synchronized({
-      KeyValueStore.Extension.deleteAll(this, keys)
+      super.deleteAll(keys)
     })
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/958edc42/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
index f57b275..f66dc04 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
@@ -69,12 +69,4 @@ class MockKeyValueStore extends KeyValueStore[String, String] {
   override def flush() {}  // no-op
 
   override def close() { kvMap.clear() }
-
-  override def deleteAll(keys: java.util.List[String]) {
-    KeyValueStore.Extension.deleteAll(this, keys)
-  }
-
-  override def getAll(keys: java.util.List[String]): java.util.Map[String, String] = {
-    KeyValueStore.Extension.getAll(this, keys)
-  }
 }
\ No newline at end of file