Posted to commits@storm.apache.org by bo...@apache.org on 2017/04/06 17:33:33 UTC

[44/52] [partial] storm git commit: STORM-2441 Break down 'storm-core' to extract client (worker) artifacts

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/ClusterStateContext.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ClusterStateContext.java b/storm-client/src/jvm/org/apache/storm/cluster/ClusterStateContext.java
new file mode 100644
index 0000000..9ad6a92
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ClusterStateContext.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.cluster;
+
+/**
+ * This class is intended to provide runtime context to StateStorageFactory
+ * implementors, such as which type of daemon is creating the storage instance.
+ */
+public class ClusterStateContext {
+    
+    private DaemonType daemonType;
+
+    public ClusterStateContext() {
+        daemonType = DaemonType.UNKNOWN;
+    }
+    
+    public ClusterStateContext(DaemonType daemonType) {
+        this.daemonType = daemonType;
+    }
+    
+    public DaemonType getDaemonType() {
+        return daemonType;
+    }
+    
+}
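
As a rough usage sketch, a StateStorageFactory implementation can branch on the
daemon type carried by this context; the ACL tightening hinted at below is
illustrative, not part of this commit:

    ClusterStateContext context = new ClusterStateContext(DaemonType.NIMBUS);
    if (context.getDaemonType() == DaemonType.NIMBUS) {
        // hypothetical: a store factory might choose stricter ACLs for daemons
    }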

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/ClusterStateListener.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ClusterStateListener.java b/storm-client/src/jvm/org/apache/storm/cluster/ClusterStateListener.java
new file mode 100644
index 0000000..30e467e
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ClusterStateListener.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+public interface ClusterStateListener {
+    void stateChanged(ConnectionState newState);
+}
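
A minimal sketch of an implementation, assuming nothing beyond this interface
and the ConnectionState enum added later in this commit:

    // Illustrative listener that reacts to connection-state transitions.
    public class LoggingStateListener implements ClusterStateListener {
        @Override
        public void stateChanged(ConnectionState newState) {
            if (newState == ConnectionState.LOST) {
                System.err.println("Lost connection to the cluster state store");
            }
        }
    }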

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
new file mode 100644
index 0000000..a800b07
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.ClusterWorkerHeartbeat;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ClusterUtils {
+
+    public static final String ZK_SEPERATOR = "/";
+
+    public static final String ASSIGNMENTS_ROOT = "assignments";
+    public static final String CODE_ROOT = "code";
+    public static final String STORMS_ROOT = "storms";
+    public static final String SUPERVISORS_ROOT = "supervisors";
+    public static final String WORKERBEATS_ROOT = "workerbeats";
+    public static final String BACKPRESSURE_ROOT = "backpressure";
+    public static final String ERRORS_ROOT = "errors";
+    public static final String BLOBSTORE_ROOT = "blobstore";
+    public static final String BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_ROOT = "blobstoremaxkeysequencenumber";
+    public static final String NIMBUSES_ROOT = "nimbuses";
+    public static final String CREDENTIALS_ROOT = "credentials";
+    public static final String LOGCONFIG_ROOT = "logconfigs";
+    public static final String PROFILERCONFIG_ROOT = "profilerconfigs";
+
+    public static final String ASSIGNMENTS_SUBTREE = ZK_SEPERATOR + ASSIGNMENTS_ROOT;
+    public static final String STORMS_SUBTREE = ZK_SEPERATOR + STORMS_ROOT;
+    public static final String SUPERVISORS_SUBTREE = ZK_SEPERATOR + SUPERVISORS_ROOT;
+    public static final String WORKERBEATS_SUBTREE = ZK_SEPERATOR + WORKERBEATS_ROOT;
+    public static final String BACKPRESSURE_SUBTREE = ZK_SEPERATOR + BACKPRESSURE_ROOT;
+    public static final String ERRORS_SUBTREE = ZK_SEPERATOR + ERRORS_ROOT;
+    public static final String BLOBSTORE_SUBTREE = ZK_SEPERATOR + BLOBSTORE_ROOT;
+    public static final String BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_SUBTREE = ZK_SEPERATOR + BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_ROOT;
+    public static final String NIMBUSES_SUBTREE = ZK_SEPERATOR + NIMBUSES_ROOT;
+    public static final String CREDENTIALS_SUBTREE = ZK_SEPERATOR + CREDENTIALS_ROOT;
+    public static final String LOGCONFIG_SUBTREE = ZK_SEPERATOR + LOGCONFIG_ROOT;
+    public static final String PROFILERCONFIG_SUBTREE = ZK_SEPERATOR + PROFILERCONFIG_ROOT;
+
+    // A singleton instance allows us to mock delegated static methods in our
+    // tests by subclassing.
+    private static final ClusterUtils INSTANCE = new ClusterUtils();
+    private static ClusterUtils _instance = INSTANCE;
+
+    /**
+     * Provide an instance of this class for delegates to use. To mock out delegated methods, provide an instance of a subclass that overrides the
+     * implementation of the delegated method.
+     *
+     * @param u a ClusterUtils instance
+     */
+    public static void setInstance(ClusterUtils u) {
+        _instance = u;
+    }
+
+    /**
+     * Resets the singleton instance to the default. This is helpful to reset the class to its original functionality when mocking is no longer desired.
+     */
+    public static void resetInstance() {
+        _instance = INSTANCE;
+    }
+
+    public static List<ACL> mkTopoOnlyAcls(Map topoConf) throws NoSuchAlgorithmException {
+        List<ACL> aclList = null;
+        String payload = (String) topoConf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
+        if (Utils.isZkAuthenticationConfiguredTopology(topoConf)) {
+            aclList = new ArrayList<>();
+            ACL acl1 = ZooDefs.Ids.CREATOR_ALL_ACL.get(0);
+            aclList.add(acl1);
+            ACL acl2 = new ACL(ZooDefs.Perms.READ, new Id("digest", DigestAuthenticationProvider.generateDigest(payload)));
+            aclList.add(acl2);
+        }
+        return aclList;
+    }
+
+    public static String supervisorPath(String id) {
+        return SUPERVISORS_SUBTREE + ZK_SEPERATOR + id;
+    }
+
+    public static String assignmentPath(String id) {
+        return ASSIGNMENTS_SUBTREE + ZK_SEPERATOR + id;
+    }
+
+    public static String blobstorePath(String key) {
+        return BLOBSTORE_SUBTREE + ZK_SEPERATOR + key;
+    }
+
+    public static String blobstoreMaxKeySequenceNumberPath(String key) {
+        return BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_SUBTREE + ZK_SEPERATOR + key;
+    }
+
+    public static String nimbusPath(String id) {
+        return NIMBUSES_SUBTREE + ZK_SEPERATOR + id;
+    }
+
+    public static String stormPath(String id) {
+        return STORMS_SUBTREE + ZK_SEPERATOR + id;
+    }
+
+    public static String workerbeatStormRoot(String stormId) {
+        return WORKERBEATS_SUBTREE + ZK_SEPERATOR + stormId;
+    }
+
+    public static String workerbeatPath(String stormId, String node, Long port) {
+        return workerbeatStormRoot(stormId) + ZK_SEPERATOR + node + "-" + port;
+    }
+
+    public static String backpressureStormRoot(String stormId) {
+        return BACKPRESSURE_SUBTREE + ZK_SEPERATOR + stormId;
+    }
+
+    public static String backpressurePath(String stormId, String node, Long port) {
+        return backpressureStormRoot(stormId) + ZK_SEPERATOR + node + "-" + port;
+    }
+
+    public static String errorStormRoot(String stormId) {
+        return ERRORS_SUBTREE + ZK_SEPERATOR + stormId;
+    }
+
+    public static String errorPath(String stormId, String componentId) {
+        try {
+            return errorStormRoot(stormId) + ZK_SEPERATOR + URLEncoder.encode(componentId, "UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            throw Utils.wrapInRuntime(e);
+        }
+    }
+
+    public static String lastErrorPath(String stormId, String componentId) {
+        return errorPath(stormId, componentId) + "-last-error";
+    }
+
+    public static String credentialsPath(String stormId) {
+        return CREDENTIALS_SUBTREE + ZK_SEPERATOR + stormId;
+    }
+
+    public static String logConfigPath(String stormId) {
+        return LOGCONFIG_SUBTREE + ZK_SEPERATOR + stormId;
+    }
+
+    public static String profilerConfigPath(String stormId) {
+        return PROFILERCONFIG_SUBTREE + ZK_SEPERATOR + stormId;
+    }
+
+    public static String profilerConfigPath(String stormId, String host, Long port, ProfileAction requestType) {
+        return profilerConfigPath(stormId) + ZK_SEPERATOR + host + "_" + port + "_" + requestType;
+    }
+
+    public static <T> T maybeDeserialize(byte[] serialized, Class<T> clazz) {
+        if (serialized != null) {
+            return Utils.deserialize(serialized, clazz);
+        }
+        return null;
+    }
+
+    /**
+     * Ensures that we only return heartbeats for executors assigned to this worker.
+     * @param executors the executors assigned to this worker
+     * @param workerHeartbeat the heartbeat reported by the worker
+     * @return a map from executor info to its heartbeat, restricted to the given executors
+     */
+    public static Map<ExecutorInfo, ExecutorBeat> convertExecutorBeats(List<ExecutorInfo> executors, ClusterWorkerHeartbeat workerHeartbeat) {
+        Map<ExecutorInfo, ExecutorBeat> executorWhb = new HashMap<>();
+        Map<ExecutorInfo, ExecutorStats> executorStatsMap = workerHeartbeat.get_executor_stats();
+        for (ExecutorInfo executor : executors) {
+            if (executorStatsMap.containsKey(executor)) {
+                int time = workerHeartbeat.get_time_secs();
+                int uptime = workerHeartbeat.get_uptime_secs();
+                ExecutorStats executorStats = executorStatsMap.get(executor);
+                ExecutorBeat executorBeat = new ExecutorBeat(time, uptime, executorStats);
+                executorWhb.put(executor, executorBeat);
+            }
+        }
+        return executorWhb;
+    }
+
+    public IStormClusterState mkStormClusterStateImpl(Object stateStorage, List<ACL> acls, ClusterStateContext context) throws Exception {
+        if (stateStorage instanceof IStateStorage) {
+            return new StormClusterStateImpl((IStateStorage) stateStorage, acls, context, false);
+        } else {
+            IStateStorage storage = _instance.mkStateStorageImpl((Map) stateStorage, (Map) stateStorage, acls, context);
+            return new StormClusterStateImpl(storage, acls, context, true);
+        }
+    }
+
+    public IStateStorage mkStateStorageImpl(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context) throws Exception {
+        String className;
+        if (config.get(Config.STORM_CLUSTER_STATE_STORE) != null) {
+            className = (String) config.get(Config.STORM_CLUSTER_STATE_STORE);
+        } else {
+            className = "org.apache.storm.cluster.ZKStateStorageFactory";
+        }
+        Class clazz = Class.forName(className);
+        StateStorageFactory storageFactory = (StateStorageFactory) clazz.newInstance();
+        return storageFactory.mkStore(config, auth_conf, acls, context);
+    }
+
+    public static IStateStorage mkStateStorage(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context) throws Exception {
+        return _instance.mkStateStorageImpl(config, auth_conf, acls, context);
+    }
+
+    public static IStormClusterState mkStormClusterState(Object stateStorage, List<ACL> acls, ClusterStateContext context) throws Exception {
+        return _instance.mkStormClusterStateImpl(stateStorage, acls, context);
+    }
+
+    public static String stringifyError(Throwable error) {
+        StringWriter result = new StringWriter();
+        PrintWriter printWriter = new PrintWriter(result);
+        error.printStackTrace(printWriter);
+        return result.toString();
+    }
+}
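
Two aspects of this class are worth a sketch: the path helpers are pure string
builders, and the _instance indirection lets tests substitute behavior by
subclassing. Both snippets below are illustrative; mockStorage is a
hypothetical stand-in for a real test double:

    // Path helpers simply concatenate the subtree constants:
    String assignment = ClusterUtils.assignmentPath("topo-1");
    // -> "/assignments/topo-1"
    String beat = ClusterUtils.workerbeatPath("topo-1", "node1", 6700L);
    // -> "/workerbeats/topo-1/node1-6700"

    // Tests can override the delegated instance methods by subclassing:
    final IStateStorage mockStorage = null; // hypothetical test double
    ClusterUtils.setInstance(new ClusterUtils() {
        @Override
        public IStateStorage mkStateStorageImpl(Map config, Map authConf,
                List<ACL> acls, ClusterStateContext context) {
            return mockStorage;
        }
    });
    // ... run the test ...
    ClusterUtils.resetInstance(); // restore the default afterwards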

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/ConnectionState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ConnectionState.java b/storm-client/src/jvm/org/apache/storm/cluster/ConnectionState.java
new file mode 100644
index 0000000..ec5e97d
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ConnectionState.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+public enum ConnectionState {
+    CONNECTED,
+    RECONNECTED,
+    LOST
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/DaemonType.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/DaemonType.java b/storm-client/src/jvm/org/apache/storm/cluster/DaemonType.java
new file mode 100644
index 0000000..4589be3
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/DaemonType.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.cluster;
+
+public enum DaemonType {
+    SUPERVISOR,
+    NIMBUS,
+    WORKER,
+    PACEMAKER,
+    UNKNOWN
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/ExecutorBeat.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ExecutorBeat.java b/storm-client/src/jvm/org/apache/storm/cluster/ExecutorBeat.java
new file mode 100644
index 0000000..b32615e
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ExecutorBeat.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import org.apache.storm.generated.ExecutorStats;
+
+public class ExecutorBeat {
+    private final int timeSecs;
+    private final int uptime;
+    private final ExecutorStats stats;
+
+    public ExecutorBeat(int timeSecs, int uptime, ExecutorStats stats) {
+        this.timeSecs = timeSecs;
+        this.uptime = uptime;
+        this.stats = stats;
+    }
+
+    public int getTimeSecs() {
+        return timeSecs;
+    }
+
+    public int getUptime() {
+        return uptime;
+    }
+
+    public ExecutorStats getStats() {
+        return stats;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java b/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java
new file mode 100644
index 0000000..aa731ff
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.storm.callback.ZKStateChangedCallback;
+import org.apache.zookeeper.data.ACL;
+
+/**
+ * StateStorage provides the API for the pluggable state store used by the
+ * Storm daemons. Data is stored in path/value format, and the store supports
+ * listing sub-paths at a given path.
+ * All data should be available across all nodes with eventual consistency.
+ *
+ * IMPORTANT NOTE:
+ * Heartbeats are accessed through a separate set of API calls (the *_hb*
+ * methods). Their root path (/) may or may not be the same as the root path
+ * used by the other API calls.
+ *
+ * For example, performing these two calls:
+ *     set_data("/path", data, acls);
+ *     set_worker_hb("/path", heartbeat, acls);
+ * may or may not cause a collision in "/path".
+ * Never use the same paths with the *_hb* methods as you do with the others.
+ */
+public interface IStateStorage {
+    
+    /**
+     * Registers a callback function that gets called when CuratorEvents happen.
+     * @param callback a ZKStateChangedCallback that accepts the ZooKeeper
+     * event type and the path on which the event occurred.
+     * @return an id that can be passed to unregister(...) to unregister the
+     * callback.
+     */
+    String register(ZKStateChangedCallback callback);
+
+    /**
+     * Unregisters a callback function that was registered with register(...).
+     * @param id is the String id that was returned from register(...).
+     */
+    void unregister(String id);
+
+    /**
+     * Path will be appended with a monotonically increasing integer, a new node
+     * will be created there, and data will be put at that node.
+     * @param path The path that the monotonically increasing integer suffix will
+     * be added to.
+     * @param data The data that will be written at the suffixed path's node.
+     * @param acls The acls to apply to the path. May be null.
+     * @return The path with the integer suffix appended.
+     */
+    String create_sequential(String path, byte[] data, List<ACL> acls);
+
+    /**
+     * Creates nodes for path and all its parents. Path elements are separated by
+     * a "/", as in *nix filesystem notation. Equivalent to mkdir -p in *nix.
+     * @param path The path to create, along with all its parents.
+     * @param acls The acls to apply to the path. May be null.
+     */
+    void mkdirs(String path, List<ACL> acls);
+
+    /**
+     * Deletes the node at a given path, and any child nodes that may exist.
+     * @param path The path to delete
+     */
+    void delete_node(String path);
+
+    /**
+     * Creates an ephemeral node at path. Ephemeral nodes are destroyed
+     * by the store when the client disconnects.
+     * @param path The path where a node will be created.
+     * @param data The data to be written at the node.
+     * @param acls The acls to apply to the path. May be null.
+     */
+    void set_ephemeral_node(String path, byte[] data, List<ACL> acls);
+
+    /**
+     * Gets the 'version' of the node at a path. Optionally sets a watch
+     * on that node. The version should increase whenever a write happens.
+     * @param path The path to get the version of.
+     * @param watch Whether or not to set a watch on the path. Watched paths
+     * emit events which are consumed by functions registered with the
+     * register method. Very useful for catching updates to nodes.
+     * @return The integer version of this node.
+     */
+    Integer get_version(String path, boolean watch) throws Exception;
+
+    /**
+     * Check if a node exists and optionally set a watch on the path.
+     * @param path The path to check for the existence of a node.
+     * @param watch Whether or not to set a watch on the path. Watched paths
+     * emit events which are consumed by functions registered with the
+     * register method. Very useful for catching updates to nodes.
+     * @return Whether or not a node exists at path.
+     */
+    boolean node_exists(String path, boolean watch);
+
+    /**
+     * Get a list of paths of all the child nodes which exist immediately
+     * under path.
+     * @param path The path to look under
+     * @param watch Whether or not to set a watch on the path. Watched paths
+     * emit events which are consumed by functions registered with the
+     * register method. Very useful for catching updates to nodes.
+     * @return list of string paths under path.
+     */
+    List<String> get_children(String path, boolean watch);
+
+    /**
+     * Close the connection to the data store.
+     */
+    void close();
+
+    /**
+     * Set the value of the node at path to data.
+     * @param path The path whose node we want to set.
+     * @param data The data to put in the node.
+     * @param acls The acls to apply to the path. May be null.
+     */
+    void set_data(String path, byte[] data, List<ACL> acls);
+
+    /**
+     * Get the data from the node at path
+     * @param path The path to look under
+     * @param watch Whether or not to set a watch on the path. Watched paths
+     * emit events which are consumed by functions registered with the
+     * register method. Very useful for catching updates to nodes.
+     * @return The data at the node.
+     */
+    byte[] get_data(String path, boolean watch);
+
+    /**
+     * Get the data at the node along with its version. The data and its
+     * version are returned together in a VersionedData object.
+     * @param path The path to look under
+     * @param watch Whether or not to set a watch on the path. Watched paths
+     * emit events which are consumed by functions registered with the
+     * register method. Very useful for catching updates to nodes.
+     * @return the data with a version
+     */
+    VersionedData<byte[]> get_data_with_version(String path, boolean watch);
+
+    /**
+     * Write a worker heartbeat at the path.
+     * @param path The path whose node we want to set.
+     * @param data The data to put in the node.
+     * @param acls The acls to apply to the path. May be null.
+     */
+    void set_worker_hb(String path, byte[] data, List<ACL> acls);
+
+    /**
+     * Get the heartbeat from the node at path
+     * @param path The path to look under
+     * @param watch Whether or not to set a watch on the path. Watched paths
+     * emit events which are consumed by functions registered with the
+     * register method. Very useful for catching updates to nodes.
+     * @return The heartbeat at the node.
+     */
+    byte[] get_worker_hb(String path, boolean watch);
+
+    /**
+     * Get a list of paths of all the child nodes which exist immediately
+     * under path. This is similar to get_children, but must be used for
+     * any nodes written with the *_hb* methods.
+     * @param path The path to look under
+     * @param watch Whether or not to set a watch on the path. Watched paths
+     * emit events which are consumed by functions registered with the
+     * register method. Very useful for catching updates to nodes.
+     * @return list of string paths under path.
+     */
+    List<String> get_worker_hb_children(String path, boolean watch);
+
+    /**
+     * Deletes the heartbeat at a given path, and any child nodes that may exist.
+     * @param path The path to delete.
+     */
+    void delete_worker_hb(String path);
+
+    /**
+     * Add a StateStorageListener to the connection.
+     * @param listener A StateStorageListener to handle changing cluster state
+     * events.
+     */
+    void add_listener(final ConnectionStateListener listener);
+
+    /**
+     * Force consistency on a path. Any writes committed on the path before
+     * this call will be completely propagated when it returns.
+     * @param path The path to synchronize.
+     */
+    void sync_path(String path);
+
+    /**
+     * Allows us to delete the znodes within /storm/blobstore/key_name
+     * that start with the corresponding nimbusHostPortInfo.
+     * @param path /storm/blobstore/key_name
+     * @param nimbusHostPortInfo Contains the host and port information of
+     * a nimbus node.
+     */
+    void delete_node_blobstore(String path, String nimbusHostPortInfo);
+}
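
A hedged sketch of typical use, assuming a store built through ClusterUtils
(exception handling elided; Utils.readStormConfig is assumed to load the usual
defaults). Note the warning above about keeping *_hb* paths disjoint from
regular paths:

    Map conf = Utils.readStormConfig();
    IStateStorage store = ClusterUtils.mkStateStorage(conf, conf, null,
            new ClusterStateContext(DaemonType.WORKER));
    store.mkdirs("/example", null);
    store.set_data("/example/node", "hello".getBytes(), null);
    byte[] data = store.get_data("/example/node", false); // false = no watch
    store.set_worker_hb("/hb/example", data, null);       // keep hb paths disjoint
    store.close();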

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
new file mode 100644
index 0000000..704c9e5
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ClusterWorkerHeartbeat;
+import org.apache.storm.generated.Credentials;
+import org.apache.storm.generated.ErrorInfo;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LogConfig;
+import org.apache.storm.generated.NimbusSummary;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.StormBase;
+import org.apache.storm.generated.SupervisorInfo;
+import org.apache.storm.nimbus.NimbusInfo;
+
+public interface IStormClusterState {
+    public List<String> assignments(Runnable callback);
+
+    public Assignment assignmentInfo(String stormId, Runnable callback);
+
+    public VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback);
+
+    public Integer assignmentVersion(String stormId, Runnable callback) throws Exception;
+
+    public List<String> blobstoreInfo(String blobKey);
+
+    public List<NimbusSummary> nimbuses();
+
+    public void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
+
+    public List<String> activeStorms();
+
+    /**
+     * Get a storm base for a topology
+     * @param stormId the id of the topology
+     * @param callback something to call if the data changes (best effort)
+     * @return the StormBase or null if it is not alive.
+     */
+    public StormBase stormBase(String stormId, Runnable callback);
+
+    public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);
+
+    public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo);
+
+    public List<ProfileRequest> getTopologyProfileRequests(String stormId);
+
+    public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest);
+
+    public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest);
+
+    public Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort);
+
+    public List<String> supervisors(Runnable callback);
+
+    public SupervisorInfo supervisorInfo(String supervisorId); // returns null if it doesn't exist
+
+    public void setupHeartbeats(String stormId);
+
+    public void teardownHeartbeats(String stormId);
+
+    public void teardownTopologyErrors(String stormId);
+
+    public List<String> heartbeatStorms();
+
+    public List<String> errorTopologies();
+
+    public List<String> backpressureTopologies();
+
+    public void setTopologyLogConfig(String stormId, LogConfig logConfig);
+
+    public LogConfig topologyLogConfig(String stormId, Runnable cb);
+
+    public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info);
+
+    public void removeWorkerHeartbeat(String stormId, String node, Long port);
+
+    public void supervisorHeartbeat(String supervisorId, SupervisorInfo info);
+
+    public void workerBackpressure(String stormId, String node, Long port, boolean on);
+
+    public boolean topologyBackpressure(String stormId, Runnable callback);
+
+    public void setupBackpressure(String stormId);
+
+    public void removeBackpressure(String stormId);
+
+    public void removeWorkerBackpressure(String stormId, String node, Long port);
+
+    public void activateStorm(String stormId, StormBase stormBase);
+
+    public void updateStorm(String stormId, StormBase newElems);
+
+    public void removeStormBase(String stormId);
+
+    public void setAssignment(String stormId, Assignment info);
+
+    public void setupBlobstore(String key, NimbusInfo nimbusInfo, Integer versionInfo);
+
+    public List<String> activeKeys();
+
+    public List<String> blobstore(Runnable callback);
+
+    public void removeStorm(String stormId);
+
+    public void removeBlobstoreKey(String blobKey);
+
+    public void removeKeyVersion(String blobKey);
+
+    public void reportError(String stormId, String componentId, String node, Long port, Throwable error);
+
+    public List<ErrorInfo> errors(String stormId, String componentId);
+
+    public ErrorInfo lastError(String stormId, String componentId);
+
+    public void setCredentials(String stormId, Credentials creds, Map<String, Object> topoConf) throws NoSuchAlgorithmException;
+
+    public Credentials credentials(String stormId, Runnable callback);
+
+    public void disconnect();
+    
+    /**
+     * @return All of the supervisors with the ID as the key
+     */
+    default Map<String, SupervisorInfo> allSupervisorInfo() {
+        return allSupervisorInfo(null);
+    }
+
+    /**
+     * @param callback a callback to be invoked if the list of supervisors changes
+     * @return All of the supervisors with the ID as the key
+     */
+    default Map<String, SupervisorInfo> allSupervisorInfo(Runnable callback) {
+        Map<String, SupervisorInfo> ret = new HashMap<>();
+        for (String id: supervisors(callback)) {
+            ret.put(id, supervisorInfo(id));
+        }
+        return ret;
+    }
+    
+    /**
+     * Get a topology ID from the name of a topology
+     * @param topologyName the name of the topology to look for
+     * @return an Optional containing the id of the topology, empty if it is not alive.
+     */
+    default Optional<String> getTopoId(final String topologyName) {
+        String ret = null;
+        for (String topoId: activeStorms()) {
+            String name = stormBase(topoId, null).get_name();
+            if (topologyName.equals(name)) {
+                ret = topoId;
+                break;
+            }
+        }
+        return Optional.ofNullable(ret);
+    }
+    
+    default Map<String, Assignment> topologyAssignments() {
+        Map<String, Assignment> ret = new HashMap<>();
+        for (String topoId: assignments(null)) {
+            ret.put(topoId, assignmentInfo(topoId, null));
+        }
+        return ret;
+    }
+    
+    default Map<String, StormBase> topologyBases() {
+        Map<String, StormBase> stormBases = new HashMap<>();
+        for (String topologyId : activeStorms()) {
+            StormBase base = stormBase(topologyId, null);
+            if (base != null) { // race condition with delete
+                stormBases.put(topologyId, base);
+            }
+        }
+        return stormBases;
+    }
+}
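
As a sketch, the default methods above compose the primitive calls. For
instance, resolving a topology name to its id and then inspecting placement
(clusterState is assumed to come from ClusterUtils.mkStormClusterState):

    Optional<String> topoId = clusterState.getTopoId("word-count");
    if (topoId.isPresent()) {
        Assignment assignment = clusterState.assignmentInfo(topoId.get(), null);
        Map<String, SupervisorInfo> supervisors = clusterState.allSupervisorInfo();
        // ... inspect where the topology's workers are placed ...
    }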

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java b/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
new file mode 100644
index 0000000..9295347
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.storm.callback.ZKStateChangedCallback;
+import org.apache.storm.generated.*;
+import org.apache.storm.pacemaker.PacemakerClientPool;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PaceMakerStateStorage implements IStateStorage {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PaceMakerStateStorage.class);
+
+    private PacemakerClientPool pacemakerClientPool;
+    private IStateStorage stateStorage;
+    private static final int maxRetries = 10;
+
+    public PaceMakerStateStorage(PacemakerClientPool pacemakerClientPool, IStateStorage stateStorage) throws Exception {
+        this.pacemakerClientPool = pacemakerClientPool;
+        this.stateStorage = stateStorage;
+    }
+
+    @Override
+    public String register(ZKStateChangedCallback callback) {
+        return stateStorage.register(callback);
+    }
+
+    @Override
+    public void unregister(String id) {
+        stateStorage.unregister(id);
+    }
+
+    @Override
+    public String create_sequential(String path, byte[] data, List<ACL> acls) {
+        return stateStorage.create_sequential(path, data, acls);
+    }
+
+    @Override
+    public void mkdirs(String path, List<ACL> acls) {
+        stateStorage.mkdirs(path, acls);
+    }
+
+    @Override
+    public void delete_node(String path) {
+        stateStorage.delete_node(path);
+    }
+
+    @Override
+    public void set_ephemeral_node(String path, byte[] data, List<ACL> acls) {
+        stateStorage.set_ephemeral_node(path, data, acls);
+    }
+
+    @Override
+    public Integer get_version(String path, boolean watch) throws Exception {
+        return stateStorage.get_version(path, watch);
+    }
+
+    @Override
+    public boolean node_exists(String path, boolean watch) {
+        return stateStorage.node_exists(path, watch);
+    }
+
+    @Override
+    public List<String> get_children(String path, boolean watch) {
+        return stateStorage.get_children(path, watch);
+    }
+
+    @Override
+    public void close() {
+        stateStorage.close();
+        pacemakerClientPool.close();
+    }
+
+    @Override
+    public void set_data(String path, byte[] data, List<ACL> acls) {
+        stateStorage.set_data(path, data, acls);
+    }
+
+    @Override
+    public byte[] get_data(String path, boolean watch) {
+        return stateStorage.get_data(path, watch);
+    }
+
+    @Override
+    public VersionedData<byte[]> get_data_with_version(String path, boolean watch) {
+        return stateStorage.get_data_with_version(path, watch);
+    }
+
+    @Override
+    public void set_worker_hb(String path, byte[] data, List<ACL> acls) {
+        int retry = maxRetries;
+        while (true) {
+            try {
+                HBPulse hbPulse = new HBPulse();
+                hbPulse.set_id(path);
+                hbPulse.set_details(data);
+                HBMessage message = new HBMessage(HBServerMessageType.SEND_PULSE, HBMessageData.pulse(hbPulse));
+                HBMessage response = pacemakerClientPool.send(message);
+                if (response.get_type() != HBServerMessageType.SEND_PULSE_RESPONSE) {
+                    throw new HBExecutionException("Invalid Response Type");
+                }
+                LOG.debug("Successful set_worker_hb");
+                break;
+            } catch (Exception e) {
+                if (retry <= 0) {
+                    throw Utils.wrapInRuntime(e);
+                }
+                retry--;
+                LOG.error("{} Failed to set_worker_hb. Will make {} more attempts.", e.getMessage(), retry);
+            }
+        }
+    }
+
+    @Override
+    public byte[] get_worker_hb(String path, boolean watch) {
+        int retry = maxRetries;
+        while (true) {
+            try {
+                byte[] ret = null;
+                int latest_time_secs = 0;
+                boolean got_response = false;
+
+                HBMessage message = new HBMessage(HBServerMessageType.GET_PULSE, HBMessageData.path(path));
+                List<HBMessage> responses = pacemakerClientPool.sendAll(message);
+                for(HBMessage response : responses) {
+                    if (response.get_type() != HBServerMessageType.GET_PULSE_RESPONSE) {
+                        LOG.error("get_worker_hb: Invalid Response Type");
+                        continue;
+                    }
+                    // We got at least one GET_PULSE_RESPONSE message.
+                    got_response = true;
+                    byte[] details = response.get_data().get_pulse().get_details();
+                    if(details == null) {
+                        continue;
+                    }
+                    ClusterWorkerHeartbeat cwh = Utils.deserialize(details, ClusterWorkerHeartbeat.class);
+                    if(cwh != null && cwh.get_time_secs() > latest_time_secs) {
+                        latest_time_secs = cwh.get_time_secs();
+                        ret = details;
+                    }
+                }
+                if(!got_response) {
+                    throw new HBExecutionException("Failed to get a response.");
+                }
+                return ret;
+            } catch (Exception e) {
+                if (retry <= 0) {
+                    throw Utils.wrapInRuntime(e);
+                }
+                retry--;
+                LOG.error("{} Failed to get_worker_hb. Will make {} more attempts.", e.getMessage(), retry);
+            }
+        }
+    }
+
+    @Override
+    public List<String> get_worker_hb_children(String path, boolean watch) {
+        int retry = maxRetries;
+        while (true) {
+            try {
+                HashSet<String> retSet = new HashSet<>();
+
+                HBMessage message = new HBMessage(HBServerMessageType.GET_ALL_NODES_FOR_PATH, HBMessageData.path(path));
+                List<HBMessage> responses = pacemakerClientPool.sendAll(message);
+                for(HBMessage response : responses) {
+                    if (response.get_type() != HBServerMessageType.GET_ALL_NODES_FOR_PATH_RESPONSE) {
+                        LOG.error("get_worker_hb_children: Invalid Response Type");
+                        continue;
+                    }
+                    if(response.get_data().get_nodes().get_pulseIds() != null) {
+                        retSet.addAll(response.get_data().get_nodes().get_pulseIds());
+                    }
+                }
+
+                LOG.debug("Successful get_worker_hb");
+                return new ArrayList<>(retSet);
+            } catch (Exception e) {
+                if (retry <= 0) {
+                    throw Utils.wrapInRuntime(e);
+                }
+                retry--;
+                LOG.error("{} Failed to get_worker_hb_children. Will make {} more attempts.", e.getMessage(), retry);
+            }
+        }
+    }
+
+    @Override
+    public void delete_worker_hb(String path) {
+        int retry = maxRetries;
+        boolean someSucceeded;
+        while (true) {
+            someSucceeded = false;
+            try {
+                HBMessage message = new HBMessage(HBServerMessageType.DELETE_PATH, HBMessageData.path(path));
+                List<HBMessage> responses = pacemakerClientPool.sendAll(message);
+                boolean allSucceeded = true;
+                for(HBMessage response : responses) {
+                    if (response.get_type() != HBServerMessageType.DELETE_PATH_RESPONSE) {
+                        LOG.debug("Failed to delete heartbeat {}", response);
+                        allSucceeded = false;
+                    }
+                    else {
+                        someSucceeded = true;
+                    }
+                }
+                if(allSucceeded) {
+                    break;
+                }
+                else {
+                    throw new HBExecutionException("Failed to delete from all pacemakers.");
+                }
+            } catch (Exception e) {
+                if (retry <= 0) {
+                    if(someSucceeded) {
+                        LOG.warn("Unable to delete_worker_hb from every pacemaker.");
+                        break;
+                    }
+                    else {
+                        LOG.error("Unable to delete_worker_hb from any pacemaker.");
+                        throw Utils.wrapInRuntime(e);
+                    }
+                }
+                retry--;
+                LOG.debug("{} Failed to delete_worker_hb. Will make {} more attempts.", e.getMessage(), retry);
+            }
+        }
+    }
+
+    @Override
+    public void add_listener(ConnectionStateListener listener) {
+        stateStorage.add_listener(listener);
+    }
+
+    @Override
+    public void sync_path(String path) {
+        stateStorage.sync_path(path);
+    }
+
+    @Override
+    public void delete_node_blobstore(String path, String nimbusHostPortInfo) {
+        stateStorage.delete_node_blobstore(path, nimbusHostPortInfo);
+    }
+}
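
The class is a decorator: heartbeat traffic is redirected to the Pacemaker
client pool (via the bounded retry loops above), while everything else
delegates to the wrapped store. A rough composition sketch, with conf, context
and the payload bytes assumed in scope and exception handling elided:

    IStateStorage zk = new ZKStateStorageFactory().mkStore(conf, conf, null, context);
    IStateStorage store = new PaceMakerStateStorage(new PacemakerClientPool(conf), zk);
    store.set_data("/assignments/topo-1", bytes, null);      // goes to ZooKeeper
    store.set_worker_hb("/topo-1/node1-6700", bytes, null);  // goes to Pacemaker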

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java b/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java
new file mode 100644
index 0000000..3574506
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.pacemaker.PacemakerClientPool;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.data.ACL;
+
+public class PaceMakerStateStorageFactory implements StateStorageFactory {
+    @Override
+    public IStateStorage mkStore(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context) {
+        try {
+            ZKStateStorageFactory zkfact = new ZKStateStorageFactory();
+            IStateStorage zkState = zkfact.mkStore(config, auth_conf, acls, context);
+            return new PaceMakerStateStorage(new PacemakerClientPool(config), zkState);
+        } catch (Exception e) {
+            throw Utils.wrapInRuntime(e);
+        }
+    }
+}
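
Because ClusterUtils.mkStateStorageImpl reflectively instantiates whatever
class Config.STORM_CLUSTER_STATE_STORE names, enabling Pacemaker is in effect a
configuration switch. A sketch (Utils.readStormConfig is assumed to load the
usual storm.yaml defaults):

    Map conf = Utils.readStormConfig();
    conf.put(Config.STORM_CLUSTER_STATE_STORE,
            "org.apache.storm.cluster.PaceMakerStateStorageFactory");
    IStateStorage store = ClusterUtils.mkStateStorage(conf, conf, null,
            new ClusterStateContext(DaemonType.WORKER));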

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StateStorageFactory.java b/storm-client/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
new file mode 100644
index 0000000..0929750
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.zookeeper.data.ACL;
+
+public interface StateStorageFactory {
+
+    IStateStorage mkStore(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context);
+}
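
A minimal sketch of a custom factory. This one simply delegates to the
ZooKeeper factory while showing how the ClusterStateContext can influence
construction; the ACL choice is illustrative, not a recommendation:

    public class DaemonAwareStorageFactory implements StateStorageFactory {
        @Override
        public IStateStorage mkStore(Map config, Map auth_conf, List<ACL> acls,
                ClusterStateContext context) {
            // illustrative: skip ACLs when the creating daemon is unknown
            List<ACL> effectiveAcls =
                    context.getDaemonType() == DaemonType.UNKNOWN ? null : acls;
            return new ZKStateStorageFactory().mkStore(config, auth_conf, effectiveAcls, context);
        }
    }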

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
new file mode 100644
index 0000000..51caad9
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -0,0 +1,735 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.*;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.storm.callback.ZKStateChangedCallback;
+import org.apache.storm.generated.*;
+import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.Time;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.NoSuchAlgorithmException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class StormClusterStateImpl implements IStormClusterState {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StormClusterStateImpl.class);
+
+    private IStateStorage stateStorage;
+
+    private ConcurrentHashMap<String, Runnable> assignmentInfoCallback;
+    private ConcurrentHashMap<String, Runnable> assignmentInfoWithVersionCallback;
+    private ConcurrentHashMap<String, Runnable> assignmentVersionCallback;
+    private AtomicReference<Runnable> supervisorsCallback;
+    // we want to register a topology directory getChildren callback for all workers of this dir
+    private ConcurrentHashMap<String, Runnable> backPressureCallback;
+    private AtomicReference<Runnable> assignmentsCallback;
+    private ConcurrentHashMap<String, Runnable> stormBaseCallback;
+    private AtomicReference<Runnable> blobstoreCallback;
+    private ConcurrentHashMap<String, Runnable> credentialsCallback;
+    private ConcurrentHashMap<String, Runnable> logConfigCallback;
+
+    private List<ACL> acls;
+    private String stateId;
+    private boolean solo;
+
+    public StormClusterStateImpl(IStateStorage stateStorage, List<ACL> acls, ClusterStateContext context, boolean solo) throws Exception {
+
+        this.stateStorage = stateStorage;
+        this.solo = solo;
+        this.acls = acls;
+
+        assignmentInfoCallback = new ConcurrentHashMap<>();
+        assignmentInfoWithVersionCallback = new ConcurrentHashMap<>();
+        assignmentVersionCallback = new ConcurrentHashMap<>();
+        supervisorsCallback = new AtomicReference<>();
+        backPressureCallback = new ConcurrentHashMap<>();
+        assignmentsCallback = new AtomicReference<>();
+        stormBaseCallback = new ConcurrentHashMap<>();
+        credentialsCallback = new ConcurrentHashMap<>();
+        logConfigCallback = new ConcurrentHashMap<>();
+        blobstoreCallback = new AtomicReference<>();
+
+        stateId = this.stateStorage.register(new ZKStateChangedCallback() {
+
+            public void changed(Watcher.Event.EventType type, String path) {
+                List<String> toks = tokenizePath(path);
+                int size = toks.size();
+                if (size >= 1) {
+                    String root = toks.get(0);
+                    if (root.equals(ClusterUtils.ASSIGNMENTS_ROOT)) {
+                        if (size == 1) {
+                            // set null and get the old value
+                            issueCallback(assignmentsCallback);
+                        } else {
+                            issueMapCallback(assignmentInfoCallback, toks.get(1));
+                            issueMapCallback(assignmentVersionCallback, toks.get(1));
+                            issueMapCallback(assignmentInfoWithVersionCallback, toks.get(1));
+                        }
+
+                    } else if (root.equals(ClusterUtils.SUPERVISORS_ROOT)) {
+                        issueCallback(supervisorsCallback);
+                    } else if (root.equals(ClusterUtils.BLOBSTORE_ROOT)) {
+                        issueCallback(blobstoreCallback);
+                    } else if (root.equals(ClusterUtils.STORMS_ROOT) && size > 1) {
+                        issueMapCallback(stormBaseCallback, toks.get(1));
+                    } else if (root.equals(ClusterUtils.CREDENTIALS_ROOT) && size > 1) {
+                        issueMapCallback(credentialsCallback, toks.get(1));
+                    } else if (root.equals(ClusterUtils.LOGCONFIG_ROOT) && size > 1) {
+                        issueMapCallback(logConfigCallback, toks.get(1));
+                    } else if (root.equals(ClusterUtils.BACKPRESSURE_ROOT) && size > 1) {
+                        issueMapCallback(backPressureCallback, toks.get(1));
+                    } else {
+                        LOG.error("{} Unknown callback for subtree {}", new RuntimeException("Unknown callback for this path"), path);
+                        Runtime.getRuntime().exit(30);
+                    }
+
+                }
+            }
+
+        });
+
+        String[] pathlist = { ClusterUtils.ASSIGNMENTS_SUBTREE, 
+                              ClusterUtils.STORMS_SUBTREE, 
+                              ClusterUtils.SUPERVISORS_SUBTREE, 
+                              ClusterUtils.WORKERBEATS_SUBTREE,
+                              ClusterUtils.ERRORS_SUBTREE, 
+                              ClusterUtils.BLOBSTORE_SUBTREE, 
+                              ClusterUtils.NIMBUSES_SUBTREE, 
+                              ClusterUtils.LOGCONFIG_SUBTREE,
+                              ClusterUtils.BACKPRESSURE_SUBTREE };
+        for (String path : pathlist) {
+            this.stateStorage.mkdirs(path, acls);
+        }
+
+    }
+
+    protected void issueCallback(AtomicReference<Runnable> cb) {
+        Runnable callback = cb.getAndSet(null);
+        if (callback != null)
+            callback.run();
+    }
+
+    protected void issueMapCallback(ConcurrentHashMap<String, Runnable> callbackConcurrentHashMap, String key) {
+        Runnable callback = callbackConcurrentHashMap.remove(key);
+        if (callback != null)
+            callback.run();
+    }
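+    // Note: both callback flavors are one-shot. issueCallback swaps the reference
+    // to null and issueMapCallback removes the map entry before running it, so a
+    // caller that wants continuous notification has to re-register on every
+    // invocation. A minimal sketch of that pattern (illustrative only, assuming a
+    // clusterState reference to this instance):
+    //
+    //   Runnable watch = new Runnable() {
+    //       public void run() {
+    //           clusterState.assignments(this); // re-arm the watch
+    //       }
+    //   };
+    //   clusterState.assignments(watch);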
+
+    @Override
+    public List<String> assignments(Runnable callback) {
+        if (callback != null) {
+            assignmentsCallback.set(callback);
+        }
+        return stateStorage.get_children(ClusterUtils.ASSIGNMENTS_SUBTREE, callback != null);
+    }
+
+    @Override
+    public Assignment assignmentInfo(String stormId, Runnable callback) {
+        if (callback != null) {
+            assignmentInfoCallback.put(stormId, callback);
+        }
+        byte[] serialized = stateStorage.get_data(ClusterUtils.assignmentPath(stormId), callback != null);
+        return ClusterUtils.maybeDeserialize(serialized, Assignment.class);
+    }
+
+    @Override
+    public VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback) {
+        if (callback != null) {
+            assignmentInfoWithVersionCallback.put(stormId, callback);
+        }
+        Assignment assignment = null;
+        Integer version = 0;
+        VersionedData<byte[]> dataWithVersion = stateStorage.get_data_with_version(ClusterUtils.assignmentPath(stormId), callback != null);
+        if (dataWithVersion != null) {
+            assignment = ClusterUtils.maybeDeserialize(dataWithVersion.getData(), Assignment.class);
+            version = dataWithVersion.getVersion();
+        }
+        return new VersionedData<Assignment>(version, assignment);
+    }
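+    // Note: assignmentInfoWithVersion never returns null; when the znode is absent
+    // it yields version 0 with a null Assignment, so callers can always compare
+    // versions without a null check on the wrapper itself.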
+
+    @Override
+    public Integer assignmentVersion(String stormId, Runnable callback) throws Exception {
+        if (callback != null) {
+            assignmentVersionCallback.put(stormId, callback);
+        }
+        return stateStorage.get_version(ClusterUtils.assignmentPath(stormId), callback != null);
+    }
+
+    // blobstore state
+    @Override
+    public List<String> blobstoreInfo(String blobKey) {
+        String path = ClusterUtils.blobstorePath(blobKey);
+        stateStorage.sync_path(path);
+        return stateStorage.get_children(path, false);
+    }
+
+    @Override
+    public List<NimbusSummary> nimbuses() {
+        List<NimbusSummary> nimbusSummaries = new ArrayList<>();
+        List<String> nimbusIds = stateStorage.get_children(ClusterUtils.NIMBUSES_SUBTREE, false);
+        for (String nimbusId : nimbusIds) {
+            byte[] serialized = stateStorage.get_data(ClusterUtils.nimbusPath(nimbusId), false);
+            // serialized can be null because of a race: a nimbus node may have been
+            // removed from zk (e.g. after a reconnect) between the get_children call
+            // above and this read
+            if (serialized != null) {
+                NimbusSummary nimbusSummary = ClusterUtils.maybeDeserialize(serialized, NimbusSummary.class);
+                nimbusSummaries.add(nimbusSummary);
+            }
+        }
+        return nimbusSummaries;
+    }
+
+    @Override
+    public void addNimbusHost(final String nimbusId, final NimbusSummary nimbusSummary) {
+        // explicitly delete the ephemeral node to ensure this session creates the entry.
+        stateStorage.delete_node(ClusterUtils.nimbusPath(nimbusId));
+        stateStorage.add_listener(new ConnectionStateListener() {
+            @Override
+            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
+                LOG.info("Connection state listener invoked, zookeeper connection state has changed to {}", connectionState);
+                if (connectionState.equals(ConnectionState.RECONNECTED)) {
+                    LOG.info("Connection state has changed to reconnected so setting nimbuses entry one more time");
+                    // explicitly delete the ephemeral node to ensure this session creates the entry.
+                    stateStorage.delete_node(ClusterUtils.nimbusPath(nimbusId));
+                    stateStorage.set_ephemeral_node(ClusterUtils.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), acls);
+                }
+
+            }
+        });
+
+        stateStorage.set_ephemeral_node(ClusterUtils.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), acls);
+    }
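+    // An ephemeral znode is tied to the ZooKeeper session that created it, so if
+    // the session expired during a disconnect the nimbuses entry is gone when we
+    // come back. Deleting and re-creating on RECONNECTED covers both that case and
+    // the case where the original session (and entry) survived.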
+
+    @Override
+    public List<String> activeStorms() {
+        return stateStorage.get_children(ClusterUtils.STORMS_SUBTREE, false);
+    }
+
+    @Override
+    public StormBase stormBase(String stormId, Runnable callback) {
+        if (callback != null) {
+            stormBaseCallback.put(stormId, callback);
+        }
+        return ClusterUtils.maybeDeserialize(stateStorage.get_data(ClusterUtils.stormPath(stormId), callback != null), StormBase.class);
+    }
+
+    @Override
+    public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port) {
+        byte[] bytes = stateStorage.get_worker_hb(ClusterUtils.workerbeatPath(stormId, node, port), false);
+        return ClusterUtils.maybeDeserialize(bytes, ClusterWorkerHeartbeat.class);
+
+    }
+
+    @Override
+    public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo) {
+        List<ProfileRequest> requests = new ArrayList<>();
+        List<ProfileRequest> profileRequests = getTopologyProfileRequests(stormId);
+        for (ProfileRequest profileRequest : profileRequests) {
+            NodeInfo nodeInfo1 = profileRequest.get_nodeInfo();
+            if (nodeInfo1.equals(nodeInfo))
+                requests.add(profileRequest);
+        }
+        return requests;
+    }
+
+    @Override
+    public List<ProfileRequest> getTopologyProfileRequests(String stormId) {
+        List<ProfileRequest> profileRequests = new ArrayList<>();
+        String path = ClusterUtils.profilerConfigPath(stormId);
+        if (stateStorage.node_exists(path, false)) {
+            List<String> strs = stateStorage.get_children(path, false);
+            for (String str : strs) {
+                String childPath = path + ClusterUtils.ZK_SEPERATOR + str;
+                byte[] raw = stateStorage.get_data(childPath, false);
+                ProfileRequest request = ClusterUtils.maybeDeserialize(raw, ProfileRequest.class);
+                if (request != null)
+                    profileRequests.add(request);
+            }
+        }
+        return profileRequests;
+    }
+
+    @Override
+    public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest) {
+        ProfileAction profileAction = profileRequest.get_action();
+        String host = profileRequest.get_nodeInfo().get_node();
+        Long port = profileRequest.get_nodeInfo().get_port_iterator().next();
+        String path = ClusterUtils.profilerConfigPath(stormId, host, port, profileAction);
+        stateStorage.set_data(path, Utils.serialize(profileRequest), acls);
+    }
+
+    @Override
+    public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest) {
+        ProfileAction profileAction = profileRequest.get_action();
+        String host = profileRequest.get_nodeInfo().get_node();
+        Long port = profileRequest.get_nodeInfo().get_port_iterator().next();
+        String path = ClusterUtils.profilerConfigPath(stormId, host, port, profileAction);
+        stateStorage.delete_node(path);
+    }
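+    // setWorkerProfileRequest and deleteTopologyProfileRequests build the same
+    // (host, port, action) path, so a profiler request is cleared by exactly the
+    // triple that created it. The precise layout is owned by
+    // ClusterUtils.profilerConfigPath; it is assumed here to look roughly like
+    // /profilerconfigs/{storm-id}/{host}_{port}_{action}.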
+
+    /**
+     * Takes the executor->node+port mapping in explicitly so that a long-dead worker
+     * with a skewed clock cannot override all the timestamps: only heartbeats with an
+     * assigned node+port are checked, and only the executors actually assigned to that
+     * worker are read from each heartbeat.
+     *
+     * @param stormId topology id
+     * @param executorNodePort mapping from executor [start-task, end-task] to its assigned worker
+     * @return beats for the assigned executors, keyed by executor info
+     */
+    @Override
+    public Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort) {
+        Map<ExecutorInfo, ExecutorBeat> executorWhbs = new HashMap<>();
+
+        Map<NodeInfo, List<List<Long>>> nodePortExecutors = Utils.reverseMap(executorNodePort);
+
+        for (Map.Entry<NodeInfo, List<List<Long>>> entry : nodePortExecutors.entrySet()) {
+
+            String node = entry.getKey().get_node();
+            Long port = entry.getKey().get_port_iterator().next();
+            ClusterWorkerHeartbeat whb = getWorkerHeartbeat(stormId, node, port);
+            List<ExecutorInfo> executorInfoList = new ArrayList<>();
+            for (List<Long> list : entry.getValue()) {
+                executorInfoList.add(new ExecutorInfo(list.get(0).intValue(), list.get(list.size() - 1).intValue()));
+            }
+            if (whb != null)
+                executorWhbs.putAll(ClusterUtils.convertExecutorBeats(executorInfoList, whb));
+        }
+        return executorWhbs;
+    }
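+    // Each executor in executorNodePort is encoded as a [start-task-id, end-task-id]
+    // list; executorBeats collapses it into an ExecutorInfo, e.g. a [3, 5] entry
+    // becomes new ExecutorInfo(3, 5) before its beats are looked up.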
+
+    @Override
+    public List<String> supervisors(Runnable callback) {
+        if (callback != null) {
+            supervisorsCallback.set(callback);
+        }
+        return stateStorage.get_children(ClusterUtils.SUPERVISORS_SUBTREE, callback != null);
+    }
+
+    @Override
+    public SupervisorInfo supervisorInfo(String supervisorId) {
+        String path = ClusterUtils.supervisorPath(supervisorId);
+        return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, false), SupervisorInfo.class);
+    }
+
+    @Override
+    public void setupHeatbeats(String stormId) {
+        stateStorage.mkdirs(ClusterUtils.workerbeatStormRoot(stormId), acls);
+    }
+
+    @Override
+    public void teardownHeartbeats(String stormId) {
+        try {
+            stateStorage.delete_worker_hb(ClusterUtils.workerbeatStormRoot(stormId));
+        } catch (Exception e) {
+            if (Utils.exceptionCauseIsInstanceOf(KeeperException.class, e)) {
+                // do nothing
+                LOG.warn("Could not teardown heartbeats for {}.", stormId);
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    @Override
+    public void teardownTopologyErrors(String stormId) {
+        try {
+            stateStorage.delete_node(ClusterUtils.errorStormRoot(stormId));
+        } catch (Exception e) {
+            if (Utils.exceptionCauseIsInstanceOf(KeeperException.class, e)) {
+                // do nothing
+                LOG.warn("Could not teardown errors for {}.", stormId);
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    @Override
+    public List<String> heartbeatStorms() {
+        return stateStorage.get_worker_hb_children(ClusterUtils.WORKERBEATS_SUBTREE, false);
+    }
+
+    @Override
+    public List<String> errorTopologies() {
+        return stateStorage.get_children(ClusterUtils.ERRORS_SUBTREE, false);
+    }
+
+    @Override
+    public List<String> backpressureTopologies() {
+        return stateStorage.get_children(ClusterUtils.BACKPRESSURE_SUBTREE, false);
+    }
+
+    @Override
+    public void setTopologyLogConfig(String stormId, LogConfig logConfig) {
+        stateStorage.set_data(ClusterUtils.logConfigPath(stormId), Utils.serialize(logConfig), acls);
+    }
+
+    @Override
+    public LogConfig topologyLogConfig(String stormId, Runnable cb) {
+        if (cb != null){
+            logConfigCallback.put(stormId, cb);
+        }
+        String path = ClusterUtils.logConfigPath(stormId);
+        return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, cb != null), LogConfig.class);
+    }
+
+    @Override
+    public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info) {
+        if (info != null) {
+            String path = ClusterUtils.workerbeatPath(stormId, node, port);
+            stateStorage.set_worker_hb(path, Utils.serialize(info), acls);
+        }
+    }
+
+    @Override
+    public void removeWorkerHeartbeat(String stormId, String node, Long port) {
+        String path = ClusterUtils.workerbeatPath(stormId, node, port);
+        stateStorage.delete_worker_hb(path);
+    }
+
+    @Override
+    public void supervisorHeartbeat(String supervisorId, SupervisorInfo info) {
+        String path = ClusterUtils.supervisorPath(supervisorId);
+        stateStorage.set_ephemeral_node(path, Utils.serialize(info), acls);
+    }
+
+    /**
+     * Creates or removes the ephemeral backpressure znode for a worker:
+     * if the znode exists and on is false, delete it; if it exists and on is true,
+     * do nothing; if it does not exist and on is true, create it; if it does not
+     * exist and on is false, do nothing.
+     *
+     * @param stormId topology id
+     * @param node supervisor node id
+     * @param port worker port
+     * @param on true if the worker is signalling backpressure
+     */
+    @Override
+    public void workerBackpressure(String stormId, String node, Long port, boolean on) {
+        String path = ClusterUtils.backpressurePath(stormId, node, port);
+        boolean existed = stateStorage.node_exists(path, false);
+        if (existed) {
+            if (!on) {
+                stateStorage.delete_node(path);
+            }
+        } else {
+            if (on) {
+                stateStorage.set_ephemeral_node(path, null, acls);
+            }
+        }
+    }
+
+    /**
+     * Checks whether a topology is in throttle-on status: if the backpressure/storm-id
+     * dir is not empty, the topology has throttle-on, otherwise throttle-off.
+     *
+     * @param stormId topology id
+     * @param callback one-shot callback registered against the dir's children
+     * @return true if the topology is currently throttled
+     */
+    @Override
+    public boolean topologyBackpressure(String stormId, Runnable callback) {
+        if (callback != null) {
+            backPressureCallback.put(stormId, callback);
+        }
+        String path = ClusterUtils.backpressureStormRoot(stormId);
+        List<String> children;
+        if (stateStorage.node_exists(path, false)) {
+            children = stateStorage.get_children(path, callback != null);
+        } else {
+            children = new ArrayList<>();
+        }
+        return !children.isEmpty();
+
+    }
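+    // As with the assignment callbacks, the backpressure callback is one-shot:
+    // issueMapCallback removes it once fired, so a poller must pass a callback on
+    // each call to keep watching the topology's backpressure dir.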
+
+    @Override
+    public void setupBackpressure(String stormId) {
+        stateStorage.mkdirs(ClusterUtils.backpressureStormRoot(stormId), acls);
+    }
+
+    @Override
+    public void removeBackpressure(String stormId) {
+        try {
+            stateStorage.delete_node(ClusterUtils.backpressureStormRoot(stormId));
+        } catch (Exception e) {
+            if (Utils.exceptionCauseIsInstanceOf(KeeperException.class, e)) {
+                // do nothing
+                LOG.warn("Could not teardown backpressure node for {}.", stormId);
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    @Override
+    public void removeWorkerBackpressure(String stormId, String node, Long port) {
+        String path = ClusterUtils.backpressurePath(stormId, node, port);
+        boolean existed = stateStorage.node_exists(path, false);
+        if (existed) {
+            stateStorage.delete_node(path);
+        }
+    }
+
+    @Override
+    public void activateStorm(String stormId, StormBase stormBase) {
+        String path = ClusterUtils.stormPath(stormId);
+        stateStorage.set_data(path, Utils.serialize(stormBase), acls);
+    }
+
+    /**
+     * Merges newElems with the currently stored StormBase before writing it back.
+     * The maps and sets supplied by Clojure callers may be APersistentMap/APersistentSet,
+     * which do not support mutation, so all merging copies into fresh collections.
+     *
+     * @param stormId topology id
+     * @param newElems partial StormBase carrying the fields to update
+     */
+    @Override
+    public void updateStorm(String stormId, StormBase newElems) {
+
+        StormBase stormBase = stormBase(stormId, null);
+        if (stormBase.get_component_executors() != null) {
+
+            Map<String, Integer> newComponentExecutors = new HashMap<>();
+            Map<String, Integer> componentExecutors = newElems.get_component_executors();
+            // componentExecutors may be an APersistentMap, which does not support "put"
+            for (Map.Entry<String, Integer> entry : componentExecutors.entrySet()) {
+                newComponentExecutors.put(entry.getKey(), entry.getValue());
+            }
+            for (Map.Entry<String, Integer> entry : stormBase.get_component_executors().entrySet()) {
+                if (!componentExecutors.containsKey(entry.getKey())) {
+                    newComponentExecutors.put(entry.getKey(), entry.getValue());
+                }
+            }
+            if (!newComponentExecutors.isEmpty()) {
+                newElems.set_component_executors(newComponentExecutors);
+            }
+        }
+
+        Map<String, DebugOptions> componentDebug = new HashMap<>();
+        Map<String, DebugOptions> oldComponentDebug = stormBase.get_component_debug();
+
+        Map<String, DebugOptions> newComponentDebug = newElems.get_component_debug();
+        // oldComponentDebug.keySet() / newComponentDebug.keySet() may be APersistentSet, which does not support addAll
+        Set<String> debugOptionsKeys = new HashSet<>();
+        debugOptionsKeys.addAll(oldComponentDebug.keySet());
+        debugOptionsKeys.addAll(newComponentDebug.keySet());
+        for (String key : debugOptionsKeys) {
+            boolean enable = false;
+            double samplingpct = 0;
+            if (oldComponentDebug.containsKey(key)) {
+                enable = oldComponentDebug.get(key).is_enable();
+                samplingpct = oldComponentDebug.get(key).get_samplingpct();
+            }
+            if (newComponentDebug.containsKey(key)) {
+                enable = newComponentDebug.get(key).is_enable();
+                samplingpct += newComponentDebug.get(key).get_samplingpct();
+            }
+            DebugOptions debugOptions = new DebugOptions();
+            debugOptions.set_enable(enable);
+            debugOptions.set_samplingpct(samplingpct);
+            componentDebug.put(key, debugOptions);
+        }
+        if (!componentDebug.isEmpty()) {
+            newElems.set_component_debug(componentDebug);
+        }
+
+        if (StringUtils.isBlank(newElems.get_name())) {
+            newElems.set_name(stormBase.get_name());
+        }
+        if (newElems.get_status() == null) {
+            newElems.set_status(stormBase.get_status());
+        }
+        if (newElems.get_num_workers() == 0) {
+            newElems.set_num_workers(stormBase.get_num_workers());
+        }
+        if (newElems.get_launch_time_secs() == 0) {
+            newElems.set_launch_time_secs(stormBase.get_launch_time_secs());
+        }
+        if (StringUtils.isBlank(newElems.get_owner())) {
+            newElems.set_owner(stormBase.get_owner());
+        }
+        if (newElems.get_topology_action_options() == null) {
+            newElems.set_topology_action_options(stormBase.get_topology_action_options());
+        }
+        stateStorage.set_data(ClusterUtils.stormPath(stormId), Utils.serialize(newElems), acls);
+    }
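+    // Merge semantics by example (hypothetical values): if the stored StormBase has
+    // component executors {spout=4, bolt=2} and newElems carries {bolt=3}, the
+    // written StormBase ends up with {spout=4, bolt=3}: explicit updates win and
+    // untouched components are carried over. Debug sampling percentages, by
+    // contrast, are summed when both sides define options for the same component.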
+
+    @Override
+    public void removeStormBase(String stormId) {
+        stateStorage.delete_node(ClusterUtils.stormPath(stormId));
+    }
+
+    @Override
+    public void setAssignment(String stormId, Assignment info) {
+        stateStorage.set_data(ClusterUtils.assignmentPath(stormId), Utils.serialize(info), acls);
+    }
+
+    @Override
+    public void setupBlobstore(String key, NimbusInfo nimbusInfo, Integer versionInfo) {
+        String path = ClusterUtils.blobstorePath(key) + ClusterUtils.ZK_SEPERATOR + nimbusInfo.toHostPortString() + "-" + versionInfo;
+        LOG.info("set-path: {}", path);
+        stateStorage.mkdirs(ClusterUtils.blobstorePath(key), acls);
+        stateStorage.delete_node_blobstore(ClusterUtils.blobstorePath(key), nimbusInfo.toHostPortString());
+        stateStorage.set_ephemeral_node(path, null, acls);
+    }
+
+    @Override
+    public List<String> activeKeys() {
+        return stateStorage.get_children(ClusterUtils.BLOBSTORE_SUBTREE, false);
+    }
+
+    // blobstore state
+    @Override
+    public List<String> blobstore(Runnable callback) {
+        if (callback != null) {
+            blobstoreCallback.set(callback);
+        }
+        stateStorage.sync_path(ClusterUtils.BLOBSTORE_SUBTREE);
+        return stateStorage.get_children(ClusterUtils.BLOBSTORE_SUBTREE, callback != null);
+
+    }
+
+    @Override
+    public void removeStorm(String stormId) {
+        stateStorage.delete_node(ClusterUtils.assignmentPath(stormId));
+        stateStorage.delete_node(ClusterUtils.credentialsPath(stormId));
+        stateStorage.delete_node(ClusterUtils.logConfigPath(stormId));
+        stateStorage.delete_node(ClusterUtils.profilerConfigPath(stormId));
+        removeStormBase(stormId);
+    }
+
+    @Override
+    public void removeBlobstoreKey(String blobKey) {
+        LOG.debug("remove key {}", blobKey);
+        stateStorage.delete_node(ClusterUtils.blobstorePath(blobKey));
+    }
+
+    @Override
+    public void removeKeyVersion(String blobKey) {
+        stateStorage.delete_node(ClusterUtils.blobstoreMaxKeySequenceNumberPath(blobKey));
+    }
+
+    @Override
+    public void reportError(String stormId, String componentId, String node, Long port, Throwable error) {
+
+        String path = ClusterUtils.errorPath(stormId, componentId);
+        String lastErrorPath = ClusterUtils.lastErrorPath(stormId, componentId);
+        ErrorInfo errorInfo = new ErrorInfo(ClusterUtils.stringifyError(error), Time.currentTimeSecs());
+        errorInfo.set_host(node);
+        errorInfo.set_port(port.intValue());
+        byte[] serData = Utils.serialize(errorInfo);
+        stateStorage.mkdirs(path, acls);
+        stateStorage.create_sequential(path + ClusterUtils.ZK_SEPERATOR + "e", serData, acls);
+        stateStorage.set_data(lastErrorPath, serData, acls);
+        List<String> children = stateStorage.get_children(path, false);
+
+        Collections.sort(children, new Comparator<String>() {
+            public int compare(String arg0, String arg1) {
+                return Long.compare(Long.parseLong(arg0.substring(1)), Long.parseLong(arg1.substring(1)));
+            }
+        });
+
+        while (children.size() > 10) {
+            stateStorage.delete_node(path + ClusterUtils.ZK_SEPERATOR + children.remove(0));
+        }
+    }
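+    // Only the 10 most recent errors per component are retained: children are
+    // sequential znodes named e<sequence>, sorted by sequence number above, and the
+    // oldest are deleted until at most 10 remain.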
+
+    @Override
+    public List<ErrorInfo> errors(String stormId, String componentId) {
+        List<ErrorInfo> errorInfos = new ArrayList<>();
+        String path = ClusterUtils.errorPath(stormId, componentId);
+        if (stateStorage.node_exists(path, false)) {
+            List<String> children = stateStorage.get_children(path, false);
+            for (String child : children) {
+                String childPath = path + ClusterUtils.ZK_SEPERATOR + child;
+                ErrorInfo errorInfo = ClusterUtils.maybeDeserialize(stateStorage.get_data(childPath, false), ErrorInfo.class);
+                if (errorInfo != null)
+                    errorInfos.add(errorInfo);
+            }
+        }
+        Collections.sort(errorInfos, new Comparator<ErrorInfo>() {
+            public int compare(ErrorInfo arg0, ErrorInfo arg1) {
+                return Integer.compare(arg1.get_error_time_secs(), arg0.get_error_time_secs());
+            }
+        });
+
+        return errorInfos;
+    }
+
+    @Override
+    public ErrorInfo lastError(String stormId, String componentId) {
+
+        String path = ClusterUtils.lastErrorPath(stormId, componentId);
+        if (stateStorage.node_exists(path, false)) {
+            return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, false), ErrorInfo.class);
+        }
+
+        return null;
+    }
+
+    @Override
+    public void setCredentials(String stormId, Credentials creds, Map<String, Object> topoConf) throws NoSuchAlgorithmException {
+        List<ACL> aclList = ClusterUtils.mkTopoOnlyAcls(topoConf);
+        String path = ClusterUtils.credentialsPath(stormId);
+        stateStorage.set_data(path, Utils.serialize(creds), aclList);
+
+    }
+
+    @Override
+    public Credentials credentials(String stormId, Runnable callback) {
+        if (callback != null) {
+            credentialsCallback.put(stormId, callback);
+        }
+        String path = ClusterUtils.credentialsPath(stormId);
+        return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, callback != null), Credentials.class);
+
+    }
+
+    @Override
+    public void disconnect() {
+        stateStorage.unregister(stateId);
+        if (solo)
+            stateStorage.close();
+    }
+
+    private List<String> tokenizePath(String path) {
+        String[] toks = path.split("/");
+        List<String> rtn = new ArrayList<>();
+        for (String str : toks) {
+            if (!str.isEmpty()) {
+                rtn.add(str);
+            }
+        }
+        return rtn;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/VersionedData.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/VersionedData.java b/storm-client/src/jvm/org/apache/storm/cluster/VersionedData.java
new file mode 100644
index 0000000..3de2a88
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/VersionedData.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+public class VersionedData<D> {
+    private final int version;
+    private final D data;
+    
+    public VersionedData(int version, D data) {
+        this.version = version;
+        this.data = data;
+    }
+    
+    public int getVersion() {
+        return version;
+    }
+    
+    public D getData() {
+        return data;
+    }
+}