You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/07/16 07:53:28 UTC

samza git commit: SAMZA-625: added a tool to consume changelog and materialize a state store

Repository: samza
Updated Branches:
  refs/heads/master 4f7bbb054 -> 81542ecf4


SAMZA-625: added a tool to consume changelog and materialize a state store


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/81542ecf
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/81542ecf
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/81542ecf

Branch: refs/heads/master
Commit: 81542ecf484eccab2bfbb8f6ce1443c2f57eb6e4
Parents: 4f7bbb0
Author: Yan Fang <ya...@gmail.com>
Authored: Wed Jul 15 22:47:53 2015 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Wed Jul 15 22:47:53 2015 -0700

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   7 +
 .../versioned/container/state-management.md     |  10 +
 .../apache/samza/config/JavaStorageConfig.java  |  63 +++++
 .../apache/samza/config/JavaSystemConfig.java   |  63 +++++
 .../apache/samza/storage/StateStorageTool.java  |  61 +++++
 .../apache/samza/storage/StorageRecovery.java   | 256 +++++++++++++++++++
 .../samza/storage/TaskStorageManager.scala      |   9 +-
 .../main/scala/org/apache/samza/util/Util.scala |  12 +-
 .../samza/config/TestJavaStorageConfig.java     |  46 ++++
 .../samza/config/TestJavaSystemConfig.java      |  41 +++
 .../apache/samza/storage/MockStorageEngine.java |  60 +++++
 .../samza/storage/MockStorageEngineFactory.java |  37 +++
 .../samza/storage/MockSystemConsumer.java       |  59 +++++
 .../apache/samza/storage/MockSystemFactory.java |  45 ++++
 .../samza/storage/TestStorageRecovery.java      | 111 ++++++++
 .../apache/samza/config/Log4jSystemConfig.java  |  47 +---
 samza-shell/src/main/bash/state-storage-tool.sh |  21 ++
 17 files changed, 905 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index de835c7..eef3370 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -142,6 +142,13 @@
         <allow pkg="org.apache.samza.serializers" />
         <allow pkg="org.apache.samza.system" />
         <allow pkg="org.apache.samza.task" />
+        <allow pkg="org.apache.samza.util" />
+        <allow pkg="org.apache.samza.job" />
+        <allow pkg="org.apache.samza.config" />
+        <allow pkg="joptsimple" />
+
+        <allow class="org.apache.samza.SamzaException" />
+        <allow class="org.apache.samza.Partition" />
     </subpackage>
 
     <subpackage name="logging">

http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/docs/learn/documentation/versioned/container/state-management.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/container/state-management.md b/docs/learn/documentation/versioned/container/state-management.md
index 79067bb..50d4b65 100644
--- a/docs/learn/documentation/versioned/container/state-management.md
+++ b/docs/learn/documentation/versioned/container/state-management.md
@@ -184,6 +184,16 @@ public interface KeyValueStore<K, V> {
 
 Additional configuration properties for the key-value store are documented in the [configuration reference](../jobs/configuration-table.html#keyvalue-rocksdb).
 
+### Debug Key-value storage
+
+Currently Samza provides a state storage tool which can recover the state store from the changelog stream to user-specified directory for reusing and debugging.
+
+{% highlight bash %}
+samza-example/target/bin/state-storage-tool.sh \
+  --config-path=file:///path/to/job/config.properties \
+  --path=directory/to/put/state/stores
+{% endhighlight %}
+
 #### Known Issues
 
 RocksDB has several rough edges. It's recommended that you read the RocksDB [tuning guide](https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide). Some other notes to be aware of are:

http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
new file mode 100644
index 0000000..af7d4ca
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * a java version of the storage config
+ */
+public class JavaStorageConfig extends MapConfig {
+
+  private static final String FACTORY_SUFFIX = ".factory";
+  private static final String STORE_PREFIX = "stores.";
+
+  public JavaStorageConfig(Config config) {
+    super(config);
+  }
+
+  public List<String> getStoreNames() {
+    Config subConfig = subset(STORE_PREFIX, true);
+    List<String> storeNames = new ArrayList<String>();
+    for (String key : subConfig.keySet()) {
+      if (key.endsWith(FACTORY_SUFFIX)) {
+        storeNames.add(key.substring(0, key.length() - FACTORY_SUFFIX.length()));
+      }
+    }
+    return storeNames;
+  }
+
+  public String getChangelogStream(String storeName) {
+    return get(String.format(StorageConfig.CHANGELOG_STREAM(), storeName), null);
+  }
+
+  public String getStorageFactoryClassName(String storeName) {
+    return get(String.format(StorageConfig.FACTORY(), storeName), null);
+  }
+
+  public String getStorageKeySerde(String storeName) {
+    return get(String.format(StorageConfig.KEY_SERDE(), storeName), null);
+  }
+
+  public String getStorageMsgSerde(String storeName) {
+    return get(String.format(StorageConfig.MSG_SERDE(), storeName), null);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
new file mode 100644
index 0000000..cf8d640
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * a java version of the system config
+ */
+public class JavaSystemConfig extends MapConfig {
+  private static final String SYSTEM_PREFIX = "systems.";
+  private static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory";
+  private static final String SYSTEM_FACTORY = "systems.%s.samza.factory";
+  private static final String EMPTY = "";
+
+  public JavaSystemConfig(Config config) {
+    super(config);
+  }
+
+  public String getSystemFactory(String name) {
+    if (name == null) {
+      return null;
+    }
+    String systemFactory = String.format(SYSTEM_FACTORY, name);
+    return get(systemFactory, null);
+  }
+
+  /**
+   * Get a list of system names.
+   *
+   * @return A list system names
+   */
+  public List<String> getSystemNames() {
+    Config subConf = subset(SYSTEM_PREFIX, true);
+    ArrayList<String> systemNames = new ArrayList<String>();
+    for (Map.Entry<String, String> entry : subConf.entrySet()) {
+      String key = entry.getKey();
+      if (key.endsWith(SYSTEM_FACTORY_SUFFIX)) {
+        systemNames.add(key.replace(SYSTEM_FACTORY_SUFFIX, EMPTY));
+      }
+    }
+    return systemNames;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/main/java/org/apache/samza/storage/StateStorageTool.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StateStorageTool.java b/samza-core/src/main/java/org/apache/samza/storage/StateStorageTool.java
new file mode 100644
index 0000000..beba35c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/storage/StateStorageTool.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSet;
+
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.util.CommandLine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Commandline tool to recover the state storage to a specified directory
+ */
+public class StateStorageTool extends CommandLine {
+  private ArgumentAcceptingOptionSpec<String> newPathArgu = parser().accepts("path", "path of the new state storage").withRequiredArg().ofType(String.class).describedAs("path");
+  private String newPath = "";
+  private Logger log = LoggerFactory.getLogger(StateStorageTool.class);
+
+  @Override
+  public MapConfig loadConfig(OptionSet options) {
+    MapConfig config = super.loadConfig(options);
+    if (options.has(newPathArgu)) {
+      newPath = options.valueOf(newPathArgu);
+      log.info("new state storage is " + newPath);
+    }
+    return config;
+  }
+
+  public String getPath() {
+    return newPath;
+  }
+
+  public static void main(String[] args) {
+    StateStorageTool tool = new StateStorageTool();
+    OptionSet options = tool.parser().parse(args);
+    MapConfig config = tool.loadConfig(options);
+    String path = tool.getPath();
+
+    StorageRecovery storageRecovery = new StorageRecovery(config, path);
+    storageRecovery.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
new file mode 100644
index 0000000..c564964
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaStorageConfig;
+import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.serializers.ByteSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.CommandLine;
+import org.apache.samza.util.SystemClock;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Recovers the state storages from the changelog streams and store the storages
+ * in the directory provided by the users. The changelog streams are derived
+ * from the job's config file.
+ */
+public class StorageRecovery extends CommandLine {
+
+  private Config jobConfig;
+  private int maxPartitionNumber = 0;
+  private File storeBaseDir = null;
+  private HashMap<String, SystemStream> changeLogSystemStreams = new HashMap<String, SystemStream>();
+  private HashMap<String, StorageEngineFactory<?, ?>> storageEngineFactories = new HashMap<String, StorageEngineFactory<?, ?>>();
+  private HashMap<String, SystemFactory> systemFactories = new HashMap<String, SystemFactory>();
+  private HashMap<String, SystemAdmin> systemAdmins = new HashMap<String, SystemAdmin>();
+  private Map<Integer, ContainerModel> containers = new HashMap<Integer, ContainerModel>();
+  private List<TaskStorageManager> taskStorageManagers = new ArrayList<TaskStorageManager>();
+  private Logger log = LoggerFactory.getLogger(StorageRecovery.class);
+
+  /**
+   * Construct the StorageRecovery
+   *
+   * @param config
+   *          the job config
+   * @param path
+   *          the directory path where we put the stores
+   */
+  StorageRecovery(Config config, String path) {
+    jobConfig = config;
+    storeBaseDir = new File(path, "state");
+  }
+
+  /**
+   * setup phase which assigns required values to the variables used for all
+   * tasks.
+   */
+  private void setup() {
+    log.info("setting up the recovery...");
+
+    getContainerModels();
+    getSystemFactoriesAndAdmins();
+    getChangeLogSystemStreamsAndStorageFactories();
+    getChangeLogMaxPartitionNumber();
+    getTaskStorageManagers();
+  }
+
+  /**
+   * run the setup phase and restore all the task storages
+   */
+  public void run() {
+    setup();
+
+    log.info("start recovering...");
+
+    for (TaskStorageManager taskStorageManager : taskStorageManagers) {
+      taskStorageManager.init();
+      taskStorageManager.stopStores();
+      log.debug("restored " + taskStorageManager.toString());
+    }
+
+    log.info("successfully recovered in " + storeBaseDir.toString());
+  }
+
+  /**
+   * build the ContainerModels from job config file and put the results in the
+   * map
+   */
+  private void getContainerModels() {
+    JobModel jobModel = JobCoordinator.apply(jobConfig).jobModel();
+    containers = jobModel.getContainers();
+  }
+
+  /**
+   * get the SystemFactories and SystemAdmins specified in the config file and
+   * put them into the maps
+   */
+  private void getSystemFactoriesAndAdmins() {
+    JavaSystemConfig systemConfig = new JavaSystemConfig(jobConfig);
+    List<String> systems = systemConfig.getSystemNames();
+
+    for (String system : systems) {
+      String systemFactory = systemConfig.getSystemFactory(system);
+      if (systemFactory == null) {
+        throw new SamzaException("A stream uses system " + system + " which is missing from the configuration.");
+      }
+      systemFactories.put(system, Util.<SystemFactory>getObj(systemFactory));
+      systemAdmins.put(system, Util.<SystemFactory>getObj(systemFactory).getAdmin(system, jobConfig));
+    }
+
+    log.info("Got system factories: " + systemFactories.keySet().toString());
+    log.info("Got system admins: " + systemAdmins.keySet().toString());
+  }
+
+  /**
+   * get the changelog streams and the storage factories from the config file
+   * and put them into the maps
+   */
+  private void getChangeLogSystemStreamsAndStorageFactories() {
+    JavaStorageConfig config = new JavaStorageConfig(jobConfig);
+    List<String> storeNames = config.getStoreNames();
+
+    log.info("Got store names: " + storeNames.toString());
+
+    for (String storeName : storeNames) {
+      String streamName = config.getChangelogStream(storeName);
+
+      log.info("stream name for " + storeName + " is " + streamName);
+
+      if (streamName != null) {
+        changeLogSystemStreams.put(storeName, Util.getSystemStreamFromNames(streamName));
+      }
+
+      String factoryClass = config.getStorageFactoryClassName(storeName);
+      if (factoryClass != null) {
+        storageEngineFactories.put(storeName, Util.<StorageEngineFactory<Object, Object>>getObj(factoryClass));
+      } else {
+        throw new SamzaException("Missing storage factory for " + storeName + ".");
+      }
+    }
+  }
+
+  /**
+   * get the SystemConsumers for the stores
+   */
+  private HashMap<String, SystemConsumer> getStoreConsumers() {
+    HashMap<String, SystemConsumer> storeConsumers = new HashMap<String, SystemConsumer>();
+
+    for (Entry<String, SystemStream> entry : changeLogSystemStreams.entrySet()) {
+      String storeSystem = entry.getValue().getSystem();
+      if (!systemFactories.containsKey(storeSystem)) {
+        throw new SamzaException("Changelog system " + storeSystem + " for store " + entry.getKey() + " does not exist in the config.");
+      }
+      storeConsumers.put(entry.getKey(), systemFactories.get(storeSystem).getConsumer(storeSystem, jobConfig, new MetricsRegistryMap()));
+    }
+
+    return storeConsumers;
+  }
+
+  /**
+   * get the max partition number of the changelog stream
+   */
+  private void getChangeLogMaxPartitionNumber() {
+    int maxPartitionId = 0;
+    for (ContainerModel containerModel : containers.values()) {
+      for (TaskModel taskModel : containerModel.getTasks().values()) {
+        maxPartitionId = Math.max(maxPartitionId, taskModel.getChangelogPartition().getPartitionId());
+      }
+    }
+    maxPartitionNumber = maxPartitionId + 1;
+  }
+
+  /**
+   * create one TaskStorageManager for each task. Add all of them to the
+   * List<TaskStorageManager>
+   */
+  private void getTaskStorageManagers() {
+    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(Util.javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance());
+
+    for (ContainerModel containerModel : containers.values()) {
+      HashMap<String, StorageEngine> taskStores = new HashMap<String, StorageEngine>();
+      SamzaContainerContext containerContext = new SamzaContainerContext(containerModel.getContainerId(), jobConfig, containerModel.getTasks()
+          .keySet());
+
+      for (TaskModel taskModel : containerModel.getTasks().values()) {
+        HashMap<String, SystemConsumer> storeConsumers = getStoreConsumers();
+
+        for (Entry<String, StorageEngineFactory<?, ?>> entry : storageEngineFactories.entrySet()) {
+          String storeName = entry.getKey();
+
+          if (changeLogSystemStreams.containsKey(storeName)) {
+            SystemStreamPartition changeLogSystemStreamPartition = new SystemStreamPartition(changeLogSystemStreams.get(storeName),
+                taskModel.getChangelogPartition());
+            File storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskModel.getTaskName());
+
+            log.info("Got storage engine directory: " + storePartitionDir);
+
+            StorageEngine storageEngine = (entry.getValue()).getStorageEngine(
+                storeName,
+                storePartitionDir,
+                (Serde) new ByteSerde(),
+                (Serde) new ByteSerde(),
+                null,
+                new MetricsRegistryMap(),
+                changeLogSystemStreamPartition,
+                containerContext);
+            taskStores.put(storeName, storageEngine);
+          }
+        }
+
+        TaskStorageManager taskStorageManager = new TaskStorageManager(
+            taskModel.getTaskName(),
+            Util.javaMapAsScalaMap(taskStores),
+            Util.javaMapAsScalaMap(storeConsumers),
+            Util.javaMapAsScalaMap(changeLogSystemStreams),
+            maxPartitionNumber,
+            streamMetadataCache,
+            storeBaseDir,
+            storeBaseDir, taskModel.getChangelogPartition(),
+            Util.javaMapAsScalaMap(systemAdmins));
+
+        taskStorageManagers.add(taskStorageManager);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index aeba61a..c39cdc7 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -36,7 +36,7 @@ object TaskStorageManager {
 
   def getStorePartitionDir(storeBaseDir: File, storeName: String, taskName: TaskName) = {
     // TODO: Sanitize, check and clean taskName string as a valid value for a file
-    new File(storeBaseDir, storeName + File.separator + taskName)
+    new File(storeBaseDir, (storeName + File.separator + taskName.toString).replace(' ', '_'))
   }
 }
 
@@ -188,9 +188,13 @@ class TaskStorageManager(
     taskStores.values.foreach(_.flush)
   }
 
-  def stop() {
+  def stopStores() {
     debug("Stopping stores.")
     taskStores.values.foreach(_.stop)
+  }
+
+  def stop() {
+    stopStores()
 
     debug("Persisting logged key value stores")
     changeLogSystemStreams.foreach { case (store, systemStream) => {
@@ -208,7 +212,6 @@ class TaskStorageManager(
     }}
   }
 
-
   /**
    * Builds a map from SystemStreamPartition to oldest offset for changelogs.
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 2feb65b..419452c 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -34,6 +34,10 @@ import org.apache.samza.config.ConfigException
 import org.apache.samza.config.MapConfig
 import scala.collection.JavaConversions._
 import org.apache.samza.config.JobConfig
+import org.apache.samza.job.model.JobModel
+import java.io.InputStreamReader
+import scala.collection.JavaConverters._
+import scala.collection.immutable.Map
 
 object Util extends Logging {
   val random = new Random
@@ -167,7 +171,6 @@ object Util extends Logging {
     body
   }
 
-
   /**
    * Generates a coordinator stream name based off of the job name and job id
    * for the jobd. The format is of the stream name will be
@@ -294,4 +297,11 @@ object Util extends Logging {
       fis.close()
     }
   }
+
+  /**
+   * Convert a java map to a Scala map
+   * */
+  def javaMapAsScalaMap[T, K](javaMap: java.util.Map[T, K]): Map[T, K] = {
+    javaMap.toMap
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java
new file mode 100644
index 0000000..6c93697
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+public class TestJavaStorageConfig {
+
+  @Test
+  public void testStorageConfig() {
+    Map<String, String> map = new HashMap<String, String>();
+    map.put("stores.test.factory", "testFactory");
+    map.put("stores.test.changelog", "testChangelog");
+    map.put("stores.test.key.serde", "string");
+    map.put("stores.test.msg.serde", "integer");
+    JavaStorageConfig config = new JavaStorageConfig(new MapConfig(map));
+
+    assertEquals("testFactory", config.getStorageFactoryClassName("test"));
+    assertEquals("testChangelog", config.getChangelogStream("test"));
+    assertEquals("string", config.getStorageKeySerde("test"));
+    assertEquals("integer", config.getStorageMsgSerde("test"));
+    assertEquals("test", config.getStoreNames().get(0));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java
new file mode 100644
index 0000000..9b39ec8
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+public class TestJavaSystemConfig {
+
+  @Test
+  public void testGetSystemNames() {
+    Map<String, String> map = new HashMap<String, String>();
+    map.put("systems.system1.samza.factory", "1");
+    map.put("systems.system2.samza.factory", "2");
+    JavaSystemConfig systemConfig = new JavaSystemConfig(
+        new MapConfig(map));
+
+    assertEquals(2, systemConfig.getSystemNames().size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
new file mode 100644
index 0000000..b90ea87
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+
+/**
+ * A mock StorageEngine that stores what it receives from the StorageEngine.
+ * Those variables/values can be retrieved directly by variable names.
+ */
+public class MockStorageEngine implements StorageEngine {
+
+  public static String storeName;
+  public static File storeDir;
+  public static SystemStreamPartition ssp;
+  public static ArrayList<IncomingMessageEnvelope> incomingMessageEnvelopes = new ArrayList<IncomingMessageEnvelope>();
+
+  public MockStorageEngine(String storeName, File storeDir, SystemStreamPartition changeLogSystemStreamPartition) {
+    MockStorageEngine.storeName = storeName;
+    MockStorageEngine.storeDir = storeDir;
+    MockStorageEngine.ssp = changeLogSystemStreamPartition;
+  }
+
+  @Override
+  public void restore(Iterator<IncomingMessageEnvelope> envelopes) {
+    while (envelopes.hasNext()) {
+      incomingMessageEnvelopes.add(envelopes.next());
+    }
+  }
+
+  @Override
+  public void flush() {
+  }
+
+  @Override
+  public void stop() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java
new file mode 100644
index 0000000..c00c454
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+
+public class MockStorageEngineFactory implements StorageEngineFactory<Object, Object> {
+  @Override
+  public StorageEngine getStorageEngine(String storeName, File storeDir, Serde<Object> keySerde, Serde<Object> msgSerde,
+      MessageCollector collector, MetricsRegistry registry, SystemStreamPartition changeLogSystemStreamPartition,
+      SamzaContainerContext containerContext) {
+    return new MockStorageEngine(storeName, storeDir, changeLogSystemStreamPartition);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java b/samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java
new file mode 100644
index 0000000..07c4a24
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemStreamPartition;
+
+public class MockSystemConsumer implements SystemConsumer {
+  public static Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
+  private boolean flag = true; // flag to make sure the messages only are
+                               // returned once
+
+  @Override
+  public void start() {}
+
+  @Override
+  public void stop() {}
+
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, String offset) {}
+
+  @Override
+  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
+    if (flag) {
+      ArrayList<IncomingMessageEnvelope> list = new ArrayList<IncomingMessageEnvelope>();
+      list.add(TestStorageRecovery.msg);
+      messages.put(TestStorageRecovery.ssp, list);
+      flag = false;
+      return messages; 
+    } else {
+      messages.clear();
+      return messages;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java b/samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java
new file mode 100644
index 0000000..7abf82b
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+
+public class MockSystemFactory implements SystemFactory {
+
+  @Override
+  public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
+    return new MockSystemConsumer();
+  }
+
+  @Override
+  public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
+    return null;
+  }
+
+  @Override
+  public SystemAdmin getAdmin(String systemName, Config config) {
+    return TestStorageRecovery.systemAdmin;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java b/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
new file mode 100644
index 0000000..b8ae592
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+public class TestStorageRecovery {
+
+  public static SystemConsumer systemConsumer1 = null;
+  public static SystemConsumer systemConsumer2 = null;
+  public static SystemAdmin systemAdmin = null;
+  public static Config config = null;
+  public static SystemStreamMetadata systemStreamMetadata = null;
+  public static SystemStreamMetadata inputSystemStreamMetadata = null;
+  public final static String SYSTEM_STREAM_NAME = "changelog";
+  public final static String INPUT_STREAM = "input";
+  public static SystemStreamPartition ssp = new SystemStreamPartition("mockSystem", SYSTEM_STREAM_NAME, new Partition(0));
+  public static IncomingMessageEnvelope msg = new IncomingMessageEnvelope(TestStorageRecovery.ssp, "0", "test", "test");
+
+  @Before
+  public void setup() throws InterruptedException {
+    putConfig();
+    putMetadata();
+
+    systemAdmin = mock(SystemAdmin.class);
+
+    Set<String> set1 = new HashSet<String>(Arrays.asList(SYSTEM_STREAM_NAME));
+    Set<String> set2 = new HashSet<String>(Arrays.asList(INPUT_STREAM));
+    HashMap<String, SystemStreamMetadata> ssmMap = new HashMap<String, SystemStreamMetadata>();
+    ssmMap.put(SYSTEM_STREAM_NAME, systemStreamMetadata);
+    ssmMap.put(INPUT_STREAM, inputSystemStreamMetadata);
+    when(systemAdmin.getSystemStreamMetadata(set1)).thenReturn(ssmMap);
+    when(systemAdmin.getSystemStreamMetadata(set2)).thenReturn(ssmMap);
+  }
+
+  @Test
+  public void testStorageEngineReceivedAllValues() {
+    String path = "/tmp/testing";
+    StorageRecovery storageRecovery = new StorageRecovery(config, path);
+    storageRecovery.run();
+
+    // because the stream has two partitions
+    assertEquals(2, MockStorageEngine.incomingMessageEnvelopes.size());
+    assertEquals(TestStorageRecovery.msg, MockStorageEngine.incomingMessageEnvelopes.get(0));
+    assertEquals(TestStorageRecovery.msg, MockStorageEngine.incomingMessageEnvelopes.get(1));
+    // correct path is passed to the store engine
+    assertEquals(path + "/state/testStore/Partition_1", MockStorageEngine.storeDir.toString());
+  }
+
+  private void putConfig() {
+    Map<String, String> map = new HashMap<String, String>();
+    map.put("job.name", "changelogTest");
+    map.put("systems.mockSystem.samza.factory", MockSystemFactory.class.getCanonicalName());
+    map.put("stores.testStore.factory", MockStorageEngineFactory.class.getCanonicalName());
+    map.put("stores.testStore.changelog", "mockSystem." + SYSTEM_STREAM_NAME);
+    map.put("task.inputs", "mockSystem.input");
+    map.put("job.coordinator.system", "coordinator");
+    map.put("systems.coordinator.samza.factory", MockCoordinatorStreamSystemFactory.class.getCanonicalName());
+    config = new MapConfig(map);
+  }
+
+  private void putMetadata() {
+    SystemStreamMetadata.SystemStreamPartitionMetadata sspm = new SystemStreamMetadata.SystemStreamPartitionMetadata("0", "1", "2");
+    HashMap<Partition, SystemStreamPartitionMetadata> map = new HashMap<Partition, SystemStreamPartitionMetadata>();
+    map.put(new Partition(0), sspm);
+    map.put(new Partition(1), sspm);
+    systemStreamMetadata = new SystemStreamMetadata(SYSTEM_STREAM_NAME, map);
+
+    HashMap<Partition, SystemStreamPartitionMetadata> map1 = new HashMap<Partition, SystemStreamPartitionMetadata>();
+    map1.put(new Partition(0), sspm);
+    map1.put(new Partition(1), sspm);
+    inputSystemStreamMetadata = new SystemStreamMetadata(INPUT_STREAM, map1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
index d5e24f2..209296d 100644
--- a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
+++ b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
@@ -19,25 +19,19 @@
 
 package org.apache.samza.config;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 /**
  * This class contains the methods for getting properties that are needed by the
  * StreamAppender.
  */
-public class Log4jSystemConfig {
+public class Log4jSystemConfig extends JavaSystemConfig {
 
   private static final String LOCATION_ENABLED = "task.log4j.location.info.enabled";
   private static final String TASK_LOG4J_SYSTEM = "task.log4j.system";
-  private static final String SYSTEM_PREFIX = "systems.";
-  private static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory";
-  private static final String EMPTY = "";
-  private Config config = null;
 
   public Log4jSystemConfig(Config config) {
-    this.config = config;
+    super(config);
   }
 
   /**
@@ -49,7 +43,7 @@ public class Log4jSystemConfig {
    *         information in Log4J appender messages.
    */
   public boolean getLocationEnabled() {
-    return "true".equals(config.get(Log4jSystemConfig.LOCATION_ENABLED, "false"));
+    return "true".equals(get(Log4jSystemConfig.LOCATION_ENABLED, "false"));
   }
 
   /**
@@ -59,7 +53,7 @@ public class Log4jSystemConfig {
    * @return log4j system name
    */
   public String getSystemName() {
-    String log4jSystem = config.get(TASK_LOG4J_SYSTEM, null);
+    String log4jSystem = get(TASK_LOG4J_SYSTEM, null);
     if (log4jSystem == null) {
       List<String> systemNames = getSystemNames();
       if (systemNames.size() == 1) {
@@ -72,19 +66,11 @@ public class Log4jSystemConfig {
   }
 
   public String getJobName() {
-    return config.get(JobConfig.JOB_NAME(), null);
+    return get(JobConfig.JOB_NAME(), null);
   }
 
   public String getJobId() {
-    return config.get(JobConfig.JOB_ID(), null);
-  }
-
-  public String getSystemFactory(String name) {
-    if (name == null) {
-      return null;
-    }
-    String systemFactory = String.format(SystemConfig.SYSTEM_FACTORY(), name);
-    return config.get(systemFactory, null);
+    return get(JobConfig.JOB_ID(), null);
   }
 
   /**
@@ -96,28 +82,11 @@ public class Log4jSystemConfig {
    *         supplied serde name.
    */
   public String getSerdeClass(String name) {
-    return config.get(String.format(SerializerConfig.SERDE(), name), null);
+    return get(String.format(SerializerConfig.SERDE(), name), null);
   }
 
   public String getStreamSerdeName(String systemName, String streamName) {
     String streamSerdeNameConfig = String.format(StreamConfig.MSG_SERDE(), systemName, streamName);
-    return config.get(streamSerdeNameConfig, null);
-  }
-
-  /**
-   * Get a list of system names.
-   * 
-   * @return A list system names
-   */
-  protected List<String> getSystemNames() {
-    Config subConf = config.subset(SYSTEM_PREFIX, true);
-    ArrayList<String> systemNames = new ArrayList<String>();
-    for (Map.Entry<String, String> entry : subConf.entrySet()) {
-      String key = entry.getKey();
-      if (key.endsWith(SYSTEM_FACTORY_SUFFIX)) {
-        systemNames.add(key.replace(SYSTEM_FACTORY_SUFFIX, EMPTY));
-      }
-    }
-    return systemNames;
+    return get(streamSerdeNameConfig, null);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-shell/src/main/bash/state-storage-tool.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/state-storage-tool.sh b/samza-shell/src/main/bash/state-storage-tool.sh
new file mode 100755
index 0000000..05a4f25
--- /dev/null
+++ b/samza-shell/src/main/bash/state-storage-tool.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
+
+exec $(dirname $0)/run-class.sh org.apache.samza.storage.StateStorageTool "$@"