You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/07/16 07:53:28 UTC
samza git commit: SAMZA-625: added a tool to consume changelog and
materialize a state store
Repository: samza
Updated Branches:
refs/heads/master 4f7bbb054 -> 81542ecf4
SAMZA-625: added a tool to consume changelog and materialize a state store
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/81542ecf
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/81542ecf
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/81542ecf
Branch: refs/heads/master
Commit: 81542ecf484eccab2bfbb8f6ce1443c2f57eb6e4
Parents: 4f7bbb0
Author: Yan Fang <ya...@gmail.com>
Authored: Wed Jul 15 22:47:53 2015 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Wed Jul 15 22:47:53 2015 -0700
----------------------------------------------------------------------
checkstyle/import-control.xml | 7 +
.../versioned/container/state-management.md | 10 +
.../apache/samza/config/JavaStorageConfig.java | 63 +++++
.../apache/samza/config/JavaSystemConfig.java | 63 +++++
.../apache/samza/storage/StateStorageTool.java | 61 +++++
.../apache/samza/storage/StorageRecovery.java | 256 +++++++++++++++++++
.../samza/storage/TaskStorageManager.scala | 9 +-
.../main/scala/org/apache/samza/util/Util.scala | 12 +-
.../samza/config/TestJavaStorageConfig.java | 46 ++++
.../samza/config/TestJavaSystemConfig.java | 41 +++
.../apache/samza/storage/MockStorageEngine.java | 60 +++++
.../samza/storage/MockStorageEngineFactory.java | 37 +++
.../samza/storage/MockSystemConsumer.java | 59 +++++
.../apache/samza/storage/MockSystemFactory.java | 45 ++++
.../samza/storage/TestStorageRecovery.java | 111 ++++++++
.../apache/samza/config/Log4jSystemConfig.java | 47 +---
samza-shell/src/main/bash/state-storage-tool.sh | 21 ++
17 files changed, 905 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index de835c7..eef3370 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -142,6 +142,13 @@
<allow pkg="org.apache.samza.serializers" />
<allow pkg="org.apache.samza.system" />
<allow pkg="org.apache.samza.task" />
+ <allow pkg="org.apache.samza.util" />
+ <allow pkg="org.apache.samza.job" />
+ <allow pkg="org.apache.samza.config" />
+ <allow pkg="joptsimple" />
+
+ <allow class="org.apache.samza.SamzaException" />
+ <allow class="org.apache.samza.Partition" />
</subpackage>
<subpackage name="logging">
http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/docs/learn/documentation/versioned/container/state-management.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/container/state-management.md b/docs/learn/documentation/versioned/container/state-management.md
index 79067bb..50d4b65 100644
--- a/docs/learn/documentation/versioned/container/state-management.md
+++ b/docs/learn/documentation/versioned/container/state-management.md
@@ -184,6 +184,16 @@ public interface KeyValueStore<K, V> {
Additional configuration properties for the key-value store are documented in the [configuration reference](../jobs/configuration-table.html#keyvalue-rocksdb).
+### Debug Key-value storage
+
+Currently, Samza provides a state storage tool that can recover the state store from the changelog stream into a user-specified directory for reuse and debugging.
+
+{% highlight bash %}
+samza-example/target/bin/state-storage-tool.sh \
+ --config-path=file:///path/to/job/config.properties \
+ --path=directory/to/put/state/stores
+{% endhighlight %}
+
#### Known Issues
RocksDB has several rough edges. It's recommended that you read the RocksDB [tuning guide](https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide). Some other notes to be aware of are:
http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
new file mode 100644
index 0000000..af7d4ca
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * a java version of the storage config
+ */
+public class JavaStorageConfig extends MapConfig {
+
+ private static final String FACTORY_SUFFIX = ".factory";
+ private static final String STORE_PREFIX = "stores.";
+
+ public JavaStorageConfig(Config config) {
+ super(config);
+ }
+
+ public List<String> getStoreNames() {
+ Config subConfig = subset(STORE_PREFIX, true);
+ List<String> storeNames = new ArrayList<String>();
+ for (String key : subConfig.keySet()) {
+ if (key.endsWith(FACTORY_SUFFIX)) {
+ storeNames.add(key.substring(0, key.length() - FACTORY_SUFFIX.length()));
+ }
+ }
+ return storeNames;
+ }
+
+ public String getChangelogStream(String storeName) {
+ return get(String.format(StorageConfig.CHANGELOG_STREAM(), storeName), null);
+ }
+
+ public String getStorageFactoryClassName(String storeName) {
+ return get(String.format(StorageConfig.FACTORY(), storeName), null);
+ }
+
+ public String getStorageKeySerde(String storeName) {
+ return get(String.format(StorageConfig.KEY_SERDE(), storeName), null);
+ }
+
+ public String getStorageMsgSerde(String storeName) {
+ return get(String.format(StorageConfig.MSG_SERDE(), storeName), null);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
new file mode 100644
index 0000000..cf8d640
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * a java version of the system config
+ */
+public class JavaSystemConfig extends MapConfig {
+ private static final String SYSTEM_PREFIX = "systems.";
+ private static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory";
+ private static final String SYSTEM_FACTORY = "systems.%s.samza.factory";
+ private static final String EMPTY = "";
+
+ public JavaSystemConfig(Config config) {
+ super(config);
+ }
+
+ public String getSystemFactory(String name) {
+ if (name == null) {
+ return null;
+ }
+ String systemFactory = String.format(SYSTEM_FACTORY, name);
+ return get(systemFactory, null);
+ }
+
+ /**
+ * Get a list of system names.
+ *
+ * @return A list of system names
+ */
+ public List<String> getSystemNames() {
+ Config subConf = subset(SYSTEM_PREFIX, true);
+ ArrayList<String> systemNames = new ArrayList<String>();
+ for (Map.Entry<String, String> entry : subConf.entrySet()) {
+ String key = entry.getKey();
+ if (key.endsWith(SYSTEM_FACTORY_SUFFIX)) {
+ systemNames.add(key.replace(SYSTEM_FACTORY_SUFFIX, EMPTY));
+ }
+ }
+ return systemNames;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/main/java/org/apache/samza/storage/StateStorageTool.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StateStorageTool.java b/samza-core/src/main/java/org/apache/samza/storage/StateStorageTool.java
new file mode 100644
index 0000000..beba35c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/storage/StateStorageTool.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSet;
+
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.util.CommandLine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Commandline tool to recover the state storage to a specified directory
+ */
+public class StateStorageTool extends CommandLine {
+ private ArgumentAcceptingOptionSpec<String> newPathArgu = parser().accepts("path", "path of the new state storage").withRequiredArg().ofType(String.class).describedAs("path");
+ private String newPath = "";
+ private Logger log = LoggerFactory.getLogger(StateStorageTool.class);
+
+ @Override
+ public MapConfig loadConfig(OptionSet options) {
+ MapConfig config = super.loadConfig(options);
+ if (options.has(newPathArgu)) {
+ newPath = options.valueOf(newPathArgu);
+ log.info("new state storage is " + newPath);
+ }
+ return config;
+ }
+
+ public String getPath() {
+ return newPath;
+ }
+
+ public static void main(String[] args) {
+ StateStorageTool tool = new StateStorageTool();
+ OptionSet options = tool.parser().parse(args);
+ MapConfig config = tool.loadConfig(options);
+ String path = tool.getPath();
+
+ StorageRecovery storageRecovery = new StorageRecovery(config, path);
+ storageRecovery.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
new file mode 100644
index 0000000..c564964
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaStorageConfig;
+import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.serializers.ByteSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.CommandLine;
+import org.apache.samza.util.SystemClock;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Recovers the state stores from the changelog streams and writes them to
+ * the directory provided by the user. The changelog streams are derived
+ * from the job's config file.
+ */
+public class StorageRecovery extends CommandLine {
+
+ private Config jobConfig;
+ private int maxPartitionNumber = 0;
+ private File storeBaseDir = null;
+ private HashMap<String, SystemStream> changeLogSystemStreams = new HashMap<String, SystemStream>();
+ private HashMap<String, StorageEngineFactory<?, ?>> storageEngineFactories = new HashMap<String, StorageEngineFactory<?, ?>>();
+ private HashMap<String, SystemFactory> systemFactories = new HashMap<String, SystemFactory>();
+ private HashMap<String, SystemAdmin> systemAdmins = new HashMap<String, SystemAdmin>();
+ private Map<Integer, ContainerModel> containers = new HashMap<Integer, ContainerModel>();
+ private List<TaskStorageManager> taskStorageManagers = new ArrayList<TaskStorageManager>();
+ private Logger log = LoggerFactory.getLogger(StorageRecovery.class);
+
+ /**
+ * Construct the StorageRecovery
+ *
+ * @param config
+ * the job config
+ * @param path
+ * the directory path where we put the stores
+ */
+ StorageRecovery(Config config, String path) {
+ jobConfig = config;
+ storeBaseDir = new File(path, "state");
+ }
+
+ /**
+ * setup phase which assigns required values to the variables used for all
+ * tasks.
+ */
+ private void setup() {
+ log.info("setting up the recovery...");
+
+ getContainerModels();
+ getSystemFactoriesAndAdmins();
+ getChangeLogSystemStreamsAndStorageFactories();
+ getChangeLogMaxPartitionNumber();
+ getTaskStorageManagers();
+ }
+
+ /**
+ * run the setup phase and restore all the task storages
+ */
+ public void run() {
+ setup();
+
+ log.info("start recovering...");
+
+ for (TaskStorageManager taskStorageManager : taskStorageManagers) {
+ taskStorageManager.init();
+ taskStorageManager.stopStores();
+ log.debug("restored " + taskStorageManager.toString());
+ }
+
+ log.info("successfully recovered in " + storeBaseDir.toString());
+ }
+
+ /**
+ * build the ContainerModels from job config file and put the results in the
+ * map
+ */
+ private void getContainerModels() {
+ JobModel jobModel = JobCoordinator.apply(jobConfig).jobModel();
+ containers = jobModel.getContainers();
+ }
+
+ /**
+ * get the SystemFactories and SystemAdmins specified in the config file and
+ * put them into the maps
+ */
+ private void getSystemFactoriesAndAdmins() {
+ JavaSystemConfig systemConfig = new JavaSystemConfig(jobConfig);
+ List<String> systems = systemConfig.getSystemNames();
+
+ for (String system : systems) {
+ String systemFactory = systemConfig.getSystemFactory(system);
+ if (systemFactory == null) {
+ throw new SamzaException("A stream uses system " + system + " which is missing from the configuration.");
+ }
+ systemFactories.put(system, Util.<SystemFactory>getObj(systemFactory));
+ systemAdmins.put(system, Util.<SystemFactory>getObj(systemFactory).getAdmin(system, jobConfig));
+ }
+
+ log.info("Got system factories: " + systemFactories.keySet().toString());
+ log.info("Got system admins: " + systemAdmins.keySet().toString());
+ }
+
+ /**
+ * get the changelog streams and the storage factories from the config file
+ * and put them into the maps
+ */
+ private void getChangeLogSystemStreamsAndStorageFactories() {
+ JavaStorageConfig config = new JavaStorageConfig(jobConfig);
+ List<String> storeNames = config.getStoreNames();
+
+ log.info("Got store names: " + storeNames.toString());
+
+ for (String storeName : storeNames) {
+ String streamName = config.getChangelogStream(storeName);
+
+ log.info("stream name for " + storeName + " is " + streamName);
+
+ if (streamName != null) {
+ changeLogSystemStreams.put(storeName, Util.getSystemStreamFromNames(streamName));
+ }
+
+ String factoryClass = config.getStorageFactoryClassName(storeName);
+ if (factoryClass != null) {
+ storageEngineFactories.put(storeName, Util.<StorageEngineFactory<Object, Object>>getObj(factoryClass));
+ } else {
+ throw new SamzaException("Missing storage factory for " + storeName + ".");
+ }
+ }
+ }
+
+ /**
+ * get the SystemConsumers for the stores
+ */
+ private HashMap<String, SystemConsumer> getStoreConsumers() {
+ HashMap<String, SystemConsumer> storeConsumers = new HashMap<String, SystemConsumer>();
+
+ for (Entry<String, SystemStream> entry : changeLogSystemStreams.entrySet()) {
+ String storeSystem = entry.getValue().getSystem();
+ if (!systemFactories.containsKey(storeSystem)) {
+ throw new SamzaException("Changelog system " + storeSystem + " for store " + entry.getKey() + " does not exist in the config.");
+ }
+ storeConsumers.put(entry.getKey(), systemFactories.get(storeSystem).getConsumer(storeSystem, jobConfig, new MetricsRegistryMap()));
+ }
+
+ return storeConsumers;
+ }
+
+ /**
+ * compute the changelog partition count: the highest changelog partition id among all tasks, plus one
+ */
+ private void getChangeLogMaxPartitionNumber() {
+ int maxPartitionId = 0;
+ for (ContainerModel containerModel : containers.values()) {
+ for (TaskModel taskModel : containerModel.getTasks().values()) {
+ maxPartitionId = Math.max(maxPartitionId, taskModel.getChangelogPartition().getPartitionId());
+ }
+ }
+ maxPartitionNumber = maxPartitionId + 1;
+ }
+
+ /**
+ * create one TaskStorageManager for each task. Add all of them to the
+ * List<TaskStorageManager>
+ */
+ private void getTaskStorageManagers() {
+ StreamMetadataCache streamMetadataCache = new StreamMetadataCache(Util.javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance());
+
+ for (ContainerModel containerModel : containers.values()) {
+ HashMap<String, StorageEngine> taskStores = new HashMap<String, StorageEngine>();
+ SamzaContainerContext containerContext = new SamzaContainerContext(containerModel.getContainerId(), jobConfig, containerModel.getTasks()
+ .keySet());
+
+ for (TaskModel taskModel : containerModel.getTasks().values()) {
+ HashMap<String, SystemConsumer> storeConsumers = getStoreConsumers();
+
+ for (Entry<String, StorageEngineFactory<?, ?>> entry : storageEngineFactories.entrySet()) {
+ String storeName = entry.getKey();
+
+ if (changeLogSystemStreams.containsKey(storeName)) {
+ SystemStreamPartition changeLogSystemStreamPartition = new SystemStreamPartition(changeLogSystemStreams.get(storeName),
+ taskModel.getChangelogPartition());
+ File storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskModel.getTaskName());
+
+ log.info("Got storage engine directory: " + storePartitionDir);
+
+ StorageEngine storageEngine = (entry.getValue()).getStorageEngine(
+ storeName,
+ storePartitionDir,
+ (Serde) new ByteSerde(),
+ (Serde) new ByteSerde(),
+ null,
+ new MetricsRegistryMap(),
+ changeLogSystemStreamPartition,
+ containerContext);
+ taskStores.put(storeName, storageEngine);
+ }
+ }
+
+ TaskStorageManager taskStorageManager = new TaskStorageManager(
+ taskModel.getTaskName(),
+ Util.javaMapAsScalaMap(taskStores),
+ Util.javaMapAsScalaMap(storeConsumers),
+ Util.javaMapAsScalaMap(changeLogSystemStreams),
+ maxPartitionNumber,
+ streamMetadataCache,
+ storeBaseDir,
+ storeBaseDir, taskModel.getChangelogPartition(),
+ Util.javaMapAsScalaMap(systemAdmins));
+
+ taskStorageManagers.add(taskStorageManager);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index aeba61a..c39cdc7 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -36,7 +36,7 @@ object TaskStorageManager {
def getStorePartitionDir(storeBaseDir: File, storeName: String, taskName: TaskName) = {
// TODO: Sanitize, check and clean taskName string as a valid value for a file
- new File(storeBaseDir, storeName + File.separator + taskName)
+ new File(storeBaseDir, (storeName + File.separator + taskName.toString).replace(' ', '_'))
}
}
@@ -188,9 +188,13 @@ class TaskStorageManager(
taskStores.values.foreach(_.flush)
}
- def stop() {
+ def stopStores() {
debug("Stopping stores.")
taskStores.values.foreach(_.stop)
+ }
+
+ def stop() {
+ stopStores()
debug("Persisting logged key value stores")
changeLogSystemStreams.foreach { case (store, systemStream) => {
@@ -208,7 +212,6 @@ class TaskStorageManager(
}}
}
-
/**
* Builds a map from SystemStreamPartition to oldest offset for changelogs.
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 2feb65b..419452c 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -34,6 +34,10 @@ import org.apache.samza.config.ConfigException
import org.apache.samza.config.MapConfig
import scala.collection.JavaConversions._
import org.apache.samza.config.JobConfig
+import org.apache.samza.job.model.JobModel
+import java.io.InputStreamReader
+import scala.collection.JavaConverters._
+import scala.collection.immutable.Map
object Util extends Logging {
val random = new Random
@@ -167,7 +171,6 @@ object Util extends Logging {
body
}
-
/**
* Generates a coordinator stream name based off of the job name and job id
* for the jobd. The format is of the stream name will be
@@ -294,4 +297,11 @@ object Util extends Logging {
fis.close()
}
}
+
+ /**
+ * Convert a Java map to an immutable Scala map
+ * */
+ def javaMapAsScalaMap[T, K](javaMap: java.util.Map[T, K]): Map[T, K] = {
+ javaMap.toMap
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java
new file mode 100644
index 0000000..6c93697
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+public class TestJavaStorageConfig {
+
+ @Test
+ public void testStorageConfig() {
+ Map<String, String> map = new HashMap<String, String>();
+ map.put("stores.test.factory", "testFactory");
+ map.put("stores.test.changelog", "testChangelog");
+ map.put("stores.test.key.serde", "string");
+ map.put("stores.test.msg.serde", "integer");
+ JavaStorageConfig config = new JavaStorageConfig(new MapConfig(map));
+
+ assertEquals("testFactory", config.getStorageFactoryClassName("test"));
+ assertEquals("testChangelog", config.getChangelogStream("test"));
+ assertEquals("string", config.getStorageKeySerde("test"));
+ assertEquals("integer", config.getStorageMsgSerde("test"));
+ assertEquals("test", config.getStoreNames().get(0));
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java
new file mode 100644
index 0000000..9b39ec8
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+public class TestJavaSystemConfig {
+
+ @Test
+ public void testGetSystemNames() {
+ Map<String, String> map = new HashMap<String, String>();
+ map.put("systems.system1.samza.factory", "1");
+ map.put("systems.system2.samza.factory", "2");
+ JavaSystemConfig systemConfig = new JavaSystemConfig(
+ new MapConfig(map));
+
+ assertEquals(2, systemConfig.getSystemNames().size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
new file mode 100644
index 0000000..b90ea87
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+
+/**
+ * A mock StorageEngine that records its constructor arguments and the envelopes passed to restore.
+ * The recorded values are held in public static fields and can be read directly by name.
+ */
+public class MockStorageEngine implements StorageEngine {
+
+ public static String storeName;
+ public static File storeDir;
+ public static SystemStreamPartition ssp;
+ public static ArrayList<IncomingMessageEnvelope> incomingMessageEnvelopes = new ArrayList<IncomingMessageEnvelope>();
+
+ public MockStorageEngine(String storeName, File storeDir, SystemStreamPartition changeLogSystemStreamPartition) {
+ MockStorageEngine.storeName = storeName;
+ MockStorageEngine.storeDir = storeDir;
+ MockStorageEngine.ssp = changeLogSystemStreamPartition;
+ }
+
+ @Override
+ public void restore(Iterator<IncomingMessageEnvelope> envelopes) {
+ while (envelopes.hasNext()) {
+ incomingMessageEnvelopes.add(envelopes.next());
+ }
+ }
+
+ @Override
+ public void flush() {
+ }
+
+ @Override
+ public void stop() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java
new file mode 100644
index 0000000..c00c454
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngineFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.io.File;
+
+import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+
+public class MockStorageEngineFactory implements StorageEngineFactory<Object, Object> {
+ @Override
+ public StorageEngine getStorageEngine(String storeName, File storeDir, Serde<Object> keySerde, Serde<Object> msgSerde,
+ MessageCollector collector, MetricsRegistry registry, SystemStreamPartition changeLogSystemStreamPartition,
+ SamzaContainerContext containerContext) {
+ return new MockStorageEngine(storeName, storeDir, changeLogSystemStreamPartition);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java b/samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java
new file mode 100644
index 0000000..07c4a24
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/storage/MockSystemConsumer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemStreamPartition;
+
+/**
+ * {@link SystemConsumer} test double. The first {@link #poll(Set, long)} on an
+ * instance returns {@code TestStorageRecovery.msg} keyed by
+ * {@code TestStorageRecovery.ssp}; every later poll on that instance returns
+ * an empty map. The returned map is always the shared static
+ * {@link #messages} instance.
+ */
+public class MockSystemConsumer implements SystemConsumer {
+  public static Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
+
+  // true until this instance has served its single batch of messages
+  private boolean firstPoll = true;
+
+  /** No-op. */
+  @Override
+  public void start() {}
+
+  /** No-op. */
+  @Override
+  public void stop() {}
+
+  /** No-op: the requested partition and offset are ignored. */
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, String offset) {}
+
+  /**
+   * Serves the single test message on the first call and nothing afterwards.
+   * Both arguments are ignored.
+   *
+   * @return the shared {@link #messages} map, either holding the test message
+   *         or cleared
+   */
+  @Override
+  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
+    if (!firstPoll) {
+      // already served once: empty the shared map and hand it back
+      messages.clear();
+      return messages;
+    }
+
+    ArrayList<IncomingMessageEnvelope> envelopes = new ArrayList<IncomingMessageEnvelope>();
+    envelopes.add(TestStorageRecovery.msg);
+    messages.put(TestStorageRecovery.ssp, envelopes);
+    firstPoll = false;
+    return messages;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java b/samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java
new file mode 100644
index 0000000..7abf82b
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/storage/MockSystemFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+
+public class MockSystemFactory implements SystemFactory {
+
+ @Override
+ public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
+ return new MockSystemConsumer();
+ }
+
+ @Override
+ public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
+ return null;
+ }
+
+ @Override
+ public SystemAdmin getAdmin(String systemName, Config config) {
+ return TestStorageRecovery.systemAdmin;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java b/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
new file mode 100644
index 0000000..b8ae592
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Runs {@link StorageRecovery} against the mock system and mock storage
+ * engine and checks that the engine recorded the message served by
+ * {@link MockSystemConsumer} and the expected store directory.
+ *
+ * The public static fields {@link #ssp}, {@link #msg} and
+ * {@link #systemAdmin} are read by {@link MockSystemConsumer} and
+ * {@link MockSystemFactory}.
+ */
+public class TestStorageRecovery {
+
+  public static SystemConsumer systemConsumer1 = null;
+  public static SystemConsumer systemConsumer2 = null;
+  public static SystemAdmin systemAdmin = null;
+  public static Config config = null;
+  public static SystemStreamMetadata systemStreamMetadata = null;
+  public static SystemStreamMetadata inputSystemStreamMetadata = null;
+  public final static String SYSTEM_STREAM_NAME = "changelog";
+  public final static String INPUT_STREAM = "input";
+  public static SystemStreamPartition ssp = new SystemStreamPartition("mockSystem", SYSTEM_STREAM_NAME, new Partition(0));
+  public static IncomingMessageEnvelope msg = new IncomingMessageEnvelope(TestStorageRecovery.ssp, "0", "test", "test");
+
+  /**
+   * Rebuilds the config, the stream metadata and the mocked
+   * {@link SystemAdmin} before each test, and resets the static state the
+   * mocks accumulate.
+   */
+  @Before
+  public void setup() throws InterruptedException {
+    // The mocks record into static collections that nothing else resets.
+    // Clear them so the exact-size assertion in the test does not depend on
+    // what ran earlier in the same JVM.
+    MockStorageEngine.incomingMessageEnvelopes.clear();
+    MockSystemConsumer.messages.clear();
+
+    putConfig();
+    putMetadata();
+
+    systemAdmin = mock(SystemAdmin.class);
+
+    Set<String> set1 = new HashSet<String>(Arrays.asList(SYSTEM_STREAM_NAME));
+    Set<String> set2 = new HashSet<String>(Arrays.asList(INPUT_STREAM));
+    HashMap<String, SystemStreamMetadata> ssmMap = new HashMap<String, SystemStreamMetadata>();
+    ssmMap.put(SYSTEM_STREAM_NAME, systemStreamMetadata);
+    ssmMap.put(INPUT_STREAM, inputSystemStreamMetadata);
+    // the same map, holding both streams, answers a lookup for either stream
+    when(systemAdmin.getSystemStreamMetadata(set1)).thenReturn(ssmMap);
+    when(systemAdmin.getSystemStreamMetadata(set2)).thenReturn(ssmMap);
+  }
+
+  /**
+   * After {@code StorageRecovery.run()}, the mock engine must have recorded
+   * the test message twice and the expected store directory.
+   */
+  @Test
+  public void testStorageEngineReceivedAllValues() {
+    String path = "/tmp/testing";
+    StorageRecovery storageRecovery = new StorageRecovery(config, path);
+    storageRecovery.run();
+
+    // because the stream has two partitions
+    assertEquals(2, MockStorageEngine.incomingMessageEnvelopes.size());
+    assertEquals(TestStorageRecovery.msg, MockStorageEngine.incomingMessageEnvelopes.get(0));
+    assertEquals(TestStorageRecovery.msg, MockStorageEngine.incomingMessageEnvelopes.get(1));
+    // correct path is passed to the store engine
+    assertEquals(path + "/state/testStore/Partition_1", MockStorageEngine.storeDir.toString());
+  }
+
+  /**
+   * Builds the job config: one mock system, one store backed by the mock
+   * storage engine factory with a changelog on that system, and a mock
+   * coordinator system.
+   */
+  private void putConfig() {
+    Map<String, String> map = new HashMap<String, String>();
+    map.put("job.name", "changelogTest");
+    map.put("systems.mockSystem.samza.factory", MockSystemFactory.class.getCanonicalName());
+    map.put("stores.testStore.factory", MockStorageEngineFactory.class.getCanonicalName());
+    map.put("stores.testStore.changelog", "mockSystem." + SYSTEM_STREAM_NAME);
+    map.put("task.inputs", "mockSystem.input");
+    map.put("job.coordinator.system", "coordinator");
+    map.put("systems.coordinator.samza.factory", MockCoordinatorStreamSystemFactory.class.getCanonicalName());
+    config = new MapConfig(map);
+  }
+
+  /**
+   * Builds two-partition metadata (partitions 0 and 1, sharing one
+   * partition-metadata instance) for both the changelog and the input stream.
+   */
+  private void putMetadata() {
+    SystemStreamMetadata.SystemStreamPartitionMetadata sspm = new SystemStreamMetadata.SystemStreamPartitionMetadata("0", "1", "2");
+    HashMap<Partition, SystemStreamPartitionMetadata> map = new HashMap<Partition, SystemStreamPartitionMetadata>();
+    map.put(new Partition(0), sspm);
+    map.put(new Partition(1), sspm);
+    systemStreamMetadata = new SystemStreamMetadata(SYSTEM_STREAM_NAME, map);
+
+    HashMap<Partition, SystemStreamPartitionMetadata> map1 = new HashMap<Partition, SystemStreamPartitionMetadata>();
+    map1.put(new Partition(0), sspm);
+    map1.put(new Partition(1), sspm);
+    inputSystemStreamMetadata = new SystemStreamMetadata(INPUT_STREAM, map1);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
index d5e24f2..209296d 100644
--- a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
+++ b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
@@ -19,25 +19,19 @@
package org.apache.samza.config;
-import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
/**
* This class contains the methods for getting properties that are needed by the
* StreamAppender.
*/
-public class Log4jSystemConfig {
+public class Log4jSystemConfig extends JavaSystemConfig {
private static final String LOCATION_ENABLED = "task.log4j.location.info.enabled";
private static final String TASK_LOG4J_SYSTEM = "task.log4j.system";
- private static final String SYSTEM_PREFIX = "systems.";
- private static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory";
- private static final String EMPTY = "";
- private Config config = null;
public Log4jSystemConfig(Config config) {
- this.config = config;
+ super(config);
}
/**
@@ -49,7 +43,7 @@ public class Log4jSystemConfig {
* information in Log4J appender messages.
*/
public boolean getLocationEnabled() {
- return "true".equals(config.get(Log4jSystemConfig.LOCATION_ENABLED, "false"));
+ return "true".equals(get(Log4jSystemConfig.LOCATION_ENABLED, "false"));
}
/**
@@ -59,7 +53,7 @@ public class Log4jSystemConfig {
* @return log4j system name
*/
public String getSystemName() {
- String log4jSystem = config.get(TASK_LOG4J_SYSTEM, null);
+ String log4jSystem = get(TASK_LOG4J_SYSTEM, null);
if (log4jSystem == null) {
List<String> systemNames = getSystemNames();
if (systemNames.size() == 1) {
@@ -72,19 +66,11 @@ public class Log4jSystemConfig {
}
public String getJobName() {
- return config.get(JobConfig.JOB_NAME(), null);
+ return get(JobConfig.JOB_NAME(), null);
}
public String getJobId() {
- return config.get(JobConfig.JOB_ID(), null);
- }
-
- public String getSystemFactory(String name) {
- if (name == null) {
- return null;
- }
- String systemFactory = String.format(SystemConfig.SYSTEM_FACTORY(), name);
- return config.get(systemFactory, null);
+ return get(JobConfig.JOB_ID(), null);
}
/**
@@ -96,28 +82,11 @@ public class Log4jSystemConfig {
* supplied serde name.
*/
public String getSerdeClass(String name) {
- return config.get(String.format(SerializerConfig.SERDE(), name), null);
+ return get(String.format(SerializerConfig.SERDE(), name), null);
}
public String getStreamSerdeName(String systemName, String streamName) {
String streamSerdeNameConfig = String.format(StreamConfig.MSG_SERDE(), systemName, streamName);
- return config.get(streamSerdeNameConfig, null);
- }
-
- /**
- * Get a list of system names.
- *
- * @return A list system names
- */
- protected List<String> getSystemNames() {
- Config subConf = config.subset(SYSTEM_PREFIX, true);
- ArrayList<String> systemNames = new ArrayList<String>();
- for (Map.Entry<String, String> entry : subConf.entrySet()) {
- String key = entry.getKey();
- if (key.endsWith(SYSTEM_FACTORY_SUFFIX)) {
- systemNames.add(key.replace(SYSTEM_FACTORY_SUFFIX, EMPTY));
- }
- }
- return systemNames;
+ return get(streamSerdeNameConfig, null);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/81542ecf/samza-shell/src/main/bash/state-storage-tool.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/state-storage-tool.sh b/samza-shell/src/main/bash/state-storage-tool.sh
new file mode 100755
index 0000000..05a4f25
--- /dev/null
+++ b/samza-shell/src/main/bash/state-storage-tool.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml"
+
+exec $(dirname $0)/run-class.sh org.apache.samza.storage.StateStorageTool "$@"