Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:29 UTC

[13/50] [abbrv] samza git commit: SAMZA-626 - Read KV state store for a running job

SAMZA-626 - Read KV state store for a running job


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/07c69840
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/07c69840
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/07c69840

Branch: refs/heads/samza-sql
Commit: 07c6984080558f0523fa903d567b35239c421037
Parents: 6f23832
Author: Yan Fang <ya...@gmail.com>
Authored: Mon Oct 26 13:52:38 2015 -0700
Committer: Navina <na...@gmail.com>
Committed: Mon Oct 26 13:56:06 2015 -0700

----------------------------------------------------------------------
 build.gradle                                    |   3 +
 .../versioned/container/state-management.md     |  32 ++++
 .../samza/config/JavaSerializerConfig.java      |  54 +++++++
 .../apache/samza/config/JavaStorageConfig.java  |  12 +-
 .../apache/samza/container/SamzaContainer.scala |  24 +--
 .../main/scala/org/apache/samza/util/Util.scala |  23 ++-
 .../samza/container/TestSamzaContainer.scala    |  22 ---
 .../scala/org/apache/samza/util/TestUtil.scala  |  26 +++-
 .../samza/storage/kv/RocksDbKeyValueReader.java | 129 ++++++++++++++++
 .../samza/storage/kv/RocksDbOptionsHelper.java  |  99 ++++++++++++
 .../samza/storage/kv/RocksDbReadingTool.java    | 153 +++++++++++++++++++
 .../RocksDbKeyValueStorageEngineFactory.scala   |   2 +-
 .../samza/storage/kv/RocksDbKeyValueStore.scala |  43 ------
 .../storage/kv/TestRocksDbKeyValueReader.java   | 130 ++++++++++++++++
 samza-shell/src/main/bash/read-rocksdb-tool.sh  |  21 +++
 15 files changed, 678 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/07c69840/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 682d4f8..0541def 100644
--- a/build.gradle
+++ b/build.gradle
@@ -428,6 +428,9 @@ project(":samza-kv-inmemory_$scalaVersion") {
 project(":samza-kv-rocksdb_$scalaVersion") {
   apply plugin: 'scala'
 
+  sourceSets.test.scala.srcDir "src/test/java"
+  sourceSets.test.java.srcDirs = []
+
   dependencies {
     compile project(':samza-api')
     compile project(":samza-core_$scalaVersion")

http://git-wip-us.apache.org/repos/asf/samza/blob/07c69840/docs/learn/documentation/versioned/container/state-management.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/container/state-management.md b/docs/learn/documentation/versioned/container/state-management.md
index 50d4b65..320392c 100644
--- a/docs/learn/documentation/versioned/container/state-management.md
+++ b/docs/learn/documentation/versioned/container/state-management.md
@@ -186,6 +186,8 @@ Additional configuration properties for the key-value store are documented in th
 
 ### Debug Key-value storage
 
+#### Materialize a state store from the changelog
+
 Currently Samza provides a state storage tool which can recover the state store from the changelog stream to user-specified directory for reusing and debugging.
 
 {% highlight bash %}
@@ -194,6 +196,36 @@ samza-example/target/bin/state-storage-tool.sh \
   --path=directory/to/put/state/stores
 {% endhighlight %}
 
+#### Read the value from a running RocksDB
+
+Samza also provides a tool to read the value from a running job's RocksDB.
+
+{% highlight bash %}
+samza-example/target/bin/read-rocksdb-tool.sh \
+  --config-path=file:///path/to/job/config.properties \
+  --db-path=/tmp/nm-local-dir/state/test-state/Partition_0 \
+  --db-name=test-state \
+  --string-key=a,b,c
+{% endhighlight %}
+
+* `--config-path` (required): the path to your job's configuration file.
+* `--db-path` (required): the location of your RocksDB. This works when the RocksDB is on the same machine as the tool. For example, if you are running hello-samza on your local machine, the location may be 
+_/tmp/hadoop/nm-local-dir/usercache/username/appcache/applicationId/containerId/state/storeName/PartitionNumber_
+* `--db-name` (required): the name of the state store, as defined in the job's configuration file.
+* `--string-key`: a comma-separated list of keys. This option only works if your keys are strings. Two other options, `--integer-key` and `--long-key`, work for integer keys and long keys respectively.
+
+**Limitations**:
+
+* This only works with three kinds of keys: string, integer and long, because those are the only key types the tool accepts from the command line (accepting bytes, Avro, JSON, etc. from the command line is tricky). The reader is also easy to use programmatically, in which case both the key and the value go through the serdes configured for the store:
+{% highlight java %}
+RocksDbKeyValueReader kvReader = new RocksDbKeyValueReader(dbName, pathOfdb, config);
+Object value = kvReader.get(key);
+{% endhighlight %}
+
+
+* Because a Samza job has caches and buffers, you may not see the expected values (or any value at all, if all the data is still buffered). Related configuration properties include `stores.store-name.container.write.buffer.size.bytes`, `stores.store-name.write.batch.size` and `stores.store-name.object.cache.size`. You may want to set them to very small values for testing.
+* Because the RocksDB memtable is not flushed to disk on every write, you may not see the expected values until they are written to an SST file on disk. For more details on RocksDB, refer to the docs [here](https://github.com/facebook/rocksdb/wiki/RocksDB-Basics).
+
 #### Known Issues
 
 RocksDB has several rough edges. It's recommended that you read the RocksDB [tuning guide](https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide). Some other notes to be aware of are:
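
Editor's sketch for the key options documented above: `--string-key`, `--integer-key` and `--long-key` each take a comma-separated list, and the key type matters because different types generally serialize to different bytes. The class below is hypothetical and is not part of Samza. The real tool parses its options with joptsimple, and the byte encodings here (4-byte int, 8-byte long, UTF-8 string) are assumptions standing in for the store's configured key serde.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Samza: parses a comma-separated key list
// and shows why the key type has to match the store's key serde.
class KeyListSketch {
  enum KeyType { STRING, INTEGER, LONG }

  // Split "a,b,c" or "1,2,3" into keys of the requested type.
  static List<Object> parseKeys(String raw, KeyType type) {
    List<Object> keys = new ArrayList<>();
    for (String token : raw.split(",")) {
      String t = token.trim();
      switch (type) {
        case INTEGER:
          keys.add(Integer.valueOf(t));
          break;
        case LONG:
          keys.add(Long.valueOf(t));
          break;
        default:
          keys.add(t);
      }
    }
    return keys;
  }

  // Assumed encodings: 4-byte int, 8-byte long, UTF-8 string.
  static byte[] toBytes(Object key) {
    if (key instanceof Integer) {
      return ByteBuffer.allocate(4).putInt((Integer) key).array();
    }
    if (key instanceof Long) {
      return ByteBuffer.allocate(8).putLong((Long) key).array();
    }
    return key.toString().getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    for (KeyType type : KeyType.values()) {
      for (Object key : parseKeys("1,2", type)) {
        System.out.println(type + " key " + key + " -> " + toBytes(key).length + " bytes");
      }
    }
  }
}
```

Under these assumed encodings the same textual key `1` becomes three different byte arrays, so a lookup made with the wrong key option would most likely miss an entry that exists.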

http://git-wip-us.apache.org/repos/asf/samza/blob/07c69840/samza-core/src/main/java/org/apache/samza/config/JavaSerializerConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaSerializerConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaSerializerConfig.java
new file mode 100644
index 0000000..7db3e1c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaSerializerConfig.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * java version of the SerializerConfig
+ */
+public class JavaSerializerConfig extends MapConfig {
+  private final static String SERIALIZER_PREFIX = "serializers.registry.%s";
+  private final static String SERDE = "serializers.registry.%s.class";
+
+  public JavaSerializerConfig(Config config) {
+    super(config);
+  }
+
+  public String getSerdeClass(String name) {
+    return get(String.format(SERDE, name), null);
+  }
+
+  /**
+   * Returns a list of all serializer names from the config file. Useful for
+   * getting individual serializers.
+   */
+  public List<String> getSerdeNames() {
+    List<String> results = new ArrayList<String>();
+    Config subConfig = subset(String.format(SERIALIZER_PREFIX, ""), true);
+    for (String key : subConfig.keySet()) {
+      if (key.endsWith(".class")) {
+        results.add(key.substring(0, key.length() - ".class".length()));
+      }
+    }
+    return results;
+  }
+}
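
Editor's sketch of the name-discovery idea in `getSerdeNames`: every `serializers.registry.<name>.class` key contributes one serde name. The class below is a hypothetical stand-in that uses a plain `Map` instead of Samza's `Config`. It strips only the trailing `.class` suffix, so a name that itself contains `.class` survives intact.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for JavaSerializerConfig.getSerdeNames, using a
// plain Map instead of Samza's Config.
class SerdeNamesSketch {
  private static final String PREFIX = "serializers.registry.";
  private static final String SUFFIX = ".class";

  static List<String> serdeNames(Map<String, String> config) {
    List<String> names = new ArrayList<>();
    for (String key : config.keySet()) {
      // The length check skips degenerate keys where prefix and suffix overlap.
      boolean longEnough = key.length() > PREFIX.length() + SUFFIX.length();
      if (longEnough && key.startsWith(PREFIX) && key.endsWith(SUFFIX)) {
        // Drop the prefix and the trailing ".class" only.
        names.add(key.substring(PREFIX.length(), key.length() - SUFFIX.length()));
      }
    }
    return names;
  }

  public static void main(String[] args) {
    Map<String, String> config = new LinkedHashMap<>();
    config.put("serializers.registry.json.class", "org.apache.samza.serializers.JsonSerdeFactory");
    config.put("serializers.registry.string.class", "org.apache.samza.serializers.StringSerdeFactory");
    config.put("stores.test-state.key.serde", "string");
    System.out.println(serdeNames(config));
  }
}
```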

http://git-wip-us.apache.org/repos/asf/samza/blob/07c69840/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
index af7d4ca..4ac689e 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
@@ -29,6 +29,10 @@ public class JavaStorageConfig extends MapConfig {
 
   private static final String FACTORY_SUFFIX = ".factory";
   private static final String STORE_PREFIX = "stores.";
+  private static final String FACTORY = "stores.%s.factory";
+  private static final String KEY_SERDE = "stores.%s.key.serde";
+  private static final String MSG_SERDE = "stores.%s.msg.serde";
+  private static final String CHANGELOG_STREAM = "stores.%s.changelog";
 
   public JavaStorageConfig(Config config) {
     super(config);
@@ -46,18 +50,18 @@ public class JavaStorageConfig extends MapConfig {
   }
 
   public String getChangelogStream(String storeName) {
-    return get(String.format(StorageConfig.CHANGELOG_STREAM(), storeName), null);
+    return get(String.format(CHANGELOG_STREAM, storeName), null);
   }
 
   public String getStorageFactoryClassName(String storeName) {
-    return get(String.format(StorageConfig.FACTORY(), storeName), null);
+    return get(String.format(FACTORY, storeName), null);
   }
 
   public String getStorageKeySerde(String storeName) {
-    return get(String.format(StorageConfig.KEY_SERDE(), storeName), null);
+    return get(String.format(KEY_SERDE, storeName), null);
   }
 
   public String getStorageMsgSerde(String storeName) {
-    return get(String.format(StorageConfig.MSG_SERDE(), storeName), null);
+    return get(String.format(MSG_SERDE, storeName), null);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/07c69840/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index f351ad6..0b73403 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -61,7 +61,6 @@ import org.apache.samza.job.model.{TaskModel, ContainerModel, JobModel}
 import org.apache.samza.serializers.model.SamzaObjectMapper
 import org.apache.samza.config.JobConfig.Config2Job
 import java.lang.Thread.UncaughtExceptionHandler
-import org.apache.samza.serializers._
 import org.apache.samza.checkpoint.OffsetManagerMetrics
 
 object SamzaContainer extends Logging {
@@ -112,27 +111,6 @@ object SamzaContainer extends Logging {
       .readValue(Util.read(new URL(url)), classOf[JobModel])
   }
 
-  /**
-   * A helper function which returns system's default serde factory class according to the
-   * serde name. If not found, throw exception.
-   */
-  def defaultSerdeFactoryFromSerdeName(serdeName: String) = {
-    info("looking for default serdes")
-
-    val serde = serdeName match {
-      case "byte" => classOf[ByteSerdeFactory].getCanonicalName
-      case "bytebuffer" => classOf[ByteBufferSerdeFactory].getCanonicalName
-      case "integer" => classOf[IntegerSerdeFactory].getCanonicalName
-      case "json" => classOf[JsonSerdeFactory].getCanonicalName
-      case "long" => classOf[LongSerdeFactory].getCanonicalName
-      case "serializable" => classOf[SerializableSerdeFactory[java.io.Serializable]].getCanonicalName
-      case "string" => classOf[StringSerdeFactory].getCanonicalName
-      case _ => throw new SamzaException("No class defined for serde %s" format serdeName)
-    }
-    info("use default serde %s for %s" format (serde, serdeName))
-    serde
-  }
-
   def apply(containerModel: ContainerModel, jobModel: JobModel, jmxServer: JmxServer) = {
     val config = jobModel.getConfig
     val containerId = containerModel.getContainerId
@@ -232,7 +210,7 @@ object SamzaContainer extends Logging {
     val serdes = serdeNames.map(serdeName => {
       val serdeClassName = config
         .getSerdeClass(serdeName)
-        .getOrElse(defaultSerdeFactoryFromSerdeName(serdeName))
+        .getOrElse(Util.defaultSerdeFactoryFromSerdeName(serdeName))
 
       val serde = Util.getObj[SerdeFactory[Object]](serdeClassName)
         .getSerde(serdeName, config)

http://git-wip-us.apache.org/repos/asf/samza/blob/07c69840/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 948c19a..58fbb8f 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -36,7 +36,7 @@ import scala.collection.JavaConversions._
 import org.apache.samza.config.JobConfig
 import java.io.InputStreamReader
 import scala.collection.immutable.Map
-import scala.util.control.Breaks._
+import org.apache.samza.serializers._
 
 object Util extends Logging {
   val random = new Random
@@ -326,4 +326,25 @@ object Util extends Logging {
     }
     localHost
   }
+
+  /**
+   * A helper function which returns system's default serde factory class according to the
+   * serde name. If not found, throw exception.
+   */
+  def defaultSerdeFactoryFromSerdeName(serdeName: String) = {
+    info("looking for default serdes")
+
+    val serde = serdeName match {
+      case "byte" => classOf[ByteSerdeFactory].getCanonicalName
+      case "bytebuffer" => classOf[ByteBufferSerdeFactory].getCanonicalName
+      case "integer" => classOf[IntegerSerdeFactory].getCanonicalName
+      case "json" => classOf[JsonSerdeFactory].getCanonicalName
+      case "long" => classOf[LongSerdeFactory].getCanonicalName
+      case "serializable" => classOf[SerializableSerdeFactory[java.io.Serializable]].getCanonicalName
+      case "string" => classOf[StringSerdeFactory].getCanonicalName
+      case _ => throw new SamzaException("No class defined for serde %s" format serdeName)
+    }
+    info("use default serde %s for %s" format (serde, serdeName))
+    serde
+  }
 }
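
Editor's sketch of how the relocated helper is used by its callers: a serde name resolves to the class configured under `serializers.registry.<name>.class`, and only falls back to a built-in default when no class is configured. The class below is hypothetical; it uses a plain `Map` for the config and `IllegalArgumentException` in place of `SamzaException`. The factory class names are the ones matched in `defaultSerdeFactoryFromSerdeName`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of "configured class first, default second".
class SerdeFactoryResolverSketch {
  private static final String PKG = "org.apache.samza.serializers.";
  private static final Map<String, String> DEFAULTS = new HashMap<>();

  static {
    DEFAULTS.put("byte", PKG + "ByteSerdeFactory");
    DEFAULTS.put("bytebuffer", PKG + "ByteBufferSerdeFactory");
    DEFAULTS.put("integer", PKG + "IntegerSerdeFactory");
    DEFAULTS.put("json", PKG + "JsonSerdeFactory");
    DEFAULTS.put("long", PKG + "LongSerdeFactory");
    DEFAULTS.put("serializable", PKG + "SerializableSerdeFactory");
    DEFAULTS.put("string", PKG + "StringSerdeFactory");
  }

  static String resolve(String serdeName, Map<String, String> config) {
    // An explicitly configured factory class always wins.
    String configured = config.get("serializers.registry." + serdeName + ".class");
    if (configured != null) {
      return configured;
    }
    // Otherwise fall back to the default for well-known names.
    String fallback = DEFAULTS.get(serdeName);
    if (fallback == null) {
      throw new IllegalArgumentException("No class defined for serde " + serdeName);
    }
    return fallback;
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put("serializers.registry.avro.class", "com.example.AvroSerdeFactory"); // made-up class
    System.out.println(resolve("string", config));
    System.out.println(resolve("avro", config));
  }
}
```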

http://git-wip-us.apache.org/repos/asf/samza/blob/07c69840/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 6de8710..a77ddc7 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -202,28 +202,6 @@ class TestSamzaContainer extends AssertionsForJUnit {
     t.join
     assertTrue(caughtException)
   }
-
-  @Test
-  def testDefaultSerdeFactoryFromSerdeName {
-    import SamzaContainer._
-    val config = new MapConfig
-    assertEquals(classOf[ByteSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("byte"))
-    assertEquals(classOf[IntegerSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("integer"))
-    assertEquals(classOf[JsonSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("json"))
-    assertEquals(classOf[LongSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("long"))
-    assertEquals(classOf[SerializableSerdeFactory[java.io.Serializable@unchecked]].getName, defaultSerdeFactoryFromSerdeName("serializable"))
-    assertEquals(classOf[StringSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("string"))
-
-    // throw SamzaException if can not find the correct serde
-    var throwSamzaException = false
-    try {
-      defaultSerdeFactoryFromSerdeName("otherName")
-    } catch {
-      case e: SamzaException => throwSamzaException = true
-      case _: Exception =>
-    }
-    assertTrue(throwSamzaException)
-  }
 }
 
 class MockCheckpointManager extends CheckpointManager(null, null, "Unknown") {

http://git-wip-us.apache.org/repos/asf/samza/blob/07c69840/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
index d167263..05b4e5c 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
@@ -20,9 +20,11 @@
 package org.apache.samza.util
 
 import java.io._
-import java.net.InetAddress
 import org.junit.Assert._
 import org.junit.Test
+import org.apache.samza.config.MapConfig
+import org.apache.samza.serializers._
+import org.apache.samza.SamzaException
 
 class TestUtil {
 
@@ -69,4 +71,26 @@ class TestUtil {
   def testGetLocalHost(): Unit = {
     assertNotNull(Util.getLocalHost)
   }
+
+  @Test
+  def testDefaultSerdeFactoryFromSerdeName {
+    import Util._
+    val config = new MapConfig
+    assertEquals(classOf[ByteSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("byte"))
+    assertEquals(classOf[IntegerSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("integer"))
+    assertEquals(classOf[JsonSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("json"))
+    assertEquals(classOf[LongSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("long"))
+    assertEquals(classOf[SerializableSerdeFactory[java.io.Serializable@unchecked]].getName, defaultSerdeFactoryFromSerdeName("serializable"))
+    assertEquals(classOf[StringSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("string"))
+
+    // throw SamzaException if can not find the correct serde
+    var throwSamzaException = false
+    try {
+      defaultSerdeFactoryFromSerdeName("otherName")
+    } catch {
+      case e: SamzaException => throwSamzaException = true
+      case _: Exception =>
+    }
+    assertTrue(throwSamzaException)
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/07c69840/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
new file mode 100644
index 0000000..6dcb407
--- /dev/null
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import java.util.ArrayList;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaSerializerConfig;
+import org.apache.samza.config.JavaStorageConfig;
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+import org.apache.samza.util.Util;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class reads a RocksDb store located at the provided directory
+ * path.
+ */
+public class RocksDbKeyValueReader {
+  private static final Logger log = LoggerFactory.getLogger(RocksDbKeyValueReader.class);
+  private RocksDB db;
+  private Serde<Object> keySerde;
+  private Serde<Object> valueSerde;
+
+  /**
+   * Construct the <code>RocksDbKeyValueReader</code> with store's name,
+   * database's path and Samza's config
+   *
+   * @param storeName name of the RocksDb defined in the config file
+   * @param dbPath path to the db directory
+   * @param config Samza's config
+   */
+  public RocksDbKeyValueReader(String storeName, String dbPath, Config config) {
+    // get the key serde and value serde from the config
+    JavaStorageConfig storageConfig = new JavaStorageConfig(config);
+    JavaSerializerConfig serializerConfig = new JavaSerializerConfig(config);
+
+    keySerde = getSerdeFromName(storageConfig.getStorageKeySerde(storeName), serializerConfig);
+    valueSerde = getSerdeFromName(storageConfig.getStorageMsgSerde(storeName), serializerConfig);
+
+    // get db options
+    ArrayList<TaskName> taskNameList = new ArrayList<TaskName>();
+    taskNameList.add(new TaskName("read-rocks-db"));
+    SamzaContainerContext samzaContainerContext =
+        new SamzaContainerContext(0,  config, taskNameList);
+    Options options = RocksDbOptionsHelper.options(config, samzaContainerContext);
+
+    // open the db
+    RocksDB.loadLibrary();
+    try {
+      db = RocksDB.openReadOnly(options, dbPath);
+    } catch (RocksDBException e) {
+      throw new SamzaException("can not open the rocksDb in " + dbPath, e);
+    }
+  }
+
+  /**
+   * Get the value for the key. The key is serialized to bytes using the
+   * serde defined in <i>stores.store-name.key.serde</i>. The result
+   * is deserialized back to an object using the serde defined in
+   * <i>stores.store-name.msg.serde</i>. If the value does not exist in
+   * the db, it returns null.
+   *
+   * @param key the key of the value you want to get
+   */
+  public Object get(Object key) {
+    byte[] byteKey = keySerde.toBytes(key);
+    byte[] result = null;
+    try {
+      result = db.get(byteKey);
+    } catch (RocksDBException e) {
+      log.error("can not get the value for key: " + key, e);
+    }
+
+    if (result == null) {
+      log.info(key + " does not exist in the rocksDb");
+      return null;
+    } else {
+      return valueSerde.fromBytes(result);
+    }
+  }
+
+  public void stop() {
+    log.debug("closing the db");
+    if (db != null) {
+      db.close();
+    }
+    log.info("db is closed.");
+  }
+
+  /**
+   * A helper method to get the Serde from the serdeName
+   *
+   * @param name serde name
+   * @param serializerConfig serializer config
+   * @return a Serde of this serde name
+   */
+  private Serde<Object> getSerdeFromName(String name, JavaSerializerConfig serializerConfig) {
+    String serdeClassName = serializerConfig.getSerdeClass(name);
+    if (serdeClassName == null) {
+      serdeClassName = Util.defaultSerdeFactoryFromSerdeName(name);
+    }
+    return Util.<SerdeFactory<Object>> getObj(serdeClassName).getSerde(name, serializerConfig);
+  }
+}
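
Editor's sketch of the read path in `RocksDbKeyValueReader.get`: serialize the key, look up the raw bytes, and deserialize the value or return null when the key is absent. The class below is hypothetical. An in-memory `HashMap` stands in for the read-only RocksDB handle, and two `Function` objects stand in for the key and value serdes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical in-memory model of the serialize / lookup / deserialize flow.
class ReadPathSketch {
  // ByteBuffer keys compare by content, unlike raw byte[] keys.
  private final Map<ByteBuffer, byte[]> store = new HashMap<>();
  private final Function<Object, byte[]> keyToBytes;
  private final Function<byte[], Object> valueFromBytes;

  ReadPathSketch(Function<Object, byte[]> keyToBytes, Function<byte[], Object> valueFromBytes) {
    this.keyToBytes = keyToBytes;
    this.valueFromBytes = valueFromBytes;
  }

  // Test helper: writes raw bytes, as the running job would have done.
  void putRaw(byte[] key, byte[] value) {
    store.put(ByteBuffer.wrap(key), value);
  }

  Object get(Object key) {
    byte[] raw = store.get(ByteBuffer.wrap(keyToBytes.apply(key)));
    // A missing key yields null rather than an exception.
    return raw == null ? null : valueFromBytes.apply(raw);
  }

  public static void main(String[] args) {
    ReadPathSketch reader = new ReadPathSketch(
        k -> k.toString().getBytes(StandardCharsets.UTF_8),
        b -> new String(b, StandardCharsets.UTF_8));
    reader.putRaw("a".getBytes(StandardCharsets.UTF_8), "1".getBytes(StandardCharsets.UTF_8));
    System.out.println("a=" + reader.get("a") + ", b=" + reader.get("b"));
  }
}
```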

http://git-wip-us.apache.org/repos/asf/samza/blob/07c69840/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
new file mode 100644
index 0000000..e474231
--- /dev/null
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.container.SamzaContainerContext;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A helper class to help construct the <code>Options</code> for RocksDb.
+ */
+public class RocksDbOptionsHelper {
+  private static final Logger log = LoggerFactory.getLogger(RocksDbOptionsHelper.class);
+
+  public static Options options(Config storeConfig, SamzaContainerContext containerContext) {
+    Options options = new Options();
+    Long writeBufSize = storeConfig.getLong("container.write.buffer.size.bytes", 32 * 1024 * 1024);
+    // Cache size and write buffer size are specified on a per-container basis.
+    int numTasks = containerContext.taskNames.size();
+    options.setWriteBufferSize((int) (writeBufSize / numTasks));
+
+    CompressionType compressionType = CompressionType.SNAPPY_COMPRESSION;
+    String compressionInConfig = storeConfig.get("rocksdb.compression", "snappy");
+    switch (compressionInConfig) {
+    case "snappy":
+      compressionType = CompressionType.SNAPPY_COMPRESSION;
+      break;
+    case "bzip2":
+      compressionType = CompressionType.BZLIB2_COMPRESSION;
+      break;
+    case "zlib":
+      compressionType = CompressionType.ZLIB_COMPRESSION;
+      break;
+    case "lz4":
+      compressionType = CompressionType.LZ4_COMPRESSION;
+      break;
+    case "lz4hc":
+      compressionType = CompressionType.LZ4HC_COMPRESSION;
+      break;
+    case "none":
+      compressionType = CompressionType.NO_COMPRESSION; break;
+    default:
+      log.warn("Unknown rocksdb.compression codec " + compressionInConfig + ", overwriting to Snappy");
+    }
+    options.setCompressionType(compressionType);
+
+    Long cacheSize = storeConfig.getLong("container.cache.size.bytes", 100 * 1024 * 1024L);
+    Long cacheSizePerContainer = cacheSize / numTasks;
+    int blockSize = storeConfig.getInt("rocksdb.block.size.bytes", 4096);
+    BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
+    tableOptions.setBlockCacheSize(cacheSizePerContainer).setBlockSize(blockSize);
+    options.setTableFormatConfig(tableOptions);
+
+    CompactionStyle compactionStyle = CompactionStyle.UNIVERSAL;
+    String compactionStyleInConfig = storeConfig.get("rocksdb.compaction.style", "universal");
+    switch (compactionStyleInConfig) {
+    case "universal":
+      compactionStyle = CompactionStyle.UNIVERSAL;
+      break;
+    case "fifo":
+      compactionStyle = CompactionStyle.FIFO;
+      break;
+    case "level":
+      compactionStyle = CompactionStyle.LEVEL;
+      break;
+    default:
+      log.warn("Unknown rocksdb.compactionStyle " + compactionStyleInConfig + ", overwriting to universal");
+    }
+    options.setCompactionStyle(compactionStyle);
+
+    options.setMaxWriteBufferNumber(storeConfig.getInt("rocksdb.num.write.buffers", 3));
+    options.setCreateIfMissing(true);
+    options.setErrorIfExists(false);
+
+    return options;
+  }
+}
\ No newline at end of file
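
Editor's sketch on the `switch` pattern used in `RocksDbOptionsHelper`: in a Java `switch` over strings, a case that does not end with `break` falls through into the next label, so a codec case placed directly before `default` would also run the "unknown codec" branch. The class below is hypothetical and uses a local enum rather than RocksDB's `CompressionType`; warnings are collected in a list so the effect is observable.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo of switch fall-through; not Samza or RocksDB code.
class CompressionSwitchSketch {
  enum Codec { SNAPPY, NONE }

  // "none" has no break, so control continues into default.
  static Codec withoutBreak(String name, List<String> warnings) {
    Codec codec = Codec.SNAPPY;
    switch (name) {
      case "snappy":
        codec = Codec.SNAPPY;
        break;
      case "none":
        codec = Codec.NONE;
        // intentionally no break here
      default:
        warnings.add("Unknown codec " + name);
    }
    return codec;
  }

  // Same switch with the break in place.
  static Codec withBreak(String name, List<String> warnings) {
    Codec codec = Codec.SNAPPY;
    switch (name) {
      case "snappy":
        codec = Codec.SNAPPY;
        break;
      case "none":
        codec = Codec.NONE;
        break;
      default:
        warnings.add("Unknown codec " + name);
    }
    return codec;
  }

  public static void main(String[] args) {
    List<String> a = new ArrayList<>();
    List<String> b = new ArrayList<>();
    System.out.println(withoutBreak("none", a) + " warnings=" + a.size());
    System.out.println(withBreak("none", b) + " warnings=" + b.size());
  }
}
```

In both variants the selected codec is the same; the difference is only the spurious warning, which is easy to miss in review.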

http://git-wip-us.apache.org/repos/asf/samza/blob/07c69840/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbReadingTool.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbReadingTool.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbReadingTool.java
new file mode 100644
index 0000000..02f1616
--- /dev/null
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbReadingTool.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import java.util.List;
+
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSet;
+
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.util.CommandLine;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Commandline tool to get the value of a given key from the RocksDb.
+ */
+public class RocksDbReadingTool extends CommandLine {
+  private ArgumentAcceptingOptionSpec<String> dbPathArgument = parser()
+      .accepts("db-path", "path of RocksDb location")
+      .withRequiredArg()
+      .ofType(String.class)
+      .describedAs("db-path");
+
+  private ArgumentAcceptingOptionSpec<String> dbNameArgument = parser()
+      .accepts("db-name", "name of the db")
+      .withRequiredArg()
+      .ofType(String.class)
+      .describedAs("db-name");
+
+  private ArgumentAcceptingOptionSpec<Long> longKeyArgu = parser()
+      .accepts("long-key", "a list of long keys. Separated by ','.")
+      .withOptionalArg()
+      .ofType(Long.class)
+      .describedAs("long-key")
+      .withValuesSeparatedBy( ',' );
+
+  private ArgumentAcceptingOptionSpec<String> stringKeyArgu = parser()
+      .accepts("string-key", "a list of string keys. Separated by ','.")
+      .withOptionalArg()
+      .ofType(String.class)
+      .describedAs("string-key")
+      .withValuesSeparatedBy( ',' );
+
+  private ArgumentAcceptingOptionSpec<Integer> integerKeyArgu = parser()
+      .accepts("integer-key", "a list of integer keys. Separated by ','.")
+      .withOptionalArg()
+      .ofType(Integer.class)
+      .describedAs("integer-key")
+      .withValuesSeparatedBy( ',' );
+
+  private String dbPath = "";
+  private String dbName = "";
+  private Object key = null;
+  private Logger log = LoggerFactory.getLogger(RocksDbReadingTool.class);
+
+  @Override
+  public MapConfig loadConfig(OptionSet options) {
+    MapConfig config = super.loadConfig(options);
+    // get the db name
+    if (options.has(dbNameArgument)) {
+      dbName = options.valueOf(dbNameArgument);
+    } else {
+      log.error("Please specify DB Name using --db-name");
+      System.exit(-1);
+    }
+    // get the db location
+    if (options.has(dbPathArgument)) {
+      dbPath = options.valueOf(dbPathArgument);
+    } else {
+      log.error("Please specify DB path using --db-path");
+      System.exit(-1);
+    }
+    log.debug("Will read the RocksDb store " + dbName + " in " + dbPath);
+
+    // get the key value
+    int keyTypeOptions = 0;
+    if (options.has(integerKeyArgu)) {
+      key = options.valuesOf(integerKeyArgu);
+      keyTypeOptions++;
+    }
+    if (options.has(longKeyArgu)) {
+      key = options.valuesOf(longKeyArgu);
+      keyTypeOptions++;
+    }
+    if (options.has(stringKeyArgu)) {
+      key = options.valuesOf(stringKeyArgu);
+      keyTypeOptions++;
+    }
+
+    if (keyTypeOptions > 1) {
+      log.error("Found more than 1 type of key. Please specify only one type of key to use.");
+      System.exit(-1);
+    }
+
+    if (key == null) {
+      log.error("Can not find the key. Please specify the type of key to use");
+      System.exit(-1);
+    }
+    return config;
+  }
+
+  public String getDbPath() {
+    return dbPath;
+  }
+
+  public String getDbName() {
+    return dbName;
+  }
+
+  @SuppressWarnings("unchecked")
+  public List<Object> getKeys() {
+    return (List<Object>) key;
+  }
+
+  public void outputResult(Object key, Object value) {
+    System.out.println("key=" + key + "," + "value=" + value);
+  }
+
+  public static void main(String[] args) throws RocksDBException {
+    RocksDbReadingTool tool = new RocksDbReadingTool();
+    OptionSet options = tool.parser().parse(args);
+    MapConfig config = tool.loadConfig(options);
+    String path = tool.getDbPath();
+    String dbName = tool.getDbName();
+    RocksDbKeyValueReader kvReader = new RocksDbKeyValueReader(dbName, path, config);
+
+    for (Object obj : tool.getKeys()) {
+      Object result = kvReader.get(obj);
+      tool.outputResult(obj, result);
+    }
+
+    kvReader.stop();
+  }
+}
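
Note on the key handling in loadConfig above: the tool accepts exactly one of --integer-key, --long-key or --string-key, and each option yields a list of keys. The following is a minimal, dependency-free sketch of that rule, not the tool's code. The class name KeyTypeSelection and the method selectKeys are hypothetical, the parsed options are modelled as a plain map (null meaning "option not given"), and the sketch throws an exception where the tool logs an error and calls System.exit(-1).

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeyTypeSelection {
  // Returns the only non-null key list; rejects zero or several key types.
  static List<?> selectKeys(Map<String, List<?>> keysByType) {
    List<?> selected = null;
    int keyTypeOptions = 0;
    for (List<?> keys : keysByType.values()) {
      if (keys != null) {
        selected = keys;
        keyTypeOptions++;
      }
    }
    if (keyTypeOptions > 1) {
      throw new IllegalArgumentException("Found more than 1 type of key");
    }
    if (selected == null) {
      throw new IllegalArgumentException("Can not find the key");
    }
    return selected;
  }

  public static void main(String[] args) {
    // Hypothetical stand-in for the parsed command line: only string keys given.
    Map<String, List<?>> parsed = new LinkedHashMap<>();
    parsed.put("integer-key", null);
    parsed.put("long-key", null);
    parsed.put("string-key", Arrays.asList("a", "b"));
    System.out.println(selectKeys(parsed)); // prints [a, b]
  }
}
```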

http://git-wip-us.apache.org/repos/asf/samza/blob/07c69840/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
index 571a50e..b949793 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
@@ -46,7 +46,7 @@ class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngi
     val storageConfig = containerContext.config.subset("stores." + storeName + ".", true)
     val isLoggedStore = containerContext.config.getChangelogStream(storeName).isDefined
     val rocksDbMetrics = new KeyValueStoreMetrics(storeName, registry)
-    val rocksDbOptions = RocksDbKeyValueStore.options(storageConfig, containerContext)
+    val rocksDbOptions = RocksDbOptionsHelper.options(storageConfig, containerContext)
     val rocksDbWriteOptions = new WriteOptions().setDisableWAL(true)
     val rocksDb = new RocksDbKeyValueStore(storeDir, rocksDbOptions, storageConfig, isLoggedStore, storeName, rocksDbWriteOptions, rocksDbMetrics)
     rocksDb

http://git-wip-us.apache.org/repos/asf/samza/blob/07c69840/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index a423f7b..4620037 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -28,49 +28,6 @@ import org.rocksdb._
 import org.rocksdb.TtlDB;
 
 object RocksDbKeyValueStore extends Logging {
-  def options(storeConfig: Config, containerContext: SamzaContainerContext) = {
-    val cacheSize = storeConfig.getLong("container.cache.size.bytes", 100 * 1024 * 1024L)
-    val writeBufSize = storeConfig.getLong("container.write.buffer.size.bytes", 32 * 1024 * 1024)
-    val options = new Options()
-
-    // Cache size and write buffer size are specified on a per-container basis.
-    val numTasks = containerContext.taskNames.size
-    options.setWriteBufferSize((writeBufSize / numTasks).toInt)
-    var cacheSizePerContainer = cacheSize / numTasks
-    options.setCompressionType(
-      storeConfig.get("rocksdb.compression", "snappy") match {
-        case "snappy" => CompressionType.SNAPPY_COMPRESSION
-        case "bzip2" => CompressionType.BZLIB2_COMPRESSION
-        case "zlib" => CompressionType.ZLIB_COMPRESSION
-        case "lz4" => CompressionType.LZ4_COMPRESSION
-        case "lz4hc" => CompressionType.LZ4HC_COMPRESSION
-        case "none" => CompressionType.NO_COMPRESSION
-        case _ =>
-          warn("Unknown rocksdb.compression codec %s, defaulting to Snappy" format storeConfig.get("rocksdb.compression", "snappy"))
-          CompressionType.SNAPPY_COMPRESSION
-      })
-
-    val blockSize = storeConfig.getInt("rocksdb.block.size.bytes", 4096)
-    val table_options = new BlockBasedTableConfig()
-    table_options.setBlockCacheSize(cacheSizePerContainer)
-      .setBlockSize(blockSize)
-
-    options.setTableFormatConfig(table_options)
-    options.setCompactionStyle(
-      storeConfig.get("rocksdb.compaction.style", "universal") match {
-        case "universal" => CompactionStyle.UNIVERSAL
-        case "fifo" => CompactionStyle.FIFO
-        case "level" => CompactionStyle.LEVEL
-        case _ =>
-          warn("Unknown rocksdb.compactionStyle %s, defaulting to universal" format storeConfig.get("rocksdb.compaction.style", "universal"))
-          CompactionStyle.UNIVERSAL
-      })
-
-    options.setMaxWriteBufferNumber(storeConfig.get("rocksdb.num.write.buffers", "3").toInt)
-    options.setCreateIfMissing(true)
-    options.setErrorIfExists(false)
-    options
-  }
 
   def openDB(dir: File, options: Options, storeConfig: Config, isLoggedStore: Boolean, storeName: String): RocksDB = {
     var ttl = 0L
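
Note on the removed options method: it treats container.cache.size.bytes and container.write.buffer.size.bytes as per-container budgets and divides each by the number of tasks in the container. The sketch below only reproduces that arithmetic with the defaults shown in the removed lines (100 MiB cache, 32 MiB write buffer). The class name PerTaskRocksDbSizing, the method perTask and the task count of 4 are illustrative assumptions, not part of the commit.

```java
public class PerTaskRocksDbSizing {
  // Defaults taken from the removed Scala method.
  static final long DEFAULT_CACHE_SIZE_BYTES = 100 * 1024 * 1024L;
  static final long DEFAULT_WRITE_BUFFER_SIZE_BYTES = 32 * 1024 * 1024L;

  // Splits a per-container budget evenly across the tasks (integer division).
  static long perTask(long containerBytes, int numTasks) {
    if (numTasks <= 0) {
      throw new IllegalArgumentException("numTasks must be positive");
    }
    return containerBytes / numTasks;
  }

  public static void main(String[] args) {
    int numTasks = 4; // assumed number of tasks in the container
    System.out.println(perTask(DEFAULT_CACHE_SIZE_BYTES, numTasks));        // prints 26214400
    System.out.println(perTask(DEFAULT_WRITE_BUFFER_SIZE_BYTES, numTasks)); // prints 8388608
  }
}
```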

http://git-wip-us.apache.org/repos/asf/samza/blob/07c69840/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueReader.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueReader.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueReader.java
new file mode 100644
index 0000000..f97b8f7
--- /dev/null
+++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueReader.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.HashMap;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.apache.samza.serializers.IntegerSerdeFactory;
+
+public class TestRocksDbKeyValueReader {
+  private static final String DB_NAME = "testKvStore";
+  private static Path dirPath = Paths.get(DB_NAME);
+  private static RocksDB db = null;
+
+  @BeforeClass
+  static public void createRocksDb() throws IOException, RocksDBException {
+    if (Files.exists(dirPath)) {
+      removeRecursiveDirectory(dirPath);
+    }
+    Files.createDirectories(dirPath);
+    Options options = new Options().setCreateIfMissing(true);
+    db = RocksDB.open(options, dirPath.toString());
+    db.put("testString".getBytes(), "this is string".getBytes());
+    db.put(ByteBuffer.allocate(4).putInt(123).array(), ByteBuffer.allocate(4).putInt(456).array());
+  }
+
+  @AfterClass
+  static public void tearDownRocksDb() {
+    if (db != null) {
+      db.close();
+    }
+    if (Files.exists(dirPath)) {
+      removeRecursiveDirectory(dirPath);
+    }
+  }
+
+  @Test
+  public void testReadCorrectDbValue() throws RocksDBException {
+    HashMap<String, String> map = new HashMap<String, String>();
+    map.put("stores." + DB_NAME + ".factory", "mockFactory");
+    map.put("stores." + DB_NAME + ".key.serde", "string");
+    map.put("stores." + DB_NAME + ".msg.serde", "string");
+    Config config = new MapConfig(map);
+
+    RocksDbKeyValueReader reader = new RocksDbKeyValueReader(DB_NAME, dirPath.toString(), config);
+    assertEquals("this is string", reader.get("testString"));
+
+    // should throw a ClassCastException if the key is of a different type than the key serde expects
+    boolean throwClassCastException = false;
+    try {
+      reader.get(123);
+    } catch (Exception e) {
+      if (e instanceof ClassCastException) {
+        throwClassCastException = true;
+      }
+    }
+    assertTrue(throwClassCastException);
+    reader.stop();
+
+    // test with customized serde
+    map.put("serializers.registry.mock.class", IntegerSerdeFactory.class.getCanonicalName());
+    map.put("stores." + DB_NAME + ".key.serde", "mock");
+    map.put("stores." + DB_NAME + ".msg.serde", "mock");
+    config = new MapConfig(map);
+    reader = new RocksDbKeyValueReader(DB_NAME, dirPath.toString(), config);
+    assertEquals(456, reader.get(123));
+
+    assertNull(reader.get(789));
+    reader.stop();
+  }
+
+  private static void removeRecursiveDirectory(Path path) {
+    try {
+      Files.walkFileTree(path, new SimpleFileVisitor<Path>() {
+        @Override
+        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+          Files.delete(file);
+          return FileVisitResult.CONTINUE;
+        }
+
+        @Override
+        public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
+          if (exc == null) {
+            Files.delete(dir);
+            return FileVisitResult.CONTINUE;
+          } else {
+            throw exc;
+          }
+        }
+      });
+    } catch (IOException e) {
+      throw new SamzaException("can not delete " + path.toString(), e);
+    }
+  }
+}
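
Note on the integer entry in the test above: createRocksDb writes the key 123 and the value 456 as 4-byte arrays produced by ByteBuffer.putInt, and the test then reads them back through a serde registered from IntegerSerdeFactory. That only works if the serde uses the same 4-byte big-endian layout, which is an assumption here since the serde source is not part of this diff. The sketch below shows the ByteBuffer round trip itself; the class name IntKeyEncoding and its two helper methods are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class IntKeyEncoding {
  // Encodes an int the same way the test does: 4 bytes, big-endian (ByteBuffer's default order).
  static byte[] toBytes(int value) {
    return ByteBuffer.allocate(4).putInt(value).array();
  }

  // Decodes the first 4 bytes of the array back into an int.
  static int fromBytes(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getInt();
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(toBytes(123))); // prints [0, 0, 0, 123]
    System.out.println(Arrays.toString(toBytes(456))); // prints [0, 0, 1, -56]
    System.out.println(fromBytes(toBytes(456)));       // prints 456
  }
}
```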

http://git-wip-us.apache.org/repos/asf/samza/blob/07c69840/samza-shell/src/main/bash/read-rocksdb-tool.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/read-rocksdb-tool.sh b/samza-shell/src/main/bash/read-rocksdb-tool.sh
new file mode 100644
index 0000000..c4565c5
--- /dev/null
+++ b/samza-shell/src/main/bash/read-rocksdb-tool.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname "$0")/log4j-console.xml"
+
+exec "$(dirname "$0")"/run-class.sh org.apache.samza.storage.kv.RocksDbReadingTool "$@"