You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/02/24 17:17:18 UTC

[08/27] storm git commit: update class hierarchy about cluster

http://git-wip-us.apache.org/repos/asf/storm/blob/55b86ca4/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
new file mode 100644
index 0000000..cd2bc4a
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -0,0 +1,664 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import clojure.lang.*;
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.*;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.storm.callback.ZKStateChangedCallback;
+import org.apache.storm.generated.*;
+import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.zookeeper.Zookeeper;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.NoSuchAlgorithmException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class StormClusterStateImpl implements StormClusterState {
+
+    private static Logger LOG = LoggerFactory.getLogger(StormClusterStateImpl.class);
+
+    private StateStorage stateStorage;
+
+    private ConcurrentHashMap<String, IFn> assignmentInfoCallback;
+    private ConcurrentHashMap<String, IFn> assignmentInfoWithVersionCallback;
+    private ConcurrentHashMap<String, IFn> assignmentVersionCallback;
+    private AtomicReference<IFn> supervisorsCallback;
+    // we want to reigister a topo directory getChildren callback for all workers of this dir
+    private ConcurrentHashMap<String, IFn> backPressureCallback;
+    private AtomicReference<IFn> assignmentsCallback;
+    private ConcurrentHashMap<String, IFn> stormBaseCallback;
+    private AtomicReference<IFn> blobstoreCallback;
+    private ConcurrentHashMap<String, IFn> credentialsCallback;
+    private ConcurrentHashMap<String, IFn> logConfigCallback;
+
+    private List<ACL> acls;
+    private String stateId;
+    private boolean solo;
+
+    public StormClusterStateImpl(StateStorage StateStorage, List<ACL> acls, ClusterStateContext context, boolean solo) throws Exception {
+
+        this.stateStorage = StateStorage;
+        this.solo = solo;
+
+        assignmentInfoCallback = new ConcurrentHashMap<>();
+        assignmentInfoWithVersionCallback = new ConcurrentHashMap<>();
+        assignmentVersionCallback = new ConcurrentHashMap<>();
+        supervisorsCallback = new AtomicReference<>();
+        backPressureCallback = new ConcurrentHashMap<>();
+        assignmentsCallback = new AtomicReference<>();
+        stormBaseCallback = new ConcurrentHashMap<>();
+        credentialsCallback = new ConcurrentHashMap<>();
+        logConfigCallback = new ConcurrentHashMap<>();
+        blobstoreCallback = new AtomicReference<>();
+
+        stateId = this.stateStorage.register(new ZKStateChangedCallback() {
+
+            public void changed(Watcher.Event.EventType type, String path) {
+                List<String> toks = Zookeeper.tokenizePath(path);
+                int size = toks.size();
+                if (size >= 1) {
+                    String params = null;
+                    String root = toks.get(0);
+                    IFn fn = null;
+                    if (root.equals(ClusterUtils.ASSIGNMENTS_ROOT)) {
+                        if (size == 1) {
+                            // set null and get the old value
+                            issueCallback(assignmentsCallback);
+                        } else {
+                            issueMapCallback(assignmentInfoCallback, toks.get(1));
+                            issueMapCallback(assignmentVersionCallback, toks.get(1));
+                            issueMapCallback(assignmentInfoWithVersionCallback, toks.get(1));
+                        }
+
+                    } else if (root.equals(ClusterUtils.SUPERVISORS_ROOT)) {
+                        issueCallback(supervisorsCallback);
+                    } else if (root.equals(ClusterUtils.BLOBSTORE_ROOT)) {
+                        issueCallback(blobstoreCallback);
+                    } else if (root.equals(ClusterUtils.STORMS_ROOT) && size > 1) {
+                        issueMapCallback(stormBaseCallback, toks.get(1));
+                    } else if (root.equals(ClusterUtils.CREDENTIALS_ROOT) && size > 1) {
+                        issueMapCallback(credentialsCallback, toks.get(1));
+                    } else if (root.equals(ClusterUtils.LOGCONFIG_ROOT) && size > 1) {
+                        issueMapCallback(logConfigCallback, toks.get(1));
+                    } else if (root.equals(ClusterUtils.BACKPRESSURE_ROOT) && size > 1) {
+                        issueMapCallback(logConfigCallback, toks.get(1));
+                    } else {
+                        LOG.error("{} Unknown callback for subtree {}", new RuntimeException("Unknown callback for this path"), path);
+                        Runtime.getRuntime().exit(30);
+                    }
+
+                }
+
+                return;
+            }
+
+        });
+
+        String[] pathlist = { ClusterUtils.ASSIGNMENTS_SUBTREE, ClusterUtils.STORMS_SUBTREE, ClusterUtils.SUPERVISORS_SUBTREE, ClusterUtils.WORKERBEATS_SUBTREE,
+                ClusterUtils.ERRORS_SUBTREE, ClusterUtils.BLOBSTORE_SUBTREE, ClusterUtils.NIMBUSES_SUBTREE, ClusterUtils.LOGCONFIG_SUBTREE };
+        for (String path : pathlist) {
+            this.stateStorage.mkdirs(path, acls);
+        }
+
+    }
+
+    protected void issueCallback(AtomicReference<IFn> cb) {
+        IFn callback = cb.getAndSet(null);
+        if (callback != null)
+            callback.invoke();
+    }
+
+    protected void issueMapCallback(ConcurrentHashMap<String, IFn> callbackConcurrentHashMap, String key) {
+        IFn callback = callbackConcurrentHashMap.remove(key);
+        if (callback != null)
+            callback.invoke();
+    }
+
+    @Override
+    public List<String> assignments(IFn callback) {
+        if (callback != null) {
+            assignmentsCallback.set(callback);
+        }
+        return stateStorage.get_children(ClusterUtils.ASSIGNMENTS_SUBTREE, callback != null);
+    }
+
+    @Override
+    public Assignment assignmentInfo(String stormId, IFn callback) {
+        if (callback != null) {
+            assignmentInfoCallback.put(stormId, callback);
+        }
+        byte[] serialized = stateStorage.get_data(ClusterUtils.assignmentPath(stormId), callback != null);
+        return ClusterUtils.maybeDeserialize(serialized, Assignment.class);
+    }
+
+    @Override
+    public APersistentMap assignmentInfoWithVersion(String stormId, IFn callback) {
+        if (callback != null) {
+            assignmentInfoWithVersionCallback.put(stormId, callback);
+        }
+        Assignment assignment = null;
+        Integer version = 0;
+        APersistentMap aPersistentMap = stateStorage.get_data_with_version(ClusterUtils.assignmentPath(stormId), callback != null);
+        if (aPersistentMap != null) {
+            assignment = ClusterUtils.maybeDeserialize((byte[]) aPersistentMap.get(RT.keyword(null, "data")), Assignment.class);
+            version = (Integer) aPersistentMap.get(RT.keyword(null, "version"));
+        }
+        APersistentMap map = new PersistentArrayMap(new Object[] { RT.keyword(null, "data"), assignment, RT.keyword(null, "version"), version });
+        return map;
+    }
+
+    @Override
+    public Integer assignmentVersion(String stormId, IFn callback) throws Exception {
+        if (callback != null) {
+            assignmentVersionCallback.put(stormId, callback);
+        }
+        return stateStorage.get_version(ClusterUtils.assignmentPath(stormId), callback != null);
+    }
+
+    // blobstore state
+    @Override
+    public List<String> blobstoreInfo(String blobKey) {
+        String path = ClusterUtils.blobstorePath(blobKey);
+        stateStorage.sync_path(path);
+        return stateStorage.get_children(path, false);
+    }
+
+    @Override
+    public List nimbuses() {
+        List<NimbusSummary> nimbusSummaries = new ArrayList<>();
+        List<String> nimbusIds = stateStorage.get_children(ClusterUtils.NIMBUSES_SUBTREE, false);
+        for (String nimbusId : nimbusIds) {
+            byte[] serialized = stateStorage.get_data(ClusterUtils.nimbusPath(nimbusId), false);
+            NimbusSummary nimbusSummary = ClusterUtils.maybeDeserialize(serialized, NimbusSummary.class);
+            nimbusSummaries.add(nimbusSummary);
+        }
+        return nimbusSummaries;
+    }
+
+    @Override
+    public void addNimbusHost(final String nimbusId, final NimbusSummary nimbusSummary) {
+        // explicit delete for ephmeral node to ensure this session creates the entry.
+        stateStorage.delete_node(ClusterUtils.nimbusPath(nimbusId));
+        stateStorage.add_listener(new ConnectionStateListener() {
+            @Override
+            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
+                LOG.info("Connection state listener invoked, zookeeper connection state has changed to {}", connectionState);
+                if (connectionState.equals(ConnectionState.RECONNECTED)) {
+                    LOG.info("Connection state has changed to reconnected so setting nimbuses entry one more time");
+                    stateStorage.set_ephemeral_node(ClusterUtils.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), acls);
+                }
+
+            }
+        });
+
+        stateStorage.set_ephemeral_node(ClusterUtils.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), acls);
+    }
+
+    @Override
+    public List<String> activeStorms() {
+        return stateStorage.get_children(ClusterUtils.STORMS_SUBTREE, false);
+    }
+
+    @Override
+    public StormBase stormBase(String stormId, IFn callback) {
+        if (callback != null) {
+            stormBaseCallback.put(stormId, callback);
+        }
+        return ClusterUtils.maybeDeserialize(stateStorage.get_data(ClusterUtils.stormPath(stormId), callback != null), StormBase.class);
+    }
+
+    @Override
+    public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port) {
+        byte[] bytes = stateStorage.get_worker_hb(ClusterUtils.workerbeatPath(stormId, node, port), false);
+        return ClusterUtils.maybeDeserialize(bytes, ClusterWorkerHeartbeat.class);
+
+    }
+
+    @Override
+    public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo, boolean isThrift) {
+        List<ProfileRequest> requests = new ArrayList<>();
+        List<ProfileRequest> profileRequests = getTopologyProfileRequests(stormId, isThrift);
+        for (ProfileRequest profileRequest : profileRequests) {
+            NodeInfo nodeInfo1 = profileRequest.get_nodeInfo();
+            if (nodeInfo1.equals(nodeInfo))
+                requests.add(profileRequest);
+        }
+        return requests;
+    }
+
+    @Override
+    public List<ProfileRequest> getTopologyProfileRequests(String stormId, boolean isThrift) {
+        List<ProfileRequest> profileRequests = new ArrayList<>();
+        String path = ClusterUtils.profilerConfigPath(stormId);
+        if (stateStorage.node_exists(path, false)) {
+            List<String> strs = stateStorage.get_children(path, false);
+            for (String str : strs) {
+                String childPath = path + ClusterUtils.ZK_SEPERATOR + str;
+                byte[] raw = stateStorage.get_data(childPath, false);
+                ProfileRequest request = ClusterUtils.maybeDeserialize(raw, ProfileRequest.class);
+                if (request != null)
+                    profileRequests.add(request);
+            }
+        }
+        return profileRequests;
+    }
+
+    @Override
+    public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest) {
+        ProfileAction profileAction = profileRequest.get_action();
+        String host = profileRequest.get_nodeInfo().get_node();
+        Long port = profileRequest.get_nodeInfo().get_port_iterator().next();
+        String path = ClusterUtils.profilerConfigPath(stormId, host, port, profileAction);
+        stateStorage.set_data(path, Utils.serialize(profileRequest), acls);
+    }
+
+    @Override
+    public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest) {
+        ProfileAction profileAction = profileRequest.get_action();
+        String host = profileRequest.get_nodeInfo().get_node();
+        Long port = profileRequest.get_nodeInfo().get_port_iterator().next();
+        String path = ClusterUtils.profilerConfigPath(stormId, host, port, profileAction);
+        stateStorage.delete_node(path);
+    }
+
+    // need to take executor->node+port in explicitly so that we don't run into a situation where a
+    // long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
+    // with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
+    // we avoid situations like that
+    @Override
+    public Map<ExecutorInfo, ClusterWorkerHeartbeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort) {
+        Map<ExecutorInfo, ClusterWorkerHeartbeat> executorWhbs = new HashMap<>();
+
+        Map<NodeInfo, List<List<Long>>> nodePortExecutors = ClusterUtils.reverseMap(executorNodePort);
+
+        for (Map.Entry<NodeInfo, List<List<Long>>> entry : nodePortExecutors.entrySet()) {
+
+            String node = entry.getKey().get_node();
+            Long port = entry.getKey().get_port_iterator().next();
+            ClusterWorkerHeartbeat whb = getWorkerHeartbeat(stormId, node, port);
+            List<ExecutorInfo> executorInfoList = new ArrayList<>();
+            for (List<Long> list : entry.getValue()) {
+                executorInfoList.add(new ExecutorInfo(list.get(0).intValue(), list.get(list.size() - 1).intValue()));
+            }
+            if (whb != null)
+                executorWhbs.putAll(ClusterUtils.convertExecutorBeats(executorInfoList, whb));
+        }
+        return executorWhbs;
+    }
+
+    @Override
+    public List<String> supervisors(IFn callback) {
+        if (callback != null) {
+            supervisorsCallback.set(callback);
+        }
+        return stateStorage.get_children(ClusterUtils.SUPERVISORS_SUBTREE, callback != null);
+    }
+
+    @Override
+    public SupervisorInfo supervisorInfo(String supervisorId) {
+        String path = ClusterUtils.supervisorPath(supervisorId);
+        return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, false), SupervisorInfo.class);
+    }
+
+    @Override
+    public void setupHeatbeats(String stormId) {
+        stateStorage.mkdirs(ClusterUtils.workerbeatStormRoot(stormId), acls);
+    }
+
+    @Override
+    public void teardownHeartbeats(String stormId) {
+        try {
+            stateStorage.delete_worker_hb(ClusterUtils.workerbeatStormRoot(stormId));
+        } catch (Exception e) {
+            if (Zookeeper.exceptionCause(KeeperException.class, e)) {
+                // do nothing
+                LOG.warn("Could not teardown heartbeats for {}.", stormId);
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    @Override
+    public void teardownTopologyErrors(String stormId) {
+        try {
+            stateStorage.delete_node(ClusterUtils.errorStormRoot(stormId));
+        } catch (Exception e) {
+            if (Zookeeper.exceptionCause(KeeperException.class, e)) {
+                // do nothing
+                LOG.warn("Could not teardown errors for {}.", stormId);
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    @Override
+    public List<String> heartbeatStorms() {
+        return stateStorage.get_worker_hb_children(ClusterUtils.WORKERBEATS_SUBTREE, false);
+    }
+
+    @Override
+    public List<String> errorTopologies() {
+        return stateStorage.get_children(ClusterUtils.ERRORS_SUBTREE, false);
+    }
+
+    @Override
+    public void setTopologyLogConfig(String stormId, LogConfig logConfig) {
+        stateStorage.set_data(ClusterUtils.logConfigPath(stormId), Utils.serialize(logConfig), acls);
+    }
+
+    @Override
+    public LogConfig topologyLogConfig(String stormId, IFn cb) {
+        String path = ClusterUtils.logConfigPath(stormId);
+        return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, cb != null), LogConfig.class);
+    }
+
+    @Override
+    public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info) {
+        if (info != null) {
+            String path = ClusterUtils.workerbeatPath(stormId, node, port);
+            stateStorage.set_worker_hb(path, Utils.serialize(info), acls);
+        }
+    }
+
+    @Override
+    public void removeWorkerHeartbeat(String stormId, String node, Long port) {
+        String path = ClusterUtils.workerbeatPath(stormId, node, port);
+        stateStorage.delete_worker_hb(path);
+    }
+
+    @Override
+    public void supervisorHeartbeat(String supervisorId, SupervisorInfo info) {
+        String path = ClusterUtils.supervisorPath(supervisorId);
+        stateStorage.set_ephemeral_node(path, Utils.serialize(info), acls);
+    }
+
+    // if znode exists and to be not on?, delete; if exists and on?, do nothing;
+    // if not exists and to be on?, create; if not exists and not on?, do nothing;
+    @Override
+    public void workerBackpressure(String stormId, String node, Long port, boolean on) {
+        String path = ClusterUtils.backpressurePath(stormId, node, port);
+        boolean existed = stateStorage.node_exists(path, false);
+        if (existed) {
+            if (on == false)
+                stateStorage.delete_node(path);
+
+        } else {
+            if (on == true) {
+                stateStorage.set_ephemeral_node(path, null, acls);
+            }
+        }
+    }
+
+    // if the backpresure/storm-id dir is empty, this topology has throttle-on, otherwise not.
+    @Override
+    public boolean topologyBackpressure(String stormId, IFn callback) {
+        if (callback != null) {
+            backPressureCallback.put(stormId, callback);
+        }
+        String path = ClusterUtils.backpressureStormRoot(stormId);
+        List<String> childrens = stateStorage.get_children(path, callback != null);
+        return childrens.size() > 0;
+
+    }
+
+    @Override
+    public void setupBackpressure(String stormId) {
+        stateStorage.mkdirs(ClusterUtils.backpressureStormRoot(stormId), acls);
+    }
+
+    @Override
+    public void removeWorkerBackpressure(String stormId, String node, Long port) {
+        stateStorage.delete_node(ClusterUtils.backpressurePath(stormId, node, port));
+    }
+
+    @Override
+    public void activateStorm(String stormId, StormBase stormBase) {
+        String path = ClusterUtils.stormPath(stormId);
+        stateStorage.set_data(path, Utils.serialize(stormBase), acls);
+    }
+
+    // To update this function due to APersistentMap/APersistentSet is clojure's structure
+    @Override
+    public void updateStorm(String stormId, StormBase newElems) {
+
+        StormBase stormBase = stormBase(stormId, null);
+        if (stormBase.get_component_executors() != null) {
+
+            Map<String, Integer> newComponentExecutors = new HashMap<>();
+            Map<String, Integer> componentExecutors = newElems.get_component_executors();
+            // componentExecutors maybe be APersistentMap, which don't support "put"
+            for (Map.Entry<String, Integer> entry : componentExecutors.entrySet()) {
+                newComponentExecutors.put(entry.getKey(), entry.getValue());
+            }
+            for (Map.Entry<String, Integer> entry : stormBase.get_component_executors().entrySet()) {
+                if (!componentExecutors.containsKey(entry.getKey())) {
+                    newComponentExecutors.put(entry.getKey(), entry.getValue());
+                }
+            }
+            if (newComponentExecutors.size() > 0)
+                newElems.set_component_executors(newComponentExecutors);
+        }
+
+        Map<String, DebugOptions> ComponentDebug = new HashMap<>();
+        Map<String, DebugOptions> oldComponentDebug = stormBase.get_component_debug();
+
+        Map<String, DebugOptions> newComponentDebug = newElems.get_component_debug();
+        /// oldComponentDebug.keySet()/ newComponentDebug.keySet() maybe be APersistentSet, which don't support addAll
+        Set<String> debugOptionsKeys = new HashSet<>();
+        debugOptionsKeys.addAll(oldComponentDebug.keySet());
+        debugOptionsKeys.addAll(newComponentDebug.keySet());
+        for (String key : debugOptionsKeys) {
+            boolean enable = false;
+            double samplingpct = 0;
+            if (oldComponentDebug.containsKey(key)) {
+                enable = oldComponentDebug.get(key).is_enable();
+                samplingpct = oldComponentDebug.get(key).get_samplingpct();
+            }
+            if (newComponentDebug.containsKey(key)) {
+                enable = newComponentDebug.get(key).is_enable();
+                samplingpct += newComponentDebug.get(key).get_samplingpct();
+            }
+            DebugOptions debugOptions = new DebugOptions();
+            debugOptions.set_enable(enable);
+            debugOptions.set_samplingpct(samplingpct);
+            ComponentDebug.put(key, debugOptions);
+        }
+        if (ComponentDebug.size() > 0) {
+            newElems.set_component_debug(ComponentDebug);
+        }
+
+        if (StringUtils.isBlank(newElems.get_name())) {
+            newElems.set_name(stormBase.get_name());
+        }
+        if (newElems.get_status() == null) {
+            newElems.set_status(stormBase.get_status());
+        }
+        if (newElems.get_num_workers() == 0) {
+            newElems.set_num_workers(stormBase.get_num_workers());
+        }
+        if (newElems.get_launch_time_secs() == 0) {
+            newElems.set_launch_time_secs(stormBase.get_launch_time_secs());
+        }
+        if (StringUtils.isBlank(newElems.get_owner())) {
+            newElems.set_owner(stormBase.get_owner());
+        }
+        if (newElems.get_topology_action_options() == null) {
+            newElems.set_topology_action_options(stormBase.get_topology_action_options());
+        }
+        if (newElems.get_status() == null) {
+            newElems.set_status(stormBase.get_status());
+        }
+        stateStorage.set_data(ClusterUtils.stormPath(stormId), Utils.serialize(newElems), acls);
+    }
+
+    @Override
+    public void removeStormBase(String stormId) {
+        stateStorage.delete_node(ClusterUtils.stormPath(stormId));
+    }
+
+    @Override
+    public void setAssignment(String stormId, Assignment info) {
+        stateStorage.set_data(ClusterUtils.assignmentPath(stormId), Utils.serialize(info), acls);
+    }
+
+    @Override
+    public void setupBlobstore(String key, NimbusInfo nimbusInfo, Integer versionInfo) {
+        String path = ClusterUtils.blobstorePath(key) + ClusterUtils.ZK_SEPERATOR + nimbusInfo.toHostPortString() + "-" + versionInfo;
+        LOG.info("set-path: {}", path);
+        stateStorage.mkdirs(ClusterUtils.blobstorePath(key), acls);
+        stateStorage.delete_node_blobstore(ClusterUtils.blobstorePath(key), nimbusInfo.toHostPortString());
+        stateStorage.set_ephemeral_node(path, null, acls);
+    }
+
+    @Override
+    public List<String> activeKeys() {
+        return stateStorage.get_children(ClusterUtils.BLOBSTORE_SUBTREE, false);
+    }
+
+    // blobstore state
+    @Override
+    public List<String> blobstore(IFn callback) {
+        if (callback != null) {
+            blobstoreCallback.set(callback);
+        }
+        stateStorage.sync_path(ClusterUtils.BLOBSTORE_SUBTREE);
+        return stateStorage.get_children(ClusterUtils.BLOBSTORE_SUBTREE, callback != null);
+
+    }
+
+    @Override
+    public void removeStorm(String stormId) {
+        stateStorage.delete_node(ClusterUtils.assignmentPath(stormId));
+        stateStorage.delete_node(ClusterUtils.credentialsPath(stormId));
+        stateStorage.delete_node(ClusterUtils.logConfigPath(stormId));
+        stateStorage.delete_node(ClusterUtils.profilerConfigPath(stormId));
+        removeStormBase(stormId);
+    }
+
+    @Override
+    public void removeBlobstoreKey(String blobKey) {
+        LOG.debug("remove key {}", blobKey);
+        stateStorage.delete_node(ClusterUtils.blobstorePath(blobKey));
+    }
+
+    @Override
+    public void removeKeyVersion(String blobKey) {
+        stateStorage.delete_node(ClusterUtils.blobstoreMaxKeySequenceNumberPath(blobKey));
+    }
+
+    @Override
+    public void reportError(String stormId, String componentId, String node, Long port, String error) {
+
+        String path = ClusterUtils.errorPath(stormId, componentId);
+        String lastErrorPath = ClusterUtils.lastErrorPath(stormId, componentId);
+        ErrorInfo errorInfo = new ErrorInfo(error, Time.currentTimeSecs());
+        errorInfo.set_host(node);
+        errorInfo.set_port(port.intValue());
+        byte[] serData = Utils.serialize(errorInfo);
+        stateStorage.mkdirs(path, acls);
+        stateStorage.create_sequential(path + ClusterUtils.ZK_SEPERATOR + "e", serData, acls);
+        stateStorage.set_data(lastErrorPath, serData, acls);
+        List<String> childrens = stateStorage.get_children(path, false);
+
+        Collections.sort(childrens, new Comparator<String>() {
+            public int compare(String arg0, String arg1) {
+                return Long.compare(Long.parseLong(arg0.substring(1)), Long.parseLong(arg1.substring(1)));
+            }
+        });
+
+        while (childrens.size() > 10) {
+            stateStorage.delete_node(path + ClusterUtils.ZK_SEPERATOR + childrens.remove(0));
+        }
+    }
+
+    @Override
+    public List<ErrorInfo> errors(String stormId, String componentId) {
+        List<ErrorInfo> errorInfos = new ArrayList<>();
+        try {
+            String path = ClusterUtils.errorPath(stormId, componentId);
+            if (stateStorage.node_exists(path, false)) {
+                List<String> childrens = stateStorage.get_children(path, false);
+                for (String child : childrens) {
+                    String childPath = path + ClusterUtils.ZK_SEPERATOR + child;
+                    ErrorInfo errorInfo = ClusterUtils.maybeDeserialize(stateStorage.get_data(childPath, false), ErrorInfo.class);
+                    if (errorInfo != null)
+                        errorInfos.add(errorInfo);
+                }
+            }
+            Collections.sort(errorInfos, new Comparator<ErrorInfo>() {
+                public int compare(ErrorInfo arg0, ErrorInfo arg1) {
+                    return -Integer.compare(arg0.get_error_time_secs(), arg1.get_error_time_secs());
+                }
+            });
+        } catch (Exception e) {
+            throw Utils.wrapInRuntime(e);
+        }
+
+        return errorInfos;
+    }
+
+    @Override
+    public ErrorInfo lastError(String stormId, String componentId) {
+
+        String path = ClusterUtils.lastErrorPath(stormId, componentId);
+        if (stateStorage.node_exists(path, false)) {
+            ErrorInfo errorInfo = ClusterUtils.maybeDeserialize(stateStorage.get_data(path, false), ErrorInfo.class);
+            return errorInfo;
+        }
+
+        return null;
+    }
+
+    @Override
+    public void setCredentials(String stormId, Credentials creds, Map topoConf) throws NoSuchAlgorithmException {
+        List<ACL> aclList = ClusterUtils.mkTopoOnlyAcls(topoConf);
+        String path = ClusterUtils.credentialsPath(stormId);
+        stateStorage.set_data(path, Utils.serialize(creds), aclList);
+
+    }
+
+    @Override
+    public Credentials credentials(String stormId, IFn callback) {
+        if (callback != null) {
+            credentialsCallback.put(stormId, callback);
+        }
+        String path = ClusterUtils.credentialsPath(stormId);
+        return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, callback != null), Credentials.class);
+
+    }
+
+    @Override
+    public void disconnect() {
+        stateStorage.unregister(stateId);
+        if (solo)
+            stateStorage.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/55b86ca4/storm-core/src/jvm/org/apache/storm/cluster/StormZkClusterState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/StormZkClusterState.java b/storm-core/src/jvm/org/apache/storm/cluster/StormZkClusterState.java
deleted file mode 100644
index 3a4205b..0000000
--- a/storm-core/src/jvm/org/apache/storm/cluster/StormZkClusterState.java
+++ /dev/null
@@ -1,683 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.cluster;
-
-import clojure.lang.APersistentMap;
-import clojure.lang.IFn;
-import clojure.lang.PersistentArrayMap;
-import clojure.lang.RT;
-import org.apache.commons.lang.StringUtils;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.state.*;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.storm.callback.Callback;
-import org.apache.storm.generated.*;
-import org.apache.storm.nimbus.NimbusInfo;
-import org.apache.storm.utils.Time;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.zookeeper.Zookeeper;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.ACL;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.UnsupportedEncodingException;
-import java.security.NoSuchAlgorithmException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class StormZkClusterState implements StormClusterState {
-
-    private static Logger LOG = LoggerFactory.getLogger(StormZkClusterState.class);
-
-    private ClusterState clusterState;
-
-    private ConcurrentHashMap<String, IFn> assignmentInfoCallback;
-    private ConcurrentHashMap<String, IFn> assignmentInfoWithVersionCallback;
-    private ConcurrentHashMap<String, IFn> assignmentVersionCallback;
-    private AtomicReference<IFn> supervisorsCallback;
-    // we want to reigister a topo directory getChildren callback for all workers of this dir
-    private ConcurrentHashMap<String, IFn> backPressureCallback;
-    private AtomicReference<IFn> assignmentsCallback;
-    private ConcurrentHashMap<String, IFn> stormBaseCallback;
-    private AtomicReference<IFn> blobstoreCallback;
-    private ConcurrentHashMap<String, IFn> credentialsCallback;
-    private ConcurrentHashMap<String, IFn> logConfigCallback;
-
-    private List<ACL> acls;
-    private String stateId;
-    private boolean solo;
-
-    public StormZkClusterState(Object clusterState, List<ACL> acls, ClusterStateContext context) throws Exception {
-
-        if (clusterState instanceof ClusterState) {
-            solo = false;
-            this.clusterState = (ClusterState) clusterState;
-        } else {
-
-            solo = true;
-            this.clusterState = new DistributedClusterState((Map) clusterState, (Map) clusterState, acls, context);
-        }
-
-        assignmentInfoCallback = new ConcurrentHashMap<>();
-        assignmentInfoWithVersionCallback = new ConcurrentHashMap<>();
-        assignmentVersionCallback = new ConcurrentHashMap<>();
-        supervisorsCallback = new AtomicReference<>();
-        backPressureCallback = new ConcurrentHashMap<>();
-        assignmentsCallback = new AtomicReference<>();
-        stormBaseCallback = new ConcurrentHashMap<>();
-        credentialsCallback = new ConcurrentHashMap<>();
-        logConfigCallback = new ConcurrentHashMap<>();
-        blobstoreCallback = new AtomicReference<>();
-
-        stateId = this.clusterState.register(new Callback() {
-
-            public <T> Object execute(T... args) {
-                if (args == null) {
-                    LOG.warn("Input args is null");
-                    return null;
-                } else if (args.length < 2) {
-                    LOG.warn("Input args is invalid, args length:" + args.length);
-                    return null;
-                }
-                String path = (String) args[1];
-
-                List<String> toks = Zookeeper.tokenizePath(path);
-                int size = toks.size();
-                if (size >= 1) {
-                    String params = null;
-                    String root = toks.get(0);
-                    IFn fn = null;
-                    if (root.equals(Cluster.ASSIGNMENTS_ROOT)) {
-                        if (size == 1) {
-                            // set null and get the old value
-                            issueCallback(assignmentsCallback);
-                        } else {
-                            issueMapCallback(assignmentInfoCallback, toks.get(1));
-                            issueMapCallback(assignmentVersionCallback, toks.get(1));
-                            issueMapCallback(assignmentInfoWithVersionCallback, toks.get(1));
-                        }
-
-                    } else if (root.equals(Cluster.SUPERVISORS_ROOT)) {
-                        issueCallback(supervisorsCallback);
-                    } else if (root.equals(Cluster.BLOBSTORE_ROOT)) {
-                        issueCallback(blobstoreCallback);
-                    } else if (root.equals(Cluster.STORMS_ROOT) && size > 1) {
-                        issueMapCallback(stormBaseCallback, toks.get(1));
-                    } else if (root.equals(Cluster.CREDENTIALS_ROOT) && size > 1) {
-                        issueMapCallback(credentialsCallback, toks.get(1));
-                    } else if (root.equals(Cluster.LOGCONFIG_ROOT) && size > 1) {
-                        issueMapCallback(logConfigCallback, toks.get(1));
-                    } else if (root.equals(Cluster.BACKPRESSURE_ROOT) && size > 1) {
-                        issueMapCallback(logConfigCallback, toks.get(1));
-                    } else {
-                        LOG.error("{} Unknown callback for subtree {}", new RuntimeException("Unknown callback for this path"), path);
-                        Runtime.getRuntime().exit(30);
-                    }
-
-                }
-
-                return null;
-            }
-
-        });
-
-        String[] pathlist = { Cluster.ASSIGNMENTS_SUBTREE, Cluster.STORMS_SUBTREE, Cluster.SUPERVISORS_SUBTREE, Cluster.WORKERBEATS_SUBTREE,
-                Cluster.ERRORS_SUBTREE, Cluster.BLOBSTORE_SUBTREE, Cluster.NIMBUSES_SUBTREE, Cluster.LOGCONFIG_SUBTREE };
-        for (String path : pathlist) {
-            this.clusterState.mkdirs(path, acls);
-        }
-
-    }
-
-    protected void issueCallback(AtomicReference<IFn> cb) {
-        IFn callback = cb.getAndSet(null);
-        if (callback != null)
-            callback.invoke();
-    }
-
-    protected void issueMapCallback(ConcurrentHashMap<String, IFn> callbackConcurrentHashMap, String key) {
-        IFn callback = callbackConcurrentHashMap.remove(key);
-        if (callback != null)
-            callback.invoke();
-    }
-
-    @Override
-    public List<String> assignments(IFn callback) {
-        if (callback != null) {
-            assignmentsCallback.set(callback);
-        }
-        return clusterState.get_children(Cluster.ASSIGNMENTS_SUBTREE, callback != null);
-    }
-
-    @Override
-    public Assignment assignmentInfo(String stormId, IFn callback) {
-        if (callback != null) {
-            assignmentInfoCallback.put(stormId, callback);
-        }
-        byte[] serialized = clusterState.get_data(Cluster.assignmentPath(stormId), callback != null);
-        return Cluster.maybeDeserialize(serialized, Assignment.class);
-    }
-
-    @Override
-    public APersistentMap assignmentInfoWithVersion(String stormId, IFn callback) {
-        if (callback != null) {
-            assignmentInfoWithVersionCallback.put(stormId, callback);
-        }
-        APersistentMap aPersistentMap = clusterState.get_data_with_version(Cluster.assignmentPath(stormId), callback != null);
-        Assignment assignment = Cluster.maybeDeserialize((byte[]) aPersistentMap.get("data"), Assignment.class);
-        Integer version = (Integer) aPersistentMap.get("version");
-        APersistentMap map = new PersistentArrayMap(new Object[] { RT.keyword(null, "data"), assignment, RT.keyword(null, "version"), version });
-        return map;
-    }
-
-    @Override
-    public Integer assignmentVersion(String stormId, IFn callback) throws Exception {
-        if (callback != null) {
-            assignmentVersionCallback.put(stormId, callback);
-        }
-        return clusterState.get_version(Cluster.assignmentPath(stormId), callback != null);
-    }
-
-    // blobstore state
-    @Override
-    public List<String> blobstoreInfo(String blobKey) {
-        String path = Cluster.blobstorePath(blobKey);
-        clusterState.sync_path(path);
-        return clusterState.get_children(path, false);
-    }
-
-    @Override
-    public List nimbuses() {
-        List<NimbusSummary> nimbusSummaries = new ArrayList<>();
-        List<String> nimbusIds = clusterState.get_children(Cluster.NIMBUSES_SUBTREE, false);
-        for (String nimbusId : nimbusIds) {
-            byte[] serialized = clusterState.get_data(Cluster.nimbusPath(nimbusId), false);
-            NimbusSummary nimbusSummary = Cluster.maybeDeserialize(serialized, NimbusSummary.class);
-            nimbusSummaries.add(nimbusSummary);
-        }
-        return nimbusSummaries;
-    }
-
-    @Override
-    public void addNimbusHost(final String nimbusId, final NimbusSummary nimbusSummary) {
-        // explicit delete for ephmeral node to ensure this session creates the entry.
-        clusterState.delete_node(Cluster.nimbusPath(nimbusId));
-        clusterState.add_listener(new ConnectionStateListener() {
-            @Override
-            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
-                LOG.info("Connection state listener invoked, zookeeper connection state has changed to {}", connectionState);
-                if (connectionState.equals(ConnectionState.RECONNECTED)) {
-                    LOG.info("Connection state has changed to reconnected so setting nimbuses entry one more time");
-                    clusterState.set_ephemeral_node(Cluster.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), acls);
-                }
-
-            }
-        });
-
-        clusterState.set_ephemeral_node(Cluster.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), acls);
-    }
-
-    @Override
-    public List<String> activeStorms() {
-        return clusterState.get_children(Cluster.STORMS_SUBTREE, false);
-    }
-
-    @Override
-    public StormBase stormBase(String stormId, IFn callback) {
-        if (callback != null) {
-            stormBaseCallback.put(stormId, callback);
-        }
-        return Cluster.maybeDeserialize(clusterState.get_data(Cluster.stormPath(stormId), callback != null), StormBase.class);
-    }
-
-    @Override
-    public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port) {
-        byte[] bytes = clusterState.get_worker_hb(Cluster.workerbeatPath(stormId, node, port), false);
-        if (bytes != null) {
-            return Cluster.maybeDeserialize(bytes, ClusterWorkerHeartbeat.class);
-        }
-        return null;
-    }
-
-    @Override
-    public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo, boolean isThrift) {
-        List<ProfileRequest> requests = new ArrayList<>();
-        List<ProfileRequest> profileRequests = getTopologyProfileRequests(stormId, isThrift);
-        for (ProfileRequest profileRequest : profileRequests) {
-            NodeInfo nodeInfo1 = profileRequest.get_nodeInfo();
-            if (nodeInfo1.equals(nodeInfo))
-                requests.add(profileRequest);
-        }
-        return requests;
-    }
-
-    @Override
-    public List<ProfileRequest> getTopologyProfileRequests(String stormId, boolean isThrift) {
-        List<ProfileRequest> profileRequests = new ArrayList<>();
-        String path = Cluster.profilerConfigPath(stormId);
-        if (clusterState.node_exists(path, false)) {
-            List<String> strs = clusterState.get_children(path, false);
-            for (String str : strs) {
-                String childPath = path + Cluster.ZK_SEPERATOR + str;
-                byte[] raw = clusterState.get_data(childPath, false);
-                ProfileRequest request = Cluster.maybeDeserialize(raw, ProfileRequest.class);
-                if (request != null)
-                    profileRequests.add(request);
-            }
-        }
-        return profileRequests;
-    }
-
-    @Override
-    public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest) {
-        ProfileAction profileAction = profileRequest.get_action();
-        String host = profileRequest.get_nodeInfo().get_node();
-        Long port = profileRequest.get_nodeInfo().get_port_iterator().next();
-        String path = Cluster.profilerConfigPath(stormId, host, port, profileAction);
-        clusterState.set_data(path, Utils.serialize(profileRequest), acls);
-    }
-
-    @Override
-    public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest) {
-        ProfileAction profileAction = profileRequest.get_action();
-        String host = profileRequest.get_nodeInfo().get_node();
-        Long port = profileRequest.get_nodeInfo().get_port_iterator().next();
-        String path = Cluster.profilerConfigPath(stormId, host, port, profileAction);
-        clusterState.delete_node(path);
-    }
-
-    // need to take executor->node+port in explicitly so that we don't run into a situation where a
-    // long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
-    // with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
-    // we avoid situations like that
-    @Override
-    public Map<ExecutorInfo, ClusterWorkerHeartbeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort) {
-        Map<ExecutorInfo, ClusterWorkerHeartbeat> executorWhbs = new HashMap<>();
-
-        LOG.info(executorNodePort.toString());
-        Map<NodeInfo, List<List<Long>>> nodePortExecutors = Cluster.reverseMap(executorNodePort);
-        LOG.info(nodePortExecutors.toString());
-
-        for (Map.Entry<NodeInfo, List<List<Long>>> entry : nodePortExecutors.entrySet()) {
-
-            String node = entry.getKey().get_node();
-            Long port = entry.getKey().get_port_iterator().next();
-            ClusterWorkerHeartbeat whb = getWorkerHeartbeat(stormId, node, port);
-            List<ExecutorInfo> executorInfoList = new ArrayList<>();
-            for (List<Long> list : entry.getValue()) {
-                executorInfoList.add(new ExecutorInfo(list.get(0).intValue(), list.get(list.size() - 1).intValue()));
-            }
-            executorWhbs.putAll(Cluster.convertExecutorBeats(executorInfoList, whb));
-        }
-        return executorWhbs;
-    }
-
-    @Override
-    public List<String> supervisors(IFn callback) {
-        if (callback != null) {
-            supervisorsCallback.set(callback);
-        }
-        return clusterState.get_children(Cluster.SUPERVISORS_SUBTREE, callback != null);
-    }
-
-    @Override
-    public SupervisorInfo supervisorInfo(String supervisorId) {
-        String path = Cluster.supervisorPath(supervisorId);
-        return Cluster.maybeDeserialize(clusterState.get_data(path, false), SupervisorInfo.class);
-    }
-
-    @Override
-    public void setupHeatbeats(String stormId) {
-        clusterState.mkdirs(Cluster.workerbeatStormRoot(stormId), acls);
-    }
-
-    @Override
-    public void teardownHeartbeats(String stormId) {
-        try {
-            clusterState.delete_worker_hb(Cluster.workerbeatStormRoot(stormId));
-        } catch (Exception e) {
-            if (Zookeeper.exceptionCause(KeeperException.class, e)) {
-                // do nothing
-                LOG.warn("Could not teardown heartbeats for {}.", stormId);
-            } else {
-                throw e;
-            }
-        }
-    }
-
-    @Override
-    public void teardownTopologyErrors(String stormId) {
-        try {
-            clusterState.delete_node(Cluster.errorStormRoot(stormId));
-        } catch (Exception e) {
-            if (Zookeeper.exceptionCause(KeeperException.class, e)) {
-                // do nothing
-                LOG.warn("Could not teardown errors for {}.", stormId);
-            } else {
-                throw e;
-            }
-        }
-    }
-
-    @Override
-    public List<String> heartbeatStorms() {
-        return clusterState.get_worker_hb_children(Cluster.WORKERBEATS_SUBTREE, false);
-    }
-
-    @Override
-    public List<String> errorTopologies() {
-        return clusterState.get_children(Cluster.ERRORS_SUBTREE, false);
-    }
-
-    @Override
-    public void setTopologyLogConfig(String stormId, LogConfig logConfig) {
-        clusterState.set_data(Cluster.logConfigPath(stormId), Utils.serialize(logConfig), acls);
-    }
-
-    @Override
-    public LogConfig topologyLogConfig(String stormId, IFn cb) {
-        String path = Cluster.logConfigPath(stormId);
-        return Cluster.maybeDeserialize(clusterState.get_data(path, cb != null), LogConfig.class);
-    }
-
-    @Override
-    public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info) {
-        if (info != null) {
-            String path = Cluster.workerbeatPath(stormId, node, port);
-            clusterState.set_worker_hb(path, Utils.serialize(info), acls);
-        }
-    }
-
-    @Override
-    public void removeWorkerHeartbeat(String stormId, String node, Long port) {
-        String path = Cluster.workerbeatPath(stormId, node, port);
-        clusterState.delete_worker_hb(path);
-    }
-
-    @Override
-    public void supervisorHeartbeat(String supervisorId, SupervisorInfo info) {
-        String path = Cluster.supervisorPath(supervisorId);
-        clusterState.set_ephemeral_node(path, Utils.serialize(info), acls);
-    }
-
-    // if znode exists and to be not on?, delete; if exists and on?, do nothing;
-    // if not exists and to be on?, create; if not exists and not on?, do nothing;
-    @Override
-    public void workerBackpressure(String stormId, String node, Long port, boolean on) {
-        String path = Cluster.backpressurePath(stormId, node, port);
-        boolean existed = clusterState.node_exists(path, false);
-        if (existed) {
-            if (on == false)
-                clusterState.delete_node(path);
-
-        } else {
-            if (on == true) {
-                clusterState.set_ephemeral_node(path, null, acls);
-            }
-        }
-    }
-
-    // if the backpresure/storm-id dir is empty, this topology has throttle-on, otherwise not.
-    @Override
-    public boolean topologyBackpressure(String stormId, IFn callback) {
-        if (callback != null) {
-            backPressureCallback.put(stormId, callback);
-        }
-        String path = Cluster.backpressureStormRoot(stormId);
-        List<String> childrens = clusterState.get_children(path, callback != null);
-        return childrens.size() > 0;
-
-    }
-
-    @Override
-    public void setupBackpressure(String stormId) {
-        clusterState.mkdirs(Cluster.backpressureStormRoot(stormId), acls);
-    }
-
-    @Override
-    public void removeWorkerBackpressure(String stormId, String node, Long port) {
-        clusterState.delete_node(Cluster.backpressurePath(stormId, node, port));
-    }
-
-    @Override
-    public void activateStorm(String stormId, StormBase stormBase) {
-        String path = Cluster.stormPath(stormId);
-        clusterState.set_data(path, Utils.serialize(stormBase), acls);
-    }
-
-    // maybe exit some questions for updateStorm
-    @Override
-    public void updateStorm(String stormId, StormBase newElems) {
-
-        StormBase stormBase = stormBase(stormId, null);
-        if (stormBase.get_component_executors() != null) {
-
-            Map<String, Integer> newComponentExecutors = new HashMap<>();
-            Map<String, Integer> componentExecutors = newElems.get_component_executors();
-            //componentExecutors maybe be APersistentMap, which don't support put
-            for (Map.Entry<String, Integer> entry : componentExecutors.entrySet()) {
-                    newComponentExecutors.put(entry.getKey(), entry.getValue());
-            }
-            for (Map.Entry<String, Integer> entry : stormBase.get_component_executors().entrySet()) {
-                if (!componentExecutors.containsKey(entry.getKey())) {
-                    newComponentExecutors.put(entry.getKey(), entry.getValue());
-                }
-            }
-            if (newComponentExecutors.size() > 0)
-                newElems.set_component_executors(newComponentExecutors);
-        }
-
-        Map<String, DebugOptions> ComponentDebug = new HashMap<>();
-        Map<String, DebugOptions> oldComponentDebug = stormBase.get_component_debug();
-
-        Map<String, DebugOptions> newComponentDebug = newElems.get_component_debug();
-
-        Set<String> debugOptionsKeys = oldComponentDebug.keySet();
-        debugOptionsKeys.addAll(newComponentDebug.keySet());
-        for (String key : debugOptionsKeys) {
-            boolean enable = false;
-            double samplingpct = 0;
-            if (oldComponentDebug.containsKey(key)) {
-                enable = oldComponentDebug.get(key).is_enable();
-                samplingpct = oldComponentDebug.get(key).get_samplingpct();
-            }
-            if (newComponentDebug.containsKey(key)) {
-                enable = newComponentDebug.get(key).is_enable();
-                samplingpct += newComponentDebug.get(key).get_samplingpct();
-            }
-            DebugOptions debugOptions = new DebugOptions();
-            debugOptions.set_enable(enable);
-            debugOptions.set_samplingpct(samplingpct);
-            ComponentDebug.put(key, debugOptions);
-        }
-        if (ComponentDebug.size() > 0) {
-            newElems.set_component_debug(ComponentDebug);
-        }
-
-
-        if (StringUtils.isBlank(newElems.get_name())) {
-            newElems.set_name(stormBase.get_name());
-        }
-        if (newElems.get_status() == null){
-            newElems.set_status(stormBase.get_status());
-        }
-        if (newElems.get_num_workers() == 0){
-            newElems.set_num_workers(stormBase.get_num_workers());
-        }
-        if (newElems.get_launch_time_secs() == 0) {
-            newElems.set_launch_time_secs(stormBase.get_launch_time_secs());
-        }
-        if (StringUtils.isBlank(newElems.get_owner())) {
-            newElems.set_owner(stormBase.get_owner());
-        }
-        if (newElems.get_topology_action_options() == null) {
-            newElems.set_topology_action_options(stormBase.get_topology_action_options());
-        }
-        if (newElems.get_status() == null) {
-            newElems.set_status(stormBase.get_status());
-        }
-        clusterState.set_data(Cluster.stormPath(stormId), Utils.serialize(newElems), acls);
-    }
-
-    @Override
-    public void removeStormBase(String stormId) {
-        clusterState.delete_node(Cluster.stormPath(stormId));
-    }
-
-    @Override
-    public void setAssignment(String stormId, Assignment info) {
-        clusterState.set_data(Cluster.assignmentPath(stormId), Utils.serialize(info), acls);
-    }
-
-    @Override
-    public void setupBlobstore(String key, NimbusInfo nimbusInfo, Integer versionInfo) {
-        String path = Cluster.blobstorePath(key) + Cluster.ZK_SEPERATOR + nimbusInfo.toHostPortString() + "-" + versionInfo;
-        LOG.info("set-path: {}", path);
-        clusterState.mkdirs(Cluster.blobstorePath(key), acls);
-        clusterState.delete_node_blobstore(Cluster.blobstorePath(key), nimbusInfo.toHostPortString());
-        clusterState.set_ephemeral_node(path, null, acls);
-    }
-
-    @Override
-    public List<String> activeKeys() {
-        return clusterState.get_children(Cluster.BLOBSTORE_SUBTREE, false);
-    }
-
-    // blobstore state
-    @Override
-    public List<String> blobstore(IFn callback) {
-        if (callback != null) {
-            blobstoreCallback.set(callback);
-        }
-        clusterState.sync_path(Cluster.BLOBSTORE_SUBTREE);
-        return clusterState.get_children(Cluster.BLOBSTORE_SUBTREE, callback != null);
-
-    }
-
-    @Override
-    public void removeStorm(String stormId) {
-        clusterState.delete_node(Cluster.assignmentPath(stormId));
-        clusterState.delete_node(Cluster.credentialsPath(stormId));
-        clusterState.delete_node(Cluster.logConfigPath(stormId));
-        clusterState.delete_node(Cluster.profilerConfigPath(stormId));
-        removeStormBase(stormId);
-    }
-
-    @Override
-    public void removeBlobstoreKey(String blobKey) {
-        LOG.debug("remove key {}", blobKey);
-        clusterState.delete_node(Cluster.blobstorePath(blobKey));
-    }
-
-    @Override
-    public void removeKeyVersion(String blobKey) {
-        clusterState.delete_node(Cluster.blobstoreMaxKeySequenceNumberPath(blobKey));
-    }
-
-    @Override
-    public void reportError(String stormId, String componentId, String node, Integer port, String error) {
-
-        try {
-            String path = Cluster.errorPath(stormId, componentId);
-            String lastErrorPath = Cluster.lastErrorPath(stormId, componentId);
-            ErrorInfo errorInfo = new ErrorInfo(error, Time.currentTimeSecs());
-            errorInfo.set_host(node);
-            errorInfo.set_port(port.intValue());
-            byte[] serData = Utils.serialize(errorInfo);
-            clusterState.mkdirs(path, acls);
-            clusterState.create_sequential(path + Cluster.ZK_SEPERATOR + "e", serData, acls);
-            clusterState.set_data(lastErrorPath, serData, acls);
-            List<String> childrens = clusterState.get_children(path, false);
-
-            Collections.sort(childrens);
-
-            while (childrens.size() >= 10) {
-                clusterState.delete_node(path + Cluster.ZK_SEPERATOR + childrens.remove(0));
-            }
-        } catch (UnsupportedEncodingException e) {
-            throw Utils.wrapInRuntime(e);
-        }
-    }
-
-    @Override
-    public List<ErrorInfo> errors(String stormId, String componentId) {
-        List<ErrorInfo> errorInfos = new ArrayList<>();
-        try {
-            String path = Cluster.errorPath(stormId, componentId);
-            if (clusterState.node_exists(path, false)) {
-                List<String> childrens = clusterState.get_children(path, false);
-                for (String child : childrens) {
-                    String childPath = path + Cluster.ZK_SEPERATOR + child;
-                    ErrorInfo errorInfo = Cluster.maybeDeserialize(clusterState.get_data(childPath, false), ErrorInfo.class);
-                    if (errorInfo != null)
-                        errorInfos.add(errorInfo);
-                }
-            }
-            Collections.sort(errorInfos, new Comparator<ErrorInfo>() {
-                public int compare(ErrorInfo arg0, ErrorInfo arg1) {
-                    return Integer.compare(arg0.get_error_time_secs(), arg1.get_error_time_secs());
-                }
-            });
-        } catch (Exception e) {
-            throw Utils.wrapInRuntime(e);
-        }
-
-        return errorInfos;
-    }
-
-    @Override
-    public ErrorInfo lastError(String stormId, String componentId) {
-        try {
-            String path = Cluster.lastErrorPath(stormId, componentId);
-            if (clusterState.node_exists(path, false)) {
-                ErrorInfo errorInfo = Cluster.maybeDeserialize(clusterState.get_data(path, false), ErrorInfo.class);
-                return errorInfo;
-            }
-        } catch (UnsupportedEncodingException e) {
-            throw Utils.wrapInRuntime(e);
-        }
-        return null;
-    }
-
-    @Override
-    public void setCredentials(String stormId, Credentials creds, Map topoConf) throws NoSuchAlgorithmException {
-        List<ACL> aclList = Cluster.mkTopoOnlyAcls(topoConf);
-        String path = Cluster.credentialsPath(stormId);
-        clusterState.set_data(path, Utils.serialize(creds), aclList);
-
-    }
-
-    @Override
-    public Credentials credentials(String stormId, IFn callback) {
-        if (callback != null) {
-            credentialsCallback.put(stormId, callback);
-        }
-        String path = Cluster.credentialsPath(stormId);
-        return Cluster.maybeDeserialize(clusterState.get_data(path, callback != null), Credentials.class);
-
-    }
-
-    @Override
-    public void disconnect() {
-        clusterState.unregister(stateId);
-        if (solo)
-            clusterState.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/55b86ca4/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
new file mode 100644
index 0000000..8ac0adc
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import clojure.lang.APersistentMap;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.*;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.storm.Config;
+import org.apache.storm.callback.DefaultWatcherCallBack;
+import org.apache.storm.callback.WatcherCallBack;
+import org.apache.storm.callback.ZKStateChangedCallback;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.zookeeper.Zookeeper;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ZKStateStorage implements StateStorage {
+
+    private static Logger LOG = LoggerFactory.getLogger(ZKStateStorage.class);
+
+    private ConcurrentHashMap<String, ZKStateChangedCallback> callbacks = new ConcurrentHashMap<String, ZKStateChangedCallback>();
+    private CuratorFramework zkWriter;
+    private CuratorFramework zkReader;
+    private AtomicBoolean active;
+
+    private boolean isNimbus;
+    private Map authConf;
+    private Map<Object, Object> conf;
+
+    public ZKStateStorage(Map<Object, Object> conf, Map authConf, List<ACL> acls, ClusterStateContext context) throws Exception {
+        this.conf = conf;
+        this.authConf = authConf;
+        if (context.getDaemonType().equals(DaemonType.NIMBUS))
+            this.isNimbus = true;
+
+        // just mkdir STORM_ZOOKEEPER_ROOT dir
+        CuratorFramework zkTemp = mkZk();
+        String rootPath = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT));
+        Zookeeper.mkdirs(zkTemp, rootPath, acls);
+        zkTemp.close();
+
+        active = new AtomicBoolean(true);
+        zkWriter = mkZk(new WatcherCallBack() {
+            @Override
+            public void execute(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path) {
+                if (active.get()) {
+                    if (!(state.equals(Watcher.Event.KeeperState.SyncConnected))) {
+                        LOG.warn("Received event {} : {}: {} with disconnected Zookeeper.", state, type, path);
+                    } else {
+                        LOG.info("Received event {} : {} : {}", state, type, path);
+                    }
+
+                    if (!type.equals(Watcher.Event.EventType.None)) {
+                        for (Map.Entry<String, ZKStateChangedCallback> e : callbacks.entrySet()) {
+                            ZKStateChangedCallback fn = e.getValue();
+                            fn.changed(type, path);
+                        }
+                    }
+                }
+            }
+        });
+        if (isNimbus) {
+            zkReader = mkZk(new WatcherCallBack() {
+                @Override
+                public void execute(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path) {
+                    if (active.get()) {
+                        if (!(state.equals(Watcher.Event.KeeperState.SyncConnected))) {
+                            LOG.warn("Received event {} : {}: {} with disconnected Zookeeper.", state, type, path);
+                        } else {
+                            LOG.debug("Received event {} : {} : {}", state, type, path);
+                        }
+
+                        if (!type.equals(Watcher.Event.EventType.None)) {
+                            for (Map.Entry<String, ZKStateChangedCallback> e : callbacks.entrySet()) {
+                                ZKStateChangedCallback fn = e.getValue();
+                                fn.changed(type, path);
+                            }
+                        }
+                    }
+                }
+            });
+        } else {
+            zkReader = zkWriter;
+        }
+
+    }
+
+    @SuppressWarnings("unchecked")
+    private CuratorFramework mkZk() throws IOException {
+        return Zookeeper.mkClient(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT), "",
+                new DefaultWatcherCallBack(), authConf);
+    }
+
+    @SuppressWarnings("unchecked")
+    private CuratorFramework mkZk(WatcherCallBack watcher) throws NumberFormatException, IOException {
+        return Zookeeper.mkClient(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT),
+                String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT)), watcher, authConf);
+    }
+
+    @Override
+    public void delete_node_blobstore(String path, String nimbusHostPortInfo) {
+
+    }
+
+    @Override
+    public String register(ZKStateChangedCallback callback) {
+        String id = UUID.randomUUID().toString();
+        this.callbacks.put(id, callback);
+        return id;
+    }
+
+    @Override
+    public void unregister(String id) {
+        this.callbacks.remove(id);
+    }
+
+    @Override
+    public String create_sequential(String path, byte[] data, List<ACL> acls) {
+        return Zookeeper.createNode(zkWriter, path, data, CreateMode.EPHEMERAL_SEQUENTIAL, acls);
+    }
+
+    @Override
+    public void mkdirs(String path, List<ACL> acls) {
+        Zookeeper.mkdirs(zkWriter, path, acls);
+    }
+
+    @Override
+    public void delete_node(String path) {
+        Zookeeper.deleteNode(zkWriter, path);
+    }
+
+    @Override
+    public void set_ephemeral_node(String path, byte[] data, List<ACL> acls) {
+        Zookeeper.mkdirs(zkWriter, parentPath(path), acls);
+        if (Zookeeper.exists(zkWriter, path, false)) {
+            try {
+                Zookeeper.setData(zkWriter, path, data);
+            } catch (Exception e) {
+                if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
+                    Zookeeper.createNode(zkWriter, path, data, CreateMode.EPHEMERAL, acls);
+                } else {
+                    throw Utils.wrapInRuntime(e);
+                }
+            }
+
+        } else {
+            Zookeeper.createNode(zkWriter, path, data, CreateMode.EPHEMERAL, acls);
+        }
+    }
+
+    @Override
+    public Integer get_version(String path, boolean watch) throws Exception {
+        Integer ret = Zookeeper.getVersion(zkReader, path, watch);
+        return ret;
+    }
+
+    @Override
+    public boolean node_exists(String path, boolean watch) {
+        return Zookeeper.existsNode(zkWriter, path, watch);
+    }
+
+    @Override
+    public List<String> get_children(String path, boolean watch) {
+        return Zookeeper.getChildren(zkReader, path, watch);
+    }
+
+    @Override
+    public void close() {
+        this.active.set(false);
+        zkWriter.close();
+        if (isNimbus) {
+            zkReader.close();
+        }
+    }
+
+    @Override
+    public void set_data(String path, byte[] data, List<ACL> acls) {
+        if (Zookeeper.exists(zkWriter, path, false)) {
+            Zookeeper.setData(zkWriter, path, data);
+        } else {
+            Zookeeper.mkdirs(zkWriter, parentPath(path), acls);
+            Zookeeper.createNode(zkWriter, path, data, CreateMode.PERSISTENT, acls);
+        }
+    }
+
+    @Override
+    public byte[] get_data(String path, boolean watch) {
+        byte[] ret = null;
+
+        ret = Zookeeper.getData(zkReader, path, watch);
+
+        return ret;
+    }
+
+    @Override
+    public APersistentMap get_data_with_version(String path, boolean watch) {
+        return Zookeeper.getDataWithVersion(zkReader, path, watch);
+    }
+
+    @Override
+    public void set_worker_hb(String path, byte[] data, List<ACL> acls) {
+        set_data(path, data, acls);
+    }
+
+    @Override
+    public byte[] get_worker_hb(String path, boolean watch) {
+        return Zookeeper.getData(zkReader, path, watch);
+    }
+
+    @Override
+    public List<String> get_worker_hb_children(String path, boolean watch) {
+        return get_children(path, watch);
+    }
+
+    @Override
+    public void delete_worker_hb(String path) {
+        delete_node(path);
+    }
+
+    @Override
+    public void add_listener(final ConnectionStateListener listener) {
+        Zookeeper.addListener(zkReader, new ConnectionStateListener() {
+            @Override
+            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
+                listener.stateChanged(curatorFramework, connectionState);
+            }
+        });
+    }
+
+    @Override
+    public void sync_path(String path) {
+        Zookeeper.syncPath(zkWriter, path);
+    }
+
+    // To be remove when finished port Util.clj
+    public static String parentPath(String path) {
+        List<String> toks = Zookeeper.tokenizePath(path);
+        int size = toks.size();
+        if (size > 0) {
+            toks.remove(size - 1);
+        }
+        return Zookeeper.toksToPath(toks);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/55b86ca4/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
new file mode 100644
index 0000000..19b04f2
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import clojure.lang.APersistentMap;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.data.ACL;
+
+import java.util.List;
+
+public class ZKStateStorageFactory implements StateStorageFactory{
+
+    @Override
+    public StateStorage mkState(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context) {
+        try {
+            return new ZKStateStorage(config, auth_conf, acls, context);
+        }catch (Exception e){
+            throw Utils.wrapInRuntime(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/55b86ca4/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedCluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedCluster.java b/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedCluster.java
index 5d67a54..2f1440c 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedCluster.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedCluster.java
@@ -16,16 +16,16 @@
  */
 package org.apache.storm.testing.staticmocking;
 
-import org.apache.storm.cluster.Cluster;
+import org.apache.storm.cluster.ClusterUtils;
 
 public class MockedCluster implements AutoCloseable  {
 
-    public MockedCluster(Cluster inst) {
-        Cluster.setInstance(inst);
+    public MockedCluster(ClusterUtils inst) {
+        ClusterUtils.setInstance(inst);
     }
 
     @Override
     public void close() throws Exception {
-        Cluster.resetInstance();
+        ClusterUtils.resetInstance();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/55b86ca4/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
index f1c7f32..c280515 100644
--- a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
@@ -86,19 +86,23 @@ public class Zookeeper {
         _instance = INSTANCE;
     }
 
-    public static CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root) {
-        return mkClient(conf, servers, port, root, new DefaultWatcherCallBack());
+    public  CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root) {
+        return mkClientImpl(conf, servers, port, root, new DefaultWatcherCallBack());
     }
 
-    public static CuratorFramework mkClient(Map conf, List<String> servers, Object port, Map authConf) {
-        return mkClient(conf, servers, port, "", new DefaultWatcherCallBack(), authConf);
+    public  CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, Map authConf) {
+        return mkClientImpl(conf, servers, port, "", new DefaultWatcherCallBack(), authConf);
     }
 
-    public static CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, Map authConf) {
-        return mkClient(conf, servers, port, root, new DefaultWatcherCallBack(), authConf);
+    public  CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root, Map authConf) {
+        return mkClientImpl(conf, servers, port, root, new DefaultWatcherCallBack(), authConf);
     }
 
     public static CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher, Map authConf) {
+        return _instance.mkClientImpl(conf, servers, port, root, watcher, authConf);
+    }
+
+    public  CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher, Map authConf) {
         CuratorFramework fk;
         if (authConf != null) {
             fk = Utils.newCurator(conf, servers, port, root, new ZookeeperAuthInfo(authConf));
@@ -124,8 +128,8 @@ public class Zookeeper {
      *
      * @return
      */
-    public static CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher) {
-        return mkClient(conf, servers, port, root, watcher, null);
+    public  CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher) {
+        return mkClientImpl(conf, servers, port, root, watcher, null);
     }
 
     public static String createNode(CuratorFramework zk, String path, byte[] data, org.apache.zookeeper.CreateMode mode, List<ACL> acls) {
@@ -347,7 +351,7 @@ public class Zookeeper {
     protected ILeaderElector zkLeaderElectorImpl(Map conf) throws UnknownHostException {
         List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
         Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
-        CuratorFramework zk = mkClient(conf, servers, port, "", conf);
+        CuratorFramework zk = mkClientImpl(conf, servers, port, "", conf);
         String leaderLockPath = conf.get(Config.STORM_ZOOKEEPER_ROOT) + "/leader-lock";
         String id = NimbusInfo.fromConf(conf).toHostPortString();
         AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));

http://git-wip-us.apache.org/repos/asf/storm/blob/55b86ca4/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
index d374511..d4fab3f 100644
--- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
@@ -21,7 +21,7 @@
   (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount
             TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker TestPlannerSpout])
   (:import [org.apache.storm.tuple Fields])
-  (:import [org.apache.storm.cluster StormZkClusterState])
+  (:import [org.apache.storm.cluster StormClusterStateImpl])
   (:use [org.apache.storm testing config clojure util converter])
   (:use [org.apache.storm.daemon common])
   (:require [org.apache.storm [thrift :as thrift]]))
@@ -576,7 +576,7 @@
                                              (:topology tracked))
             _ (advance-cluster-time cluster 11)
             storm-id (get-storm-id state "test-errors")
-            errors-count (fn [] (count (clojurify-error (.errors state storm-id "2"))))]
+            errors-count (fn [] (count (.errors state storm-id "2")))]
 
         (is (nil? (clojurify-error (.lastError state storm-id "2"))))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/55b86ca4/storm-core/test/clj/org/apache/storm/cluster_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/cluster_test.clj b/storm-core/test/clj/org/apache/storm/cluster_test.clj
index d0b9882..fa34355 100644
--- a/storm-core/test/clj/org/apache/storm/cluster_test.clj
+++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj
@@ -23,9 +23,10 @@
   (:import [org.mockito.exceptions.base MockitoAssertionError])
   (:import [org.apache.curator.framework CuratorFramework CuratorFrameworkFactory CuratorFrameworkFactory$Builder])
   (:import [org.apache.storm.utils Utils TestUtils ZookeeperAuthInfo ConfigUtils])
-  (:import [org.apache.storm.cluster ClusterState DistributedClusterState ClusterStateContext StormZkClusterState])
+  (:import [org.apache.storm.cluster StateStorage ZKStateStorage ClusterStateContext StormClusterStateImpl ClusterUtils])
   (:import [org.apache.storm.zookeeper Zookeeper])
-  (:import [org.apache.storm.testing.staticmocking MockedZookeeper])
+  (:import [org.apache.storm.callback ZKStateChangedCallback])
+  (:import [org.apache.storm.testing.staticmocking MockedZookeeper MockedCluster])
   (:require [conjure.core])
   (:use [conjure core])
   (:use [clojure test])
@@ -33,18 +34,18 @@
 
 (defn mk-config [zk-port]
   (merge (clojurify-structure (ConfigUtils/readStormConfig))
-         {STORM-ZOOKEEPER-PORT zk-port
-          STORM-ZOOKEEPER-SERVERS ["localhost"]}))
+    {STORM-ZOOKEEPER-PORT zk-port
+     STORM-ZOOKEEPER-SERVERS ["localhost"]}))
 
 (defn mk-state
   ([zk-port] (let [conf (mk-config zk-port)]
-               (DistributedClusterState. conf conf nil (ClusterStateContext.))))
+               (ClusterUtils/mkDistributedClusterState conf conf nil (ClusterStateContext.))))
   ([zk-port cb]
-     (let [ret (mk-state zk-port)]
-       (.register ret cb)
-       ret )))
+    (let [ret (mk-state zk-port)]
+      (.register ret cb)
+      ret)))
 
-(defn mk-storm-state [zk-port] (StormZkClusterState. (mk-config zk-port) nil (ClusterStateContext.)))
+(defn mk-storm-state [zk-port] (ClusterUtils/mkStormClusterState (mk-config zk-port) nil (ClusterStateContext.)))
 
 (deftest test-basics
   (with-inprocess-zookeeper zk-port
@@ -99,24 +100,27 @@
 
 (defn mk-callback-tester []
   (let [last (atom nil)
-        cb (fn [type path]
-              (reset! last {:type type :path path}))]
+        cb (reify
+             ZKStateChangedCallback
+             (changed
+               [this type path]
+               (reset! last {:type type :path path})))]
     [last cb]
     ))
 
 (defn read-and-reset! [aatom]
   (let [time (System/currentTimeMillis)]
-  (loop []
-    (if-let [val @aatom]
-      (do
-        (reset! aatom nil)
-        val)
-      (do
-        (when (> (- (System/currentTimeMillis) time) 30000)
-          (throw (RuntimeException. "Waited too long for atom to change state")))
-        (Thread/sleep 10)
-        (recur))
-      ))))
+    (loop []
+      (if-let [val @aatom]
+        (do
+          (reset! aatom nil)
+          val)
+        (do
+          (when (> (- (System/currentTimeMillis) time) 30000)
+            (throw (RuntimeException. "Waited too long for atom to change state")))
+          (Thread/sleep 10)
+          (recur))
+        ))))
 
 (deftest test-callbacks
   (with-inprocess-zookeeper zk-port
@@ -189,35 +193,35 @@
       (is (= #{"storm1" "storm3"} (set (.assignments state nil))))
       (is (= assignment2 (clojurify-assignment (.assignmentInfo state "storm1" nil))))
       (is (= assignment1 (clojurify-assignment (.assignmentInfo state "storm3" nil))))
-      
-      (is (= [] (.active-storms state)))
+
+      (is (= [] (.activeStorms state)))
       (.activateStorm state "storm1" (thriftify-storm-base base1))
-      (is (= ["storm1"] (.active-storms state)))
+      (is (= ["storm1"] (.activeStorms state)))
       (is (= base1 (clojurify-storm-base (.stormBase state "storm1" nil))))
       (is (= nil (clojurify-storm-base (.stormBase state "storm2" nil))))
       (.activateStorm state "storm2" (thriftify-storm-base base2))
       (is (= base1 (clojurify-storm-base (.stormBase state "storm1" nil))))
       (is (= base2 (clojurify-storm-base (.stormBase state "storm2" nil))))
-      (is (= #{"storm1" "storm2"} (set (.active-storms state))))
+      (is (= #{"storm1" "storm2"} (set (.activeStorms state))))
       (.removeStormBase state "storm1")
       (is (= base2 (clojurify-storm-base (.stormBase state "storm2" nil))))
-      (is (= #{"storm2"} (set (.active-storms state))))
+      (is (= #{"storm2"} (set (.activeStorms state))))
 
       (is (nil? (clojurify-crdentials (.credentials state "storm1" nil))))
-      (.setCredentials! state "storm1" (thriftify-credentials {"a" "a"}) {})
+      (.setCredentials state "storm1" (thriftify-credentials {"a" "a"}) {})
       (is (= {"a" "a"} (clojurify-crdentials (.credentials state "storm1" nil))))
       (.setCredentials state "storm1" (thriftify-credentials {"b" "b"}) {})
       (is (= {"b" "b"} (clojurify-crdentials (.credentials state "storm1" nil))))
 
-      (is (= [] (.blobstoreInfo state nil)))
-      (.setupBlobstore state "key1" nimbusInfo1 "1")
-      (is (= ["key1"] (.blobstoreInfo state nil)))
+      (is (= [] (.blobstoreInfo state "")))
+      (.setupBlobstore state "key1" nimbusInfo1 (Integer/parseInt "1"))
+      (is (= ["key1"] (.blobstoreInfo state "")))
       (is (= [(str (.toHostPortString nimbusInfo1) "-1")] (.blobstoreInfo state "key1")))
-      (.setupBlobstore state "key1" nimbusInfo2 "1")
+      (.setupBlobstore state "key1" nimbusInfo2 (Integer/parseInt "1"))
       (is (= #{(str (.toHostPortString nimbusInfo1) "-1")
                (str (.toHostPortString nimbusInfo2) "-1")} (set (.blobstoreInfo state "key1"))))
       (.removeBlobstoreKey state "key1")
-      (is (= [] (.blobstoreInfo state nil)))
+      (is (= [] (.blobstoreInfo state "")))
 
       (is (= [] (.nimbuses state)))
       (.addNimbusHost state "nimbus1:port" nimbusSummary1)
@@ -230,11 +234,10 @@
       )))
 
 (defn- validate-errors! [state storm-id component errors-list]
-  (let [errors (clojurify-error (.errors state storm-id component))]
-    ;;(println errors)
+  (let [errors (map clojurify-error (.errors state storm-id component))]
     (is (= (count errors) (count errors-list)))
     (doseq [[error target] (map vector errors errors-list)]
-      (when-not  (.contains (:error error) target)
+      (when-not (.contains (:error error) target)
         (println target " => " (:error error)))
       (is (.contains (:error error) target))
       )))
@@ -257,8 +260,9 @@
           (.reportError state "a" "2" (local-hostname) 6700 (stringify-error (IllegalArgumentException.)))
           (advance-time-secs! 2))
         (validate-errors! state "a" "2" (concat (repeat 5 "IllegalArgumentException")
-                                                (repeat 5 "RuntimeException")
-                                                ))
+                                          (repeat 5 "RuntimeException")
+                                          ))
+
         (.disconnect state)
         ))))
 
@@ -285,23 +289,23 @@
   (with-inprocess-zookeeper zk-port
     (let [builder (Mockito/mock CuratorFrameworkFactory$Builder)
           conf (merge
-                (mk-config zk-port)
-                {STORM-ZOOKEEPER-CONNECTION-TIMEOUT 10
-                 STORM-ZOOKEEPER-SESSION-TIMEOUT 10
-                 STORM-ZOOKEEPER-RETRY-INTERVAL 5
-                 STORM-ZOOKEEPER-RETRY-TIMES 2
-                 STORM-ZOOKEEPER-RETRY-INTERVAL-CEILING 15
-                 STORM-ZOOKEEPER-AUTH-SCHEME "digest"
-                 STORM-ZOOKEEPER-AUTH-PAYLOAD "storm:thisisapoorpassword"})]
+                 (mk-config zk-port)
+                 {STORM-ZOOKEEPER-CONNECTION-TIMEOUT 10
+                  STORM-ZOOKEEPER-SESSION-TIMEOUT 10
+                  STORM-ZOOKEEPER-RETRY-INTERVAL 5
+                  STORM-ZOOKEEPER-RETRY-TIMES 2
+                  STORM-ZOOKEEPER-RETRY-INTERVAL-CEILING 15
+                  STORM-ZOOKEEPER-AUTH-SCHEME "digest"
+                  STORM-ZOOKEEPER-AUTH-PAYLOAD "storm:thisisapoorpassword"})]
       (. (Mockito/when (.connectString builder (Mockito/anyString))) (thenReturn builder))
       (. (Mockito/when (.connectionTimeoutMs builder (Mockito/anyInt))) (thenReturn builder))
       (. (Mockito/when (.sessionTimeoutMs builder (Mockito/anyInt))) (thenReturn builder))
       (TestUtils/testSetupBuilder builder (str zk-port "/") conf (ZookeeperAuthInfo. conf))
       (is (nil?
-           (try
-             (. (Mockito/verify builder) (authorization "digest" (.getBytes (conf STORM-ZOOKEEPER-AUTH-PAYLOAD))))
-             (catch MockitoAssertionError e
-               e)))))))
+            (try
+              (. (Mockito/verify builder) (authorization "digest" (.getBytes (conf STORM-ZOOKEEPER-AUTH-PAYLOAD))))
+              (catch MockitoAssertionError e
+                e)))))))
 
 (deftest test-storm-state-callbacks
   ;; TODO finish
@@ -309,13 +313,17 @@
 
 (deftest test-cluster-state-default-acls
   (testing "The default ACLs are empty."
-    (let [zk-mock (Mockito/mock Zookeeper)]
+    (let [zk-mock (Mockito/mock Zookeeper)
+          curator-frameworke (reify CuratorFramework (^void close [this] nil))]
       ;; No need for when clauses because we just want to return nil
       (with-open [_ (MockedZookeeper. zk-mock)]
-          (. (Mockito/when (Mockito/mock Zookeeper)) (thenReturn (reify CuratorFramework (^void close [this] nil))))
-          (. (Mockito/when (Mockito/mock DistributedClusterState)) (thenReturn {}))
-          (. (Mockito/when (Mockito/mock StormZkClusterState)) (thenReturn (reify ClusterState
-                                                                             (register [this callback] nil)
-                                                                             (mkdirs [this path acls] nil))))
-          (.mkdirsImpl (Mockito/verify zk-mock (Mockito/times 1)) (Mockito/any) (Mockito/anyString) (Mockito/eq nil))))))
-
+        (. (Mockito/when (.mkClientImpl zk-mock (Mockito/anyMap) (Mockito/anyList) (Mockito/any) (Mockito/anyString) (Mockito/any) (Mockito/anyMap))) (thenReturn curator-frameworke))
+        (ClusterUtils/mkDistributedClusterState {} nil nil (ClusterStateContext.))
+        (.mkdirsImpl (Mockito/verify zk-mock (Mockito/times 1)) (Mockito/any) (Mockito/anyString) (Mockito/eq nil))))
+    (let [distributed-state-storage (reify StateStorage
+                                      (register [this callback] nil)
+                                      (mkdirs [this path acls] nil))
+          cluster-utils (Mockito/mock ClusterUtils)]
+      (with-open [mocked-cluster (MockedCluster. cluster-utils)]
+        (. (Mockito/when (.mkDistributedClusterStateImpl cluster-utils (Mockito/any) (Mockito/any) (Mockito/eq nil) (Mockito/any))) (thenReturn distributed-state-storage))
+        (ClusterUtils/mkStormClusterState {} nil (ClusterStateContext.))))))
\ No newline at end of file