You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/02/24 17:17:23 UTC

[13/27] storm git commit: Merge branch 'master' into ClusterUtils

http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 8df5885,0000000..17c8641
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@@ -1,664 -1,0 +1,687 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.cluster;
 +
 +import clojure.lang.*;
 +import org.apache.commons.lang.StringUtils;
 +import org.apache.curator.framework.CuratorFramework;
 +import org.apache.curator.framework.state.*;
 +import org.apache.curator.framework.state.ConnectionState;
 +import org.apache.storm.callback.ZKStateChangedCallback;
 +import org.apache.storm.generated.*;
 +import org.apache.storm.nimbus.NimbusInfo;
 +import org.apache.storm.utils.Time;
 +import org.apache.storm.utils.Utils;
 +import org.apache.storm.zookeeper.Zookeeper;
 +import org.apache.zookeeper.KeeperException;
 +import org.apache.zookeeper.Watcher;
 +import org.apache.zookeeper.data.ACL;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
++import java.io.PrintWriter;
++import java.io.StringWriter;
 +import java.security.NoSuchAlgorithmException;
 +import java.util.*;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.atomic.AtomicReference;
 +
 +/**
 + * {@link IStormClusterState} implementation backed by an {@link IStateStorage}.
 + *
 + * Read methods that accept an {@link IFn} callback store that callback and ask the storage to watch the
 + * corresponding path. When the storage reports a change, the stored callback is removed and invoked once
 + * (see {@link #issueCallback} and {@link #issueMapCallback}); callers that want further notifications have
 + * to register again.
 + */
 +public class StormClusterStateImpl implements IStormClusterState {
 +
 +    private static final Logger LOG = LoggerFactory.getLogger(StormClusterStateImpl.class);
 +
 +    private final IStateStorage stateStorage;
 +
 +    // Per-topology callbacks are keyed by storm id; cluster-wide callbacks live in an AtomicReference.
 +    private final ConcurrentHashMap<String, IFn> assignmentInfoCallback;
 +    private final ConcurrentHashMap<String, IFn> assignmentInfoWithVersionCallback;
 +    private final ConcurrentHashMap<String, IFn> assignmentVersionCallback;
 +    private final AtomicReference<IFn> supervisorsCallback;
 +    // we want to register a topo directory getChildren callback for all workers of this dir
 +    private final ConcurrentHashMap<String, IFn> backPressureCallback;
 +    private final AtomicReference<IFn> assignmentsCallback;
 +    private final ConcurrentHashMap<String, IFn> stormBaseCallback;
 +    private final AtomicReference<IFn> blobstoreCallback;
 +    private final ConcurrentHashMap<String, IFn> credentialsCallback;
 +    private final ConcurrentHashMap<String, IFn> logConfigCallback;
 +
 +    private final List<ACL> acls;
 +    private final String stateId;
 +    // when true, disconnect() also closes the underlying state storage
 +    private final boolean solo;
 +
 +    /**
 +     * Registers a change listener on the given storage and creates the top-level subtrees.
 +     *
 +     * @param StateStorage storage used for every read and write
 +     * @param acls ACLs applied to nodes written by this instance
 +     * @param context not used by this constructor
 +     * @param solo whether {@link #disconnect()} should also close the storage
 +     * @throws Exception declared for callers; propagated from the storage calls
 +     */
 +    public StormClusterStateImpl(IStateStorage StateStorage, List<ACL> acls, ClusterStateContext context, boolean solo) throws Exception {
 +
 +        this.stateStorage = StateStorage;
 +        this.solo = solo;
++        this.acls = acls;
 +
 +        assignmentInfoCallback = new ConcurrentHashMap<>();
 +        assignmentInfoWithVersionCallback = new ConcurrentHashMap<>();
 +        assignmentVersionCallback = new ConcurrentHashMap<>();
 +        supervisorsCallback = new AtomicReference<>();
 +        backPressureCallback = new ConcurrentHashMap<>();
 +        assignmentsCallback = new AtomicReference<>();
 +        stormBaseCallback = new ConcurrentHashMap<>();
 +        credentialsCallback = new ConcurrentHashMap<>();
 +        logConfigCallback = new ConcurrentHashMap<>();
 +        blobstoreCallback = new AtomicReference<>();
 +
 +        stateId = this.stateStorage.register(new ZKStateChangedCallback() {
 +
 +            // Routes a change notification to the callback registered for the changed path.
 +            @Override
 +            public void changed(Watcher.Event.EventType type, String path) {
 +                List<String> toks = Zookeeper.tokenizePath(path);
 +                int size = toks.size();
 +                if (size >= 1) {
-                     String params = null;
 +                    String root = toks.get(0);
-                     IFn fn = null;
 +                    if (root.equals(ClusterUtils.ASSIGNMENTS_ROOT)) {
 +                        if (size == 1) {
 +                            // the assignments root itself changed
 +                            issueCallback(assignmentsCallback);
 +                        } else {
 +                            // a single topology's assignment changed; toks.get(1) is used as the storm id key
 +                            issueMapCallback(assignmentInfoCallback, toks.get(1));
 +                            issueMapCallback(assignmentVersionCallback, toks.get(1));
 +                            issueMapCallback(assignmentInfoWithVersionCallback, toks.get(1));
 +                        }
 +
 +                    } else if (root.equals(ClusterUtils.SUPERVISORS_ROOT)) {
 +                        issueCallback(supervisorsCallback);
 +                    } else if (root.equals(ClusterUtils.BLOBSTORE_ROOT)) {
 +                        issueCallback(blobstoreCallback);
 +                    } else if (root.equals(ClusterUtils.STORMS_ROOT) && size > 1) {
 +                        issueMapCallback(stormBaseCallback, toks.get(1));
 +                    } else if (root.equals(ClusterUtils.CREDENTIALS_ROOT) && size > 1) {
 +                        issueMapCallback(credentialsCallback, toks.get(1));
 +                    } else if (root.equals(ClusterUtils.LOGCONFIG_ROOT) && size > 1) {
 +                        issueMapCallback(logConfigCallback, toks.get(1));
 +                    } else if (root.equals(ClusterUtils.BACKPRESSURE_ROOT) && size > 1) {
 +                        // backpressure changes must fire the backpressure callback, not the log-config one
 +                        issueMapCallback(backPressureCallback, toks.get(1));
 +                    } else {
 +                        // trailing Throwable argument makes SLF4J log the stack trace
 +                        LOG.error("Unknown callback for subtree {}", path, new RuntimeException("Unknown callback for this path"));
 +                        Runtime.getRuntime().exit(30);
 +                    }
 +
 +                }
 +            }
 +
 +        });
 +
 +        String[] pathlist = { ClusterUtils.ASSIGNMENTS_SUBTREE, ClusterUtils.STORMS_SUBTREE, ClusterUtils.SUPERVISORS_SUBTREE, ClusterUtils.WORKERBEATS_SUBTREE,
 +                ClusterUtils.ERRORS_SUBTREE, ClusterUtils.BLOBSTORE_SUBTREE, ClusterUtils.NIMBUSES_SUBTREE, ClusterUtils.LOGCONFIG_SUBTREE };
 +        for (String path : pathlist) {
 +            this.stateStorage.mkdirs(path, acls);
 +        }
 +
 +    }
 +
 +    /**
 +     * Atomically clears the reference and, if a callback was stored, invokes it once.
 +     *
 +     * @param cb holder of the callback to fire
 +     */
 +    protected void issueCallback(AtomicReference<IFn> cb) {
 +        IFn callback = cb.getAndSet(null);
 +        if (callback != null) {
 +            callback.invoke();
 +        }
 +    }
 +
 +    /**
 +     * Removes the callback stored under {@code key} and, if one was present, invokes it once.
 +     *
 +     * @param callbackConcurrentHashMap callbacks keyed by storm id
 +     * @param key key whose callback should fire
 +     */
 +    protected void issueMapCallback(ConcurrentHashMap<String, IFn> callbackConcurrentHashMap, String key) {
 +        IFn callback = callbackConcurrentHashMap.remove(key);
 +        if (callback != null) {
 +            callback.invoke();
 +        }
 +    }
 +
 +    /** Lists the children of the assignments subtree; a non-null callback is stored and a watch is requested. */
 +    @Override
 +    public List<String> assignments(IFn callback) {
 +        if (callback != null) {
 +            assignmentsCallback.set(callback);
 +        }
 +        return stateStorage.get_children(ClusterUtils.ASSIGNMENTS_SUBTREE, callback != null);
 +    }
 +
 +    /** Reads and deserializes the assignment of a topology; a non-null callback is stored and a watch is requested. */
 +    @Override
 +    public Assignment assignmentInfo(String stormId, IFn callback) {
 +        if (callback != null) {
 +            assignmentInfoCallback.put(stormId, callback);
 +        }
 +        byte[] serialized = stateStorage.get_data(ClusterUtils.assignmentPath(stormId), callback != null);
 +        return ClusterUtils.maybeDeserialize(serialized, Assignment.class);
 +    }
 +
 +    /**
 +     * Reads the assignment of a topology together with its version.
 +     *
 +     * @return a map with the keys {@code :data} (the deserialized assignment, or null) and {@code :version}
 +     *         (0 when the storage returned no map)
 +     */
 +    @Override
 +    public APersistentMap assignmentInfoWithVersion(String stormId, IFn callback) {
 +        if (callback != null) {
 +            assignmentInfoWithVersionCallback.put(stormId, callback);
 +        }
 +        Assignment assignment = null;
 +        Integer version = 0;
 +        APersistentMap aPersistentMap = stateStorage.get_data_with_version(ClusterUtils.assignmentPath(stormId), callback != null);
 +        if (aPersistentMap != null) {
 +            assignment = ClusterUtils.maybeDeserialize((byte[]) aPersistentMap.get(RT.keyword(null, "data")), Assignment.class);
 +            version = (Integer) aPersistentMap.get(RT.keyword(null, "version"));
 +        }
 +        return new PersistentArrayMap(new Object[] { RT.keyword(null, "data"), assignment, RT.keyword(null, "version"), version });
 +    }
 +
 +    /** Returns the storage version of a topology's assignment node; a non-null callback is stored and a watch is requested. */
 +    @Override
 +    public Integer assignmentVersion(String stormId, IFn callback) throws Exception {
 +        if (callback != null) {
 +            assignmentVersionCallback.put(stormId, callback);
 +        }
 +        return stateStorage.get_version(ClusterUtils.assignmentPath(stormId), callback != null);
 +    }
 +
 +    // blobstore state
 +    /** Syncs the blob key's path and then lists its children, without a watch. */
 +    @Override
 +    public List<String> blobstoreInfo(String blobKey) {
 +        String path = ClusterUtils.blobstorePath(blobKey);
 +        stateStorage.sync_path(path);
 +        return stateStorage.get_children(path, false);
 +    }
 +
 +    /** Returns one deserialized {@link NimbusSummary} per child of the nimbuses subtree (entries may be null if deserialization yields null). */
 +    @Override
 +    public List nimbuses() {
 +        List<NimbusSummary> nimbusSummaries = new ArrayList<>();
 +        List<String> nimbusIds = stateStorage.get_children(ClusterUtils.NIMBUSES_SUBTREE, false);
 +        for (String nimbusId : nimbusIds) {
 +            byte[] serialized = stateStorage.get_data(ClusterUtils.nimbusPath(nimbusId), false);
 +            NimbusSummary nimbusSummary = ClusterUtils.maybeDeserialize(serialized, NimbusSummary.class);
 +            nimbusSummaries.add(nimbusSummary);
 +        }
 +        return nimbusSummaries;
 +    }
 +
 +    /**
 +     * Publishes this nimbus via {@code set_ephemeral_node} and installs a connection listener that
 +     * publishes it again whenever the connection state becomes {@code RECONNECTED}.
 +     */
 +    @Override
 +    public void addNimbusHost(final String nimbusId, final NimbusSummary nimbusSummary) {
 +        // explicit delete for ephemeral node to ensure this session creates the entry.
 +        stateStorage.delete_node(ClusterUtils.nimbusPath(nimbusId));
 +        stateStorage.add_listener(new ConnectionStateListener() {
 +            @Override
 +            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
 +                LOG.info("Connection state listener invoked, zookeeper connection state has changed to {}", connectionState);
 +                if (connectionState.equals(ConnectionState.RECONNECTED)) {
 +                    LOG.info("Connection state has changed to reconnected so setting nimbuses entry one more time");
 +                    stateStorage.set_ephemeral_node(ClusterUtils.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), acls);
 +                }
 +
 +            }
 +        });
 +
 +        stateStorage.set_ephemeral_node(ClusterUtils.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), acls);
 +    }
 +
 +    /** Lists the children of the storms subtree, without a watch. */
 +    @Override
 +    public List<String> activeStorms() {
 +        return stateStorage.get_children(ClusterUtils.STORMS_SUBTREE, false);
 +    }
 +
 +    /** Reads and deserializes the {@link StormBase} of a topology; a non-null callback is stored and a watch is requested. */
 +    @Override
 +    public StormBase stormBase(String stormId, IFn callback) {
 +        if (callback != null) {
 +            stormBaseCallback.put(stormId, callback);
 +        }
 +        return ClusterUtils.maybeDeserialize(stateStorage.get_data(ClusterUtils.stormPath(stormId), callback != null), StormBase.class);
 +    }
 +
 +    /** Reads and deserializes the heartbeat of the worker identified by topology, node and port. */
 +    @Override
 +    public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port) {
 +        byte[] bytes = stateStorage.get_worker_hb(ClusterUtils.workerbeatPath(stormId, node, port), false);
 +        return ClusterUtils.maybeDeserialize(bytes, ClusterWorkerHeartbeat.class);
 +
 +    }
 +
 +    /** Returns the topology's profile requests whose {@code nodeInfo} equals the given one. */
 +    @Override
 +    public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo, boolean isThrift) {
 +        List<ProfileRequest> requests = new ArrayList<>();
 +        List<ProfileRequest> profileRequests = getTopologyProfileRequests(stormId, isThrift);
 +        for (ProfileRequest profileRequest : profileRequests) {
 +            NodeInfo nodeInfo1 = profileRequest.get_nodeInfo();
 +            if (nodeInfo1.equals(nodeInfo)) {
 +                requests.add(profileRequest);
 +            }
 +        }
 +        return requests;
 +    }
 +
 +    /**
 +     * Returns every profile request stored under the topology's profiler-config path; empty if the path
 +     * does not exist. The {@code isThrift} flag is not consulted here.
 +     */
 +    @Override
 +    public List<ProfileRequest> getTopologyProfileRequests(String stormId, boolean isThrift) {
 +        List<ProfileRequest> profileRequests = new ArrayList<>();
 +        String path = ClusterUtils.profilerConfigPath(stormId);
 +        if (stateStorage.node_exists(path, false)) {
 +            List<String> strs = stateStorage.get_children(path, false);
 +            for (String str : strs) {
 +                String childPath = path + ClusterUtils.ZK_SEPERATOR + str;
 +                byte[] raw = stateStorage.get_data(childPath, false);
 +                ProfileRequest request = ClusterUtils.maybeDeserialize(raw, ProfileRequest.class);
 +                if (request != null) {
 +                    profileRequests.add(request);
 +                }
 +            }
 +        }
 +        return profileRequests;
 +    }
 +
 +    /** Stores the profile request under a path derived from topology, host, first port and action. */
 +    @Override
 +    public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest) {
 +        ProfileAction profileAction = profileRequest.get_action();
 +        String host = profileRequest.get_nodeInfo().get_node();
 +        Long port = profileRequest.get_nodeInfo().get_port_iterator().next();
 +        String path = ClusterUtils.profilerConfigPath(stormId, host, port, profileAction);
 +        stateStorage.set_data(path, Utils.serialize(profileRequest), acls);
 +    }
 +
 +    /** Deletes the node that {@link #setWorkerProfileRequest} would have written for this request. */
 +    @Override
 +    public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest) {
 +        ProfileAction profileAction = profileRequest.get_action();
 +        String host = profileRequest.get_nodeInfo().get_node();
 +        Long port = profileRequest.get_nodeInfo().get_port_iterator().next();
 +        String path = ClusterUtils.profilerConfigPath(stormId, host, port, profileAction);
 +        stateStorage.delete_node(path);
 +    }
 +
-     // need to take executor->node+port in explicitly so that we don't run into a situation where a
-     // long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
-     // with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
-     // we avoid situations like that
++    /**
++     * Collects executor heartbeats for the assigned workers of a topology.
++     *
++     * The executor->node+port mapping is taken explicitly so that a long dead worker with a skewed clock
++     * cannot override all the timestamps: only heartbeats of an assigned node+port are read, and only the
++     * executors actually assigned to that worker are taken from each heartbeat.
++     *
++     * @param stormId topology id
++     * @param executorNodePort assigned executors (the first and last list elements are passed to the
++     *        {@link ExecutorInfo} constructor) mapped to the worker they run on
++     * @return the converted heartbeats of all workers for which a heartbeat was found
++     */
 +    @Override
-     public Map<ExecutorInfo, ClusterWorkerHeartbeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort) {
-         Map<ExecutorInfo, ClusterWorkerHeartbeat> executorWhbs = new HashMap<>();
++    public Map<ExecutorInfo, APersistentMap> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort) {
++        Map<ExecutorInfo, APersistentMap> executorWhbs = new HashMap<>();
 +
 +        Map<NodeInfo, List<List<Long>>> nodePortExecutors = ClusterUtils.reverseMap(executorNodePort);
 +
 +        for (Map.Entry<NodeInfo, List<List<Long>>> entry : nodePortExecutors.entrySet()) {
 +
 +            String node = entry.getKey().get_node();
 +            Long port = entry.getKey().get_port_iterator().next();
 +            ClusterWorkerHeartbeat whb = getWorkerHeartbeat(stormId, node, port);
 +            List<ExecutorInfo> executorInfoList = new ArrayList<>();
 +            for (List<Long> list : entry.getValue()) {
 +                executorInfoList.add(new ExecutorInfo(list.get(0).intValue(), list.get(list.size() - 1).intValue()));
 +            }
 +            if (whb != null) {
 +                executorWhbs.putAll(ClusterUtils.convertExecutorBeats(executorInfoList, whb));
 +            }
 +        }
 +        return executorWhbs;
 +    }
 +
 +    /** Lists the children of the supervisors subtree; a non-null callback is stored and a watch is requested. */
 +    @Override
 +    public List<String> supervisors(IFn callback) {
 +        if (callback != null) {
 +            supervisorsCallback.set(callback);
 +        }
 +        return stateStorage.get_children(ClusterUtils.SUPERVISORS_SUBTREE, callback != null);
 +    }
 +
 +    /** Reads and deserializes the {@link SupervisorInfo} stored for the supervisor. */
 +    @Override
 +    public SupervisorInfo supervisorInfo(String supervisorId) {
 +        String path = ClusterUtils.supervisorPath(supervisorId);
 +        return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, false), SupervisorInfo.class);
 +    }
 +
 +    /** Creates the worker-heartbeat root of the topology. */
 +    @Override
 +    public void setupHeatbeats(String stormId) {
 +        stateStorage.mkdirs(ClusterUtils.workerbeatStormRoot(stormId), acls);
 +    }
 +
 +    /** Deletes the worker-heartbeat root of the topology; failures caused by a {@link KeeperException} are logged and ignored. */
 +    @Override
 +    public void teardownHeartbeats(String stormId) {
 +        try {
 +            stateStorage.delete_worker_hb(ClusterUtils.workerbeatStormRoot(stormId));
 +        } catch (Exception e) {
 +            if (Zookeeper.exceptionCause(KeeperException.class, e)) {
 +                // best effort: only warn
 +                LOG.warn("Could not teardown heartbeats for {}.", stormId);
 +            } else {
 +                throw e;
 +            }
 +        }
 +    }
 +
 +    /** Deletes the error root of the topology; failures caused by a {@link KeeperException} are logged and ignored. */
 +    @Override
 +    public void teardownTopologyErrors(String stormId) {
 +        try {
 +            stateStorage.delete_node(ClusterUtils.errorStormRoot(stormId));
 +        } catch (Exception e) {
 +            if (Zookeeper.exceptionCause(KeeperException.class, e)) {
 +                // best effort: only warn
 +                LOG.warn("Could not teardown errors for {}.", stormId);
 +            } else {
 +                throw e;
 +            }
 +        }
 +    }
 +
 +    /** Lists the children of the worker-heartbeats subtree. */
 +    @Override
 +    public List<String> heartbeatStorms() {
 +        return stateStorage.get_worker_hb_children(ClusterUtils.WORKERBEATS_SUBTREE, false);
 +    }
 +
 +    /** Lists the children of the errors subtree. */
 +    @Override
 +    public List<String> errorTopologies() {
 +        return stateStorage.get_children(ClusterUtils.ERRORS_SUBTREE, false);
 +    }
 +
 +    /** Serializes and stores the log configuration of the topology. */
 +    @Override
 +    public void setTopologyLogConfig(String stormId, LogConfig logConfig) {
 +        stateStorage.set_data(ClusterUtils.logConfigPath(stormId), Utils.serialize(logConfig), acls);
 +    }
 +
 +    /**
 +     * Reads and deserializes the log configuration of the topology.
 +     *
 +     * @param stormId topology id
 +     * @param cb if non-null, stored so that it fires on the next change below the log-config root for this
 +     *        topology, and a watch is requested
 +     */
 +    @Override
 +    public LogConfig topologyLogConfig(String stormId, IFn cb) {
 +        if (cb != null) {
 +            // without this registration the watch requested below would fire with nothing to invoke
 +            logConfigCallback.put(stormId, cb);
 +        }
 +        String path = ClusterUtils.logConfigPath(stormId);
 +        return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, cb != null), LogConfig.class);
 +    }
 +
 +    /** Serializes and stores the worker heartbeat; does nothing when {@code info} is null. */
 +    @Override
 +    public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info) {
 +        if (info != null) {
 +            String path = ClusterUtils.workerbeatPath(stormId, node, port);
 +            stateStorage.set_worker_hb(path, Utils.serialize(info), acls);
 +        }
 +    }
 +
 +    /** Deletes the heartbeat of the worker identified by topology, node and port. */
 +    @Override
 +    public void removeWorkerHeartbeat(String stormId, String node, Long port) {
 +        String path = ClusterUtils.workerbeatPath(stormId, node, port);
 +        stateStorage.delete_worker_hb(path);
 +    }
 +
 +    /** Serializes the supervisor info and writes it via {@code set_ephemeral_node}. */
 +    @Override
 +    public void supervisorHeartbeat(String supervisorId, SupervisorInfo info) {
 +        String path = ClusterUtils.supervisorPath(supervisorId);
 +        stateStorage.set_ephemeral_node(path, Utils.serialize(info), acls);
 +    }
 +
-     // if znode exists and to be not on?, delete; if exists and on?, do nothing;
-     // if not exists and to be on?, create; if not exists and not on?, do nothing;
++    /**
++     * Brings the worker's backpressure znode in line with {@code on}: if the znode exists and {@code on} is
++     * false it is deleted; if it does not exist and {@code on} is true it is created; otherwise nothing is done.
++     *
++     * @param stormId topology id
++     * @param node worker node
++     * @param port worker port
++     * @param on desired backpressure flag of the worker
++     */
 +    @Override
 +    public void workerBackpressure(String stormId, String node, Long port, boolean on) {
 +        String path = ClusterUtils.backpressurePath(stormId, node, port);
 +        boolean existed = stateStorage.node_exists(path, false);
 +        if (existed) {
 +            if (!on) {
 +                stateStorage.delete_node(path);
 +            }
 +
 +        } else {
 +            if (on) {
 +                stateStorage.set_ephemeral_node(path, null, acls);
 +            }
 +        }
 +    }
 +
-     // if the backpresure/storm-id dir is empty, this topology has throttle-on, otherwise not.
++    /**
++     * Reports whether the topology's backpressure directory has at least one child znode.
++     *
++     * @param stormId topology id
++     * @param callback if non-null, stored under the storm id and a watch on the directory is requested
++     * @return true if the backpressure/storm-id dir is not empty, false if it is empty
++     */
 +    @Override
 +    public boolean topologyBackpressure(String stormId, IFn callback) {
 +        if (callback != null) {
 +            backPressureCallback.put(stormId, callback);
 +        }
 +        String path = ClusterUtils.backpressureStormRoot(stormId);
 +        List<String> childrens = stateStorage.get_children(path, callback != null);
 +        return childrens.size() > 0;
 +
 +    }
 +
 +    /** Creates the backpressure root of the topology. */
 +    @Override
 +    public void setupBackpressure(String stormId) {
 +        stateStorage.mkdirs(ClusterUtils.backpressureStormRoot(stormId), acls);
 +    }
 +
 +    /** Deletes the backpressure znode of the worker identified by topology, node and port. */
 +    @Override
 +    public void removeWorkerBackpressure(String stormId, String node, Long port) {
 +        stateStorage.delete_node(ClusterUtils.backpressurePath(stormId, node, port));
 +    }
 +
 +    /** Serializes and stores the {@link StormBase} of the topology. */
 +    @Override
 +    public void activateStorm(String stormId, StormBase stormBase) {
 +        String path = ClusterUtils.stormPath(stormId);
 +        stateStorage.set_data(path, Utils.serialize(stormBase), acls);
 +    }
 +
-     // To update this function due to APersistentMap/APersistentSet is clojure's structure
++    /**
++     * Merges {@code newElems} with the stored {@link StormBase} of the topology and writes the result back.
++     * Values present in {@code newElems} win; component executors and debug options are merged per key, and
++     * blank/zero/null scalar fields of {@code newElems} are filled from the stored base.
++     *
++     * The maps read from either object may be Clojure persistent structures, which do not support mutation,
++     * so they are only read here and copied into fresh {@link HashMap}/{@link HashSet} instances.
++     *
++     * NOTE(review): if {@code stormBase(stormId, null)} can return null for a missing topology this method
++     * throws NullPointerException - confirm callers only update existing topologies.
++     *
++     * @param stormId topology id
++     * @param newElems new values; modified in place before being serialized
++     */
 +    @Override
 +    public void updateStorm(String stormId, StormBase newElems) {
 +
 +        StormBase stormBase = stormBase(stormId, null);
 +        if (stormBase.get_component_executors() != null) {
 +
 +            Map<String, Integer> newComponentExecutors = new HashMap<>();
 +            Map<String, Integer> componentExecutors = newElems.get_component_executors();
 +            if (componentExecutors == null) {
 +                // nothing supplied by the caller: keep the stored executors as they are
 +                componentExecutors = Collections.emptyMap();
 +            }
 +            // componentExecutors may be an APersistentMap, which doesn't support "put"
 +            for (Map.Entry<String, Integer> entry : componentExecutors.entrySet()) {
 +                newComponentExecutors.put(entry.getKey(), entry.getValue());
 +            }
 +            for (Map.Entry<String, Integer> entry : stormBase.get_component_executors().entrySet()) {
 +                if (!componentExecutors.containsKey(entry.getKey())) {
 +                    newComponentExecutors.put(entry.getKey(), entry.getValue());
 +                }
 +            }
 +            if (newComponentExecutors.size() > 0) {
 +                newElems.set_component_executors(newComponentExecutors);
 +            }
 +        }
 +
 +        Map<String, DebugOptions> mergedComponentDebug = new HashMap<>();
 +        Map<String, DebugOptions> oldComponentDebug = stormBase.get_component_debug();
 +        if (oldComponentDebug == null) {
 +            oldComponentDebug = Collections.emptyMap();
 +        }
 +
 +        Map<String, DebugOptions> newComponentDebug = newElems.get_component_debug();
 +        if (newComponentDebug == null) {
 +            newComponentDebug = Collections.emptyMap();
 +        }
 +        // the key sets may be APersistentSet instances, which don't support addAll, hence the copy
 +        Set<String> debugOptionsKeys = new HashSet<>();
 +        debugOptionsKeys.addAll(oldComponentDebug.keySet());
 +        debugOptionsKeys.addAll(newComponentDebug.keySet());
 +        for (String key : debugOptionsKeys) {
 +            boolean enable = false;
 +            double samplingpct = 0;
 +            if (oldComponentDebug.containsKey(key)) {
 +                enable = oldComponentDebug.get(key).is_enable();
 +                samplingpct = oldComponentDebug.get(key).get_samplingpct();
 +            }
 +            if (newComponentDebug.containsKey(key)) {
 +                enable = newComponentDebug.get(key).is_enable();
 +                // NOTE(review): this adds the new sampling percentage to the old one instead of replacing it,
 +                // unlike "enable" above - confirm whether "=" was intended.
 +                samplingpct += newComponentDebug.get(key).get_samplingpct();
 +            }
 +            DebugOptions debugOptions = new DebugOptions();
 +            debugOptions.set_enable(enable);
 +            debugOptions.set_samplingpct(samplingpct);
 +            mergedComponentDebug.put(key, debugOptions);
 +        }
 +        if (mergedComponentDebug.size() > 0) {
 +            newElems.set_component_debug(mergedComponentDebug);
 +        }
 +
 +        // fall back to the stored value for every scalar field the caller left unset
 +        if (StringUtils.isBlank(newElems.get_name())) {
 +            newElems.set_name(stormBase.get_name());
 +        }
 +        if (newElems.get_status() == null) {
 +            newElems.set_status(stormBase.get_status());
 +        }
 +        if (newElems.get_num_workers() == 0) {
 +            newElems.set_num_workers(stormBase.get_num_workers());
 +        }
 +        if (newElems.get_launch_time_secs() == 0) {
 +            newElems.set_launch_time_secs(stormBase.get_launch_time_secs());
 +        }
 +        if (StringUtils.isBlank(newElems.get_owner())) {
 +            newElems.set_owner(stormBase.get_owner());
 +        }
 +        if (newElems.get_topology_action_options() == null) {
 +            newElems.set_topology_action_options(stormBase.get_topology_action_options());
 +        }
 +        stateStorage.set_data(ClusterUtils.stormPath(stormId), Utils.serialize(newElems), acls);
 +    }
 +
 +    /** Deletes the stored {@link StormBase} node of the topology. */
 +    @Override
 +    public void removeStormBase(String stormId) {
 +        stateStorage.delete_node(ClusterUtils.stormPath(stormId));
 +    }
 +
 +    /** Serializes and stores the assignment of the topology. */
 +    @Override
 +    public void setAssignment(String stormId, Assignment info) {
 +        stateStorage.set_data(ClusterUtils.assignmentPath(stormId), Utils.serialize(info), acls);
 +    }
 +
 +    /**
 +     * Registers this nimbus for a blob key: ensures the key's directory exists, removes earlier entries of
 +     * the same host:port via {@code delete_node_blobstore}, then writes {@code <host:port>-<version>} via
 +     * {@code set_ephemeral_node}.
 +     */
 +    @Override
 +    public void setupBlobstore(String key, NimbusInfo nimbusInfo, Integer versionInfo) {
 +        String path = ClusterUtils.blobstorePath(key) + ClusterUtils.ZK_SEPERATOR + nimbusInfo.toHostPortString() + "-" + versionInfo;
 +        LOG.info("set-path: {}", path);
 +        stateStorage.mkdirs(ClusterUtils.blobstorePath(key), acls);
 +        stateStorage.delete_node_blobstore(ClusterUtils.blobstorePath(key), nimbusInfo.toHostPortString());
 +        stateStorage.set_ephemeral_node(path, null, acls);
 +    }
 +
 +    /** Lists the children of the blobstore subtree, without syncing and without a watch. */
 +    @Override
 +    public List<String> activeKeys() {
 +        return stateStorage.get_children(ClusterUtils.BLOBSTORE_SUBTREE, false);
 +    }
 +
 +    // blobstore state
 +    /** Syncs the blobstore subtree and lists its children; a non-null callback is stored and a watch is requested. */
 +    @Override
 +    public List<String> blobstore(IFn callback) {
 +        if (callback != null) {
 +            blobstoreCallback.set(callback);
 +        }
 +        stateStorage.sync_path(ClusterUtils.BLOBSTORE_SUBTREE);
 +        return stateStorage.get_children(ClusterUtils.BLOBSTORE_SUBTREE, callback != null);
 +
 +    }
 +
 +    /** Deletes the assignment, credentials, log-config, profiler-config and base nodes of the topology. */
 +    @Override
 +    public void removeStorm(String stormId) {
 +        stateStorage.delete_node(ClusterUtils.assignmentPath(stormId));
 +        stateStorage.delete_node(ClusterUtils.credentialsPath(stormId));
 +        stateStorage.delete_node(ClusterUtils.logConfigPath(stormId));
 +        stateStorage.delete_node(ClusterUtils.profilerConfigPath(stormId));
 +        removeStormBase(stormId);
 +    }
 +
 +    /** Deletes the blobstore node of the key. */
 +    @Override
 +    public void removeBlobstoreKey(String blobKey) {
 +        LOG.debug("remove key {}", blobKey);
 +        stateStorage.delete_node(ClusterUtils.blobstorePath(blobKey));
 +    }
 +
 +    /** Deletes the max-key-sequence-number node of the key. */
 +    @Override
 +    public void removeKeyVersion(String blobKey) {
 +        stateStorage.delete_node(ClusterUtils.blobstoreMaxKeySequenceNumberPath(blobKey));
 +    }
 +
 +    /**
 +     * Records an error for a component: appends it as a sequential node, overwrites the component's
 +     * "last error" node, and then deletes the nodes with the lowest sequence numbers until at most 10 remain.
 +     */
 +    @Override
-     public void reportError(String stormId, String componentId, String node, Long port, String error) {
++    public void reportError(String stormId, String componentId, String node, Long port, Throwable error) {
 +
 +        String path = ClusterUtils.errorPath(stormId, componentId);
 +        String lastErrorPath = ClusterUtils.lastErrorPath(stormId, componentId);
-         ErrorInfo errorInfo = new ErrorInfo(error, Time.currentTimeSecs());
++        ErrorInfo errorInfo = new ErrorInfo(ClusterUtils.StringifyError(error), Time.currentTimeSecs());
 +        errorInfo.set_host(node);
 +        errorInfo.set_port(port.intValue());
 +        byte[] serData = Utils.serialize(errorInfo);
 +        stateStorage.mkdirs(path, acls);
 +        stateStorage.create_sequential(path + ClusterUtils.ZK_SEPERATOR + "e", serData, acls);
 +        stateStorage.set_data(lastErrorPath, serData, acls);
 +        List<String> childrens = stateStorage.get_children(path, false);
 +
 +        // child names are the "e" prefix followed by the sequence number; order by that number ascending
 +        Collections.sort(childrens, new Comparator<String>() {
 +            @Override
 +            public int compare(String arg0, String arg1) {
 +                return Long.compare(Long.parseLong(arg0.substring(1)), Long.parseLong(arg1.substring(1)));
 +            }
 +        });
 +
 +        while (childrens.size() > 10) {
 +            stateStorage.delete_node(path + ClusterUtils.ZK_SEPERATOR + childrens.remove(0));
 +        }
 +    }
 +
 +    /**
 +     * Returns the stored errors of a component ordered by error time, newest first; empty if the
 +     * component has no error node. Any exception is rethrown via {@code Utils.wrapInRuntime}.
 +     */
 +    @Override
 +    public List<ErrorInfo> errors(String stormId, String componentId) {
 +        List<ErrorInfo> errorInfos = new ArrayList<>();
 +        try {
 +            String path = ClusterUtils.errorPath(stormId, componentId);
 +            if (stateStorage.node_exists(path, false)) {
 +                List<String> childrens = stateStorage.get_children(path, false);
 +                for (String child : childrens) {
 +                    String childPath = path + ClusterUtils.ZK_SEPERATOR + child;
 +                    ErrorInfo errorInfo = ClusterUtils.maybeDeserialize(stateStorage.get_data(childPath, false), ErrorInfo.class);
 +                    if (errorInfo != null) {
 +                        errorInfos.add(errorInfo);
 +                    }
 +                }
 +            }
 +            Collections.sort(errorInfos, new Comparator<ErrorInfo>() {
 +                @Override
 +                public int compare(ErrorInfo arg0, ErrorInfo arg1) {
 +                    return Integer.compare(arg1.get_error_time_secs(), arg0.get_error_time_secs());
 +                }
 +            });
 +        } catch (Exception e) {
 +            throw Utils.wrapInRuntime(e);
 +        }
 +
 +        return errorInfos;
 +    }
 +
 +    /** Returns the component's "last error" entry, or null if its node does not exist. */
 +    @Override
 +    public ErrorInfo lastError(String stormId, String componentId) {
 +
 +        String path = ClusterUtils.lastErrorPath(stormId, componentId);
 +        if (stateStorage.node_exists(path, false)) {
 +            return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, false), ErrorInfo.class);
 +        }
 +
 +        return null;
 +    }
 +
 +    /** Serializes and stores the credentials using the ACLs returned by {@code ClusterUtils.mkTopoOnlyAcls(topoConf)}. */
 +    @Override
 +    public void setCredentials(String stormId, Credentials creds, Map topoConf) throws NoSuchAlgorithmException {
 +        List<ACL> aclList = ClusterUtils.mkTopoOnlyAcls(topoConf);
 +        String path = ClusterUtils.credentialsPath(stormId);
 +        stateStorage.set_data(path, Utils.serialize(creds), aclList);
 +
 +    }
 +
 +    /** Reads and deserializes the credentials of a topology; a non-null callback is stored and a watch is requested. */
 +    @Override
 +    public Credentials credentials(String stormId, IFn callback) {
 +        if (callback != null) {
 +            credentialsCallback.put(stormId, callback);
 +        }
 +        String path = ClusterUtils.credentialsPath(stormId);
 +        return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, callback != null), Credentials.class);
 +
 +    }
 +
 +    /** Unregisters the change listener and, when this instance was created with {@code solo}, closes the storage. */
 +    @Override
 +    public void disconnect() {
 +        stateStorage.unregister(stateId);
 +        if (solo) {
 +            stateStorage.close();
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
index f3b9253,0000000..956c20e
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
@@@ -1,36 -1,0 +1,36 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.cluster;
 +
 +import clojure.lang.APersistentMap;
 +import org.apache.storm.utils.Utils;
 +import org.apache.zookeeper.data.ACL;
 +
 +import java.util.List;
 +/**
 + * StateStorageFactory implementation whose mkStore creates ZKStateStorage instances.
 + */
- public class ZKStateStorageFactory implements StateStorageFactory{
++public class ZKStateStorageFactory implements StateStorageFactory {
 +    /**
 +     * Creates a new ZKStateStorage from the given arguments.
 +     *
 +     * @param config    forwarded unchanged to the ZKStateStorage constructor
 +     * @param auth_conf forwarded unchanged to the ZKStateStorage constructor
 +     * @param acls      forwarded unchanged to the ZKStateStorage constructor
 +     * @param context   forwarded unchanged to the ZKStateStorage constructor
 +     * @return the newly constructed ZKStateStorage
 +     * @throws RuntimeException any Exception from the constructor is passed through Utils.wrapInRuntime and rethrown
 +     */
 +    @Override
 +    public IStateStorage mkStore(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context) {
 +        try {
 +            return new ZKStateStorage(config, auth_conf, acls, context);
-         }catch (Exception e){
++        } catch (Exception e) {
 +            throw Utils.wrapInRuntime(e);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
index af0e8f3,34f3665..20d6deb
--- a/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
+++ b/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
@@@ -60,6 -60,6 +60,10 @@@ public class PacemakerClient implement
      private StormBoundedExponentialBackoffRetry backoff = new StormBoundedExponentialBackoffRetry(100, 5000, 20);
      private int retryTimes = 0;
  
++    // No-arg constructor used by pacemaker-state-factory-test, which proxies this class; it only creates the ClientBootstrap.
++    public PacemakerClient() {
++        bootstrap = new ClientBootstrap();
++    }
      public PacemakerClient(Map config) {
  
          String host = (String)config.get(Config.PACEMAKER_HOST);
@@@ -157,6 -157,7 +161,7 @@@
      public String secretKey() {
          return secret;
      }
 -
++    public HBMessage  checkCaptured() {return null;} // always null here; the proxy in pacemaker_state_factory_test overrides it to return the captured message
      public HBMessage send(HBMessage m) {
          waitUntilReady();
          LOG.debug("Sending message: {}", m.toString());

http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/test/clj/org/apache/storm/cluster_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/cluster_test.clj
index 39adb9e,b146cb0..22c1f80
--- a/storm-core/test/clj/org/apache/storm/cluster_test.clj
+++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj
@@@ -22,11 -22,11 +22,11 @@@
    (:import [org.mockito Mockito])
    (:import [org.mockito.exceptions.base MockitoAssertionError])
    (:import [org.apache.curator.framework CuratorFramework CuratorFrameworkFactory CuratorFrameworkFactory$Builder])
-   (:import [org.apache.storm.utils Utils TestUtils ZookeeperAuthInfo ConfigUtils])
+   (:import [org.apache.storm.utils Time Utils ZookeeperAuthInfo ConfigUtils])
 -  (:import [org.apache.storm.cluster ClusterState])
 +  (:import [org.apache.storm.cluster IStateStorage ZKStateStorage ClusterStateContext StormClusterStateImpl ClusterUtils])
    (:import [org.apache.storm.zookeeper Zookeeper])
 -  (:import [org.apache.storm.testing.staticmocking MockedZookeeper])
 -  (:require [org.apache.storm [zookeeper :as zk]])
 +  (:import [org.apache.storm.callback ZKStateChangedCallback])
 +  (:import [org.apache.storm.testing.staticmocking MockedZookeeper MockedCluster])
    (:require [conjure.core])
    (:use [conjure core])
    (:use [clojure test])
@@@ -39,14 -39,18 +39,18 @@@
  
  (defn mk-state
    ([zk-port] (let [conf (mk-config zk-port)]
 -               (mk-distributed-cluster-state conf :auth-conf conf)))
 +               (ClusterUtils/mkStateStorage conf conf nil (ClusterStateContext.))))
    ([zk-port cb]
 -     (let [ret (mk-state zk-port)]
 -       (.register ret cb)
 -       ret )))
 +    (let [ret (mk-state zk-port)]
 +      (.register ret cb)
 +      ret)))
  
 -(defn mk-storm-state [zk-port] (mk-storm-cluster-state (mk-config zk-port)))
 +(defn mk-storm-state [zk-port] (ClusterUtils/mkStormClusterState (mk-config zk-port) nil (ClusterStateContext.)))
  
+ (defn barr ; test helper: builds a byte array from the given values
+   [& vals]
+   (byte-array (map byte vals))) ; each value is coerced with `byte` before the array is created
+ 
  (deftest test-basics
    (with-inprocess-zookeeper zk-port
      (let [state (mk-state zk-port)]
@@@ -242,27 -244,26 +246,32 @@@
        (is (.contains (:error error) target))
        )))
  
++(defn- stringify-error [error] ; returns the stack trace of `error` as a String
++  (let [result (java.io.StringWriter.)
++        printer (java.io.PrintWriter. result)] ; printer writes into the StringWriter above
++    (.printStackTrace error printer)
++    (.toString result)))
  
  (deftest test-storm-cluster-state-errors
    (with-inprocess-zookeeper zk-port
      (with-simulated-time
        (let [state (mk-storm-state zk-port)]
-         (.reportError state "a" "1" (local-hostname) 6700 (stringify-error (RuntimeException.)))
 -        (.report-error state "a" "1" (Utils/localHostname) 6700 (RuntimeException.))
++        (.reportError state "a" "1" (Utils/localHostname) 6700  (RuntimeException.))
          (validate-errors! state "a" "1" ["RuntimeException"])
          (advance-time-secs! 1)
-         (.reportError state "a" "1" (local-hostname) 6700 (stringify-error (IllegalArgumentException.)))
 -        (.report-error state "a" "1" (Utils/localHostname) 6700 (IllegalArgumentException.))
++        (.reportError state "a" "1" (Utils/localHostname) 6700 (IllegalArgumentException.))
          (validate-errors! state "a" "1" ["IllegalArgumentException" "RuntimeException"])
          (doseq [i (range 10)]
-           (.reportError state "a" "2" (local-hostname) 6700 (stringify-error (RuntimeException.)))
 -          (.report-error state "a" "2" (Utils/localHostname) 6700 (RuntimeException.))
++          (.reportError state "a" "2" (Utils/localHostname) 6700 (RuntimeException.))
            (advance-time-secs! 2))
          (validate-errors! state "a" "2" (repeat 10 "RuntimeException"))
          (doseq [i (range 5)]
-           (.reportError state "a" "2" (local-hostname) 6700 (stringify-error (IllegalArgumentException.)))
 -          (.report-error state "a" "2" (Utils/localHostname) 6700 (IllegalArgumentException.))
++          (.reportError state "a" "2" (Utils/localHostname) 6700 (IllegalArgumentException.))
            (advance-time-secs! 2))
          (validate-errors! state "a" "2" (concat (repeat 5 "IllegalArgumentException")
 -                                                (repeat 5 "RuntimeException")
 -                                                ))
 +                                          (repeat 5 "RuntimeException")
 +                                          ))
 +
          (.disconnect state)
          ))))
  
@@@ -300,12 -301,12 +309,12 @@@
        (. (Mockito/when (.connectString builder (Mockito/anyString))) (thenReturn builder))
        (. (Mockito/when (.connectionTimeoutMs builder (Mockito/anyInt))) (thenReturn builder))
        (. (Mockito/when (.sessionTimeoutMs builder (Mockito/anyInt))) (thenReturn builder))
-       (TestUtils/testSetupBuilder builder (str zk-port "/") conf (ZookeeperAuthInfo. conf))
+       (Utils/testSetupBuilder builder (str zk-port "/") conf (ZookeeperAuthInfo. conf))
        (is (nil?
 -           (try
 -             (. (Mockito/verify builder) (authorization "digest" (.getBytes (conf STORM-ZOOKEEPER-AUTH-PAYLOAD))))
 -             (catch MockitoAssertionError e
 -               e)))))))
 +            (try
 +              (. (Mockito/verify builder) (authorization "digest" (.getBytes (conf STORM-ZOOKEEPER-AUTH-PAYLOAD))))
 +              (catch MockitoAssertionError e
 +                e)))))))
  
  (deftest test-storm-state-callbacks
    ;; TODO finish
@@@ -313,17 -314,15 +322,17 @@@
  
  (deftest test-cluster-state-default-acls
    (testing "The default ACLs are empty."
 -    (let [zk-mock (Mockito/mock Zookeeper)]
 +    (let [zk-mock (Mockito/mock Zookeeper)
 +          curator-frameworke (reify CuratorFramework (^void close [this] nil))]
        ;; No need for when clauses because we just want to return nil
        (with-open [_ (MockedZookeeper. zk-mock)]
 -        (stubbing [zk/mk-client (reify CuratorFramework (^void close [this] nil))]
 -          (mk-distributed-cluster-state {})
 -          (.mkdirsImpl (Mockito/verify zk-mock (Mockito/times 1)) (Mockito/any) (Mockito/anyString) (Mockito/eq nil)))))
 -    (stubbing [mk-distributed-cluster-state (reify ClusterState
 -                                              (register [this callback] nil)
 -                                              (mkdirs [this path acls] nil))]
 -     (mk-storm-cluster-state {})
 -     (verify-call-times-for mk-distributed-cluster-state 1)
 -     (verify-first-call-args-for-indices mk-distributed-cluster-state [4] nil))))
 +        (. (Mockito/when (.mkClientImpl zk-mock (Mockito/anyMap) (Mockito/anyList) (Mockito/any) (Mockito/anyString) (Mockito/any) (Mockito/anyMap))) (thenReturn curator-frameworke))
 +        (ClusterUtils/mkStateStorage {} nil nil (ClusterStateContext.))
 +        (.mkdirsImpl (Mockito/verify zk-mock (Mockito/times 1)) (Mockito/any) (Mockito/anyString) (Mockito/eq nil))))
 +    (let [distributed-state-storage (reify IStateStorage
 +                                      (register [this callback] nil)
 +                                      (mkdirs [this path acls] nil))
 +          cluster-utils (Mockito/mock ClusterUtils)]
 +      (with-open [mocked-cluster (MockedCluster. cluster-utils)]
-         (. (Mockito/when (mkStateStorageImpl cluster-utils (Mockito/any) (Mockito/any) (Mockito/eq nil) (Mockito/any))) (thenReturn distributed-state-storage))
++        (. (Mockito/when (.mkStateStorageImpl cluster-utils (Mockito/any) (Mockito/any) (Mockito/eq nil) (Mockito/any))) (thenReturn distributed-state-storage))
 +        (ClusterUtils/mkStormClusterState {} nil (ClusterStateContext.))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 772a232,70cb885..09c4371
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@@ -23,27 -23,33 +23,36 @@@
             [org.apache.storm.nimbus InMemoryTopologyActionNotifier])
    (:import [org.apache.storm.testing.staticmocking MockedZookeeper])
    (:import [org.apache.storm.scheduler INimbus])
 +  (:import [org.mockito Mockito])
 +  (:import [org.mockito.exceptions.base MockitoAssertionError])
    (:import [org.apache.storm.nimbus ILeaderElector NimbusInfo])
-   (:import [org.apache.storm.testing.staticmocking MockedConfigUtils MockedCluster])
++  (:import [org.apache.storm.testing.staticmocking MockedCluster])
    (:import [org.apache.storm.generated Credentials NotAliveException SubmitOptions
              TopologyInitialStatus TopologyStatus AlreadyAliveException KillOptions RebalanceOptions
              InvalidTopologyException AuthorizationException
              LogConfig LogLevel LogLevelAction])
    (:import [java.util HashMap])
    (:import [java.io File])
-   (:import [org.apache.storm.utils Time Utils ConfigUtils])
+   (:import [org.apache.storm.utils Time Utils Utils$UptimeComputer ConfigUtils IPredicate]
+            [org.apache.storm.utils.staticmocking ConfigUtilsInstaller UtilsInstaller])
    (:import [org.apache.storm.zookeeper Zookeeper])
-   (:import [org.apache.commons.io FileUtils])
+   (:import [org.apache.commons.io FileUtils]
+            [org.json.simple JSONValue])
 -  (:use [org.apache.storm testing MockAutoCred util config log timer zookeeper])
 +  (:import [org.apache.storm.cluster StormClusterStateImpl ClusterStateContext ClusterUtils])
 +  (:use [org.apache.storm testing MockAutoCred util config log timer converter])
    (:use [org.apache.storm.daemon common])
    (:require [conjure.core])
    (:require [org.apache.storm
 -             [thrift :as thrift]
 -             [cluster :as cluster]])
 +             [thrift :as thrift]])
    (:use [conjure core]))
  
+ (defn- from-json ; parses a JSON string and converts the result with clojurify-structure
+        [^String str]
+        (if str
+          (clojurify-structure
+            (JSONValue/parse str))
+          nil)) ; a nil (or false) input yields nil without parsing
+ 
  (defn storm-component->task-info [cluster storm-name]
    (let [storm-id (get-storm-id (:storm-cluster-state cluster) storm-name)
          nimbus (:nimbus cluster)]
@@@ -72,8 -80,8 +83,8 @@@
  
  (defn storm-num-workers [state storm-name]
    (let [storm-id (get-storm-id state storm-name)
 -        assignment (.assignment-info state storm-id nil)]
 +        assignment (clojurify-assignment (.assignmentInfo state storm-id nil))]
-     (count (reverse-map (:executor->node+port assignment)))
+     (count (clojurify-structure (Utils/reverseMap (:executor->node+port assignment))))
      ))
  
  (defn topology-nodes [state storm-name]
@@@ -95,9 -103,11 +106,11 @@@
           set         
           )))
  
+ ;TODO: when translating this function, don't call map-val, but instead use an inline for loop.
+ ; map-val is a temporary kluge for clojure.
  (defn topology-node-distribution [state storm-name]
    (let [storm-id (get-storm-id state storm-name)
 -        assignment (.assignment-info state storm-id nil)]
 +        assignment (clojurify-assignment (.assignmentInfo state storm-id nil))]
      (->> assignment
           :executor->node+port
           vals
@@@ -124,19 -134,18 +137,18 @@@
  
  (defn do-executor-heartbeat [cluster storm-id executor]
    (let [state (:storm-cluster-state cluster)
 -        executor->node+port (:executor->node+port (.assignment-info state storm-id nil))
 +        executor->node+port (:executor->node+port (clojurify-assignment (.assignmentInfo state storm-id nil)))
          [node port] (get executor->node+port executor)
 -        curr-beat (.get-worker-heartbeat state storm-id node port)
 +        curr-beat (clojurify-zk-worker-hb (.getWorkerHeartbeat state storm-id node port))
          stats (:executor-stats curr-beat)]
 -    (.worker-heartbeat! state storm-id node port
 -      {:storm-id storm-id :time-secs (Time/currentTimeSecs) :uptime 10 :executor-stats (merge stats {executor (stats/render-stats! (stats/mk-bolt-stats 20))})}
 +    (.workerHeartbeat state storm-id node port
-       (thriftify-zk-worker-hb {:storm-id storm-id :time-secs (current-time-secs) :uptime 10 :executor-stats (merge stats {executor (stats/render-stats! (stats/mk-bolt-stats 20))})})
++      (thriftify-zk-worker-hb {:storm-id storm-id :time-secs (Time/currentTimeSecs) :uptime 10 :executor-stats (merge stats {executor (stats/render-stats! (stats/mk-bolt-stats 20))})})
        )))
  
  (defn slot-assignments [cluster storm-id]
    (let [state (:storm-cluster-state cluster)
 -        assignment (.assignment-info state storm-id nil)]
 +        assignment (clojurify-assignment (.assignmentInfo state storm-id nil))]
-     (reverse-map (:executor->node+port assignment))
-     ))
+         (clojurify-structure (Utils/reverseMap (:executor->node+port assignment)))))
  
  (defn task-ids [cluster storm-id]
    (let [nimbus (:nimbus cluster)]
@@@ -146,8 -155,10 +158,10 @@@
  
  (defn topology-executors [cluster storm-id]
    (let [state (:storm-cluster-state cluster)
-         assignment (clojurify-assignment (.assignmentInfo state storm-id nil))]
-     (keys (:executor->node+port assignment))
 -        assignment (.assignment-info state storm-id nil)
 -        ret-keys (keys (:executor->node+port assignment))
++        assignment (clojurify-assignment (.assignmentInfo state storm-id nil))
++    ret-keys (keys (:executor->node+port assignment))
+         _ (log-message "ret-keys: " (pr-str ret-keys)) ]
+     ret-keys
      ))
  
  (defn check-distribution [items distribution]
@@@ -1350,23 -1399,27 +1402,27 @@@
                       NIMBUS-THRIFT-PORT 6666})
            expected-acls nimbus/NIMBUS-ZK-ACLS
            fake-inimbus (reify INimbus (getForcedScheduler [this] nil))
+           fake-cu (proxy [ConfigUtils] []
 -                      (nimbusTopoHistoryStateImpl [conf] nil))
++                    (nimbusTopoHistoryStateImpl [conf] nil))
+           fake-utils (proxy [Utils] []
+                        (newInstanceImpl [_])
+                        (makeUptimeComputer [] (proxy [Utils$UptimeComputer] []
 -                                                (upTime [] 0))))]
++                                                (upTime [] 0))))
 +          cluster-utils (Mockito/mock ClusterUtils)]
-       (with-open [_ (proxy [MockedConfigUtils] []
-                       (nimbusTopoHistoryStateImpl [conf] nil))
+       (with-open [_ (ConfigUtilsInstaller. fake-cu)
+                   _ (UtilsInstaller. fake-utils)
                    zk-le (MockedZookeeper. (proxy [Zookeeper] []
 -                          (zkLeaderElectorImpl [conf] nil)))]
 +                          (zkLeaderElectorImpl [conf] nil)))
 +                  mocked-cluster (MockedCluster. cluster-utils)]
          (stubbing [mk-authorization-handler nil
 -                   cluster/mk-storm-cluster-state nil
 -                   nimbus/file-cache-map nil
 -                   nimbus/mk-blob-cache-map nil
 -                   nimbus/mk-bloblist-cache-map nil
 -                   mk-timer nil
 -                   nimbus/mk-scheduler nil]
 -                  (nimbus/nimbus-data auth-conf fake-inimbus)
 -                  (verify-call-times-for cluster/mk-storm-cluster-state 1)
 -                  (verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2]
 -                                                      expected-acls))))))
 +                 nimbus/file-cache-map nil
 +                 nimbus/mk-blob-cache-map nil
 +                 nimbus/mk-bloblist-cache-map nil
-                  uptime-computer nil
-                  new-instance nil
 +                 mk-timer nil
 +                 nimbus/mk-scheduler nil]
 +          (nimbus/nimbus-data auth-conf fake-inimbus)
 +          (.mkStormClusterStateImpl (Mockito/verify cluster-utils (Mockito/times 1)) (Mockito/any) (Mockito/eq expected-acls) (Mockito/any))
 +          )))))
  
  (deftest test-file-bogus-download
    (with-local-cluster [cluster :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
@@@ -1397,9 -1450,9 +1453,9 @@@
                        STORM-CLUSTER-MODE "local"
                        STORM-ZOOKEEPER-PORT zk-port
                        STORM-LOCAL-DIR nimbus-dir}))
 -        (bind cluster-state (cluster/mk-storm-cluster-state conf))
 +        (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
          (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
-         (sleep-secs 1)
+         (Time/sleepSecs 1)
          (bind topology (thrift/mk-topology
                           {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
                           {}))
@@@ -1429,10 -1482,10 +1485,10 @@@
                          STORM-ZOOKEEPER-PORT zk-port
                          STORM-LOCAL-DIR nimbus-dir
                          NIMBUS-TOPOLOGY-ACTION-NOTIFIER-PLUGIN (.getName InMemoryTopologyActionNotifier)}))
 -          (bind cluster-state (cluster/mk-storm-cluster-state conf))
 +          (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
            (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
            (bind notifier (InMemoryTopologyActionNotifier.))
-           (sleep-secs 1)
+           (Time/sleepSecs 1)
            (bind topology (thrift/mk-topology
                             {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
                             {}))

http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/test/clj/org/apache/storm/pacemaker_state_factory_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/pacemaker_state_factory_test.clj
index 1a7bd2c,0925237..1c45266
--- a/storm-core/test/clj/org/apache/storm/pacemaker_state_factory_test.clj
+++ b/storm-core/test/clj/org/apache/storm/pacemaker_state_factory_test.clj
@@@ -19,10 -20,8 +19,11 @@@
    (:import [org.apache.storm.generated
              HBExecutionException HBNodes HBRecords
              HBServerMessageType HBMessage HBMessageData HBPulse]
-            [org.apache.storm.cluster ClusterStateContext  PaceMakerStateStorageFactory]
 -           [org.apache.storm.cluster ClusterStateContext]
 -           [org.mockito Mockito Matchers]))
++           [org.apache.storm.cluster ClusterStateContext  PaceMakerStateStorageFactory PaceMakerStateStorage]
 +           [org.mockito Mockito Matchers])
 +(:import [org.mockito.exceptions.base MockitoAssertionError])
++(:import [org.apache.storm.pacemaker PacemakerClient])
 +(:import [org.apache.storm.testing.staticmocking MockedPaceMakerStateStorageFactory]))
  
  (defn- string-to-bytes [string]
    (byte-array (map int string)))
@@@ -30,26 -29,24 +31,23 @@@
  (defn- bytes-to-string [bytez]
    (apply str (map char bytez)))
  
--(defprotocol send-capture
--  (send [this something])
--  (check-captured [this]))
--
  (defn- make-send-capture [response]
    (let [captured (atom nil)]
--    (reify send-capture
--      (send [this something] (reset! captured something) response)
--      (check-captured [this] @captured))))
 -
 -(defmacro with-mock-pacemaker-client-and-state [client state response & body]
 -  `(let [~client (make-send-capture ~response)]
 -     (stubbing [psf/makeZKState nil
 -                psf/makeClient ~client]
 -               (let [~state (psf/-mkState nil nil nil nil (ClusterStateContext.))]
++    (proxy [PacemakerClient] []
++      (send [m] (reset! captured m) response)
++      (checkCaptured [] @captured))))
 +
 +(defmacro with-mock-pacemaker-client-and-state [client state pacefactory mock response & body]
 +  `(let [~client (make-send-capture ~response)
 +         ~pacefactory (Mockito/mock PaceMakerStateStorageFactory)]
 +
 +     (with-open [~mock (MockedPaceMakerStateStorageFactory. ~pacefactory)]
 +       (. (Mockito/when (.initZKstateImpl ~pacefactory (Mockito/any) (Mockito/any) (Mockito/anyList) (Mockito/any))) (thenReturn nil))
 +       (. (Mockito/when (.initMakeClientImpl ~pacefactory (Mockito/any))) (thenReturn ~client))
-                (let [~state (.mkStore ~pacefactory nil nil nil (ClusterStateContext.))]
++               (let [~state (PaceMakerStateStorage. (PaceMakerStateStorageFactory/initMakeClient nil)
++                   (PaceMakerStateStorageFactory/initZKstate nil  nil nil nil))]
                   ~@body))))
  
 -
  (deftest pacemaker_state_set_worker_hb
    (testing "set_worker_hb"
      (with-mock-pacemaker-client-and-state
@@@ -57,7 -54,7 +55,7 @@@
        (HBMessage. HBServerMessageType/SEND_PULSE_RESPONSE nil)
  
        (.set_worker_hb state "/foo" (string-to-bytes "data") nil)
--      (let [sent (.check-captured client)
++      (let [sent (.checkCaptured client)
              pulse (.get_pulse (.get_data sent))]
          (is (= (.get_type sent) HBServerMessageType/SEND_PULSE))
          (is (= (.get_id pulse) "/foo"))
@@@ -65,13 -62,13 +63,12 @@@
  
    (testing "set_worker_hb"
      (with-mock-pacemaker-client-and-state
 -      client state
 +      client state pacefactory mock
        (HBMessage. HBServerMessageType/SEND_PULSE nil)
  
-       (is (thrown? RuntimeException      
 -      (is (thrown? HBExecutionException      
--                   (.set_worker_hb state "/foo" (string-to-bytes "data") nil))))))
++      (is (thrown? RuntimeException
++            (.set_worker_hb state "/foo" (string-to-bytes "data") nil))))))
  
--      
  
  (deftest pacemaker_state_delete_worker_hb
    (testing "delete_worker_hb"
@@@ -80,74 -77,74 +77,75 @@@
        (HBMessage. HBServerMessageType/DELETE_PATH_RESPONSE nil)
  
        (.delete_worker_hb state "/foo/bar")
--      (let [sent (.check-captured client)]
++      (let [sent (.checkCaptured client)]
          (is (= (.get_type sent) HBServerMessageType/DELETE_PATH))
          (is (= (.get_path (.get_data sent)) "/foo/bar")))))
  
--    (testing "delete_worker_hb"
--      (with-mock-pacemaker-client-and-state
-         client state pacefactory mock
 -        client state
--        (HBMessage. HBServerMessageType/DELETE_PATH nil)
--        
-         (is (thrown? RuntimeException
 -        (is (thrown? HBExecutionException
--                     (.delete_worker_hb state "/foo/bar"))))))
++  (testing "delete_worker_hb"
++    (with-mock-pacemaker-client-and-state
++      client state pacefactory mock
++      (HBMessage. HBServerMessageType/DELETE_PATH nil)
++
++      (is (thrown? RuntimeException
++            (.delete_worker_hb state "/foo/bar"))))))
  
  (deftest pacemaker_state_get_worker_hb
    (testing "get_worker_hb"
      (with-mock-pacemaker-client-and-state
 -      client state
 +      client state pacefactory mock
        (HBMessage. HBServerMessageType/GET_PULSE_RESPONSE
--                (HBMessageData/pulse
--                 (doto (HBPulse.)
--                   (.set_id "/foo")
--                   (.set_details (string-to-bytes "some data")))))
++        (HBMessageData/pulse
++          (doto (HBPulse.)
++            (.set_id "/foo")
++            (.set_details (string-to-bytes "some data")))))
  
        (.get_worker_hb state "/foo" false)
--      (let [sent (.check-captured client)]
++      (let [sent (.checkCaptured client)]
          (is (= (.get_type sent) HBServerMessageType/GET_PULSE))
          (is (= (.get_path (.get_data sent)) "/foo")))))
  
    (testing "get_worker_hb - fail (bad response)"
      (with-mock-pacemaker-client-and-state
 -      client state
 +      client state pacefactory mock
        (HBMessage. HBServerMessageType/GET_PULSE nil)
--      
 -      (is (thrown? HBExecutionException
 -                   (.get_worker_hb state "/foo" false)))))
 -  
++
 +      (is (thrown? RuntimeException
-                    (.get_worker_hb state "/foo" false)))))
-   
++            (.get_worker_hb state "/foo" false)))))
++
    (testing "get_worker_hb - fail (bad data)"
      (with-mock-pacemaker-client-and-state
 -      client state
 +      client state pacefactory mock
        (HBMessage. HBServerMessageType/GET_PULSE_RESPONSE nil)
--      
 -      (is (thrown? HBExecutionException
 -                   (.get_worker_hb state "/foo" false))))))
++
 +      (is (thrown? RuntimeException
-                    (.get_worker_hb state "/foo" false))))))
++            (.get_worker_hb state "/foo" false))))))
  
  (deftest pacemaker_state_get_worker_hb_children
    (testing "get_worker_hb_children"
      (with-mock-pacemaker-client-and-state
 -      client state
 +      client state pacefactory mock
        (HBMessage. HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE
--                (HBMessageData/nodes
--                 (HBNodes. [])))
++        (HBMessageData/nodes
++          (HBNodes. [])))
  
        (.get_worker_hb_children state "/foo" false)
--      (let [sent (.check-captured client)]
++      (let [sent (.checkCaptured client)]
          (is (= (.get_type sent) HBServerMessageType/GET_ALL_NODES_FOR_PATH))
          (is (= (.get_path (.get_data sent)) "/foo")))))
  
    (testing "get_worker_hb_children - fail (bad response)"
      (with-mock-pacemaker-client-and-state
 -      client state
 +      client state pacefactory mock
        (HBMessage. HBServerMessageType/DELETE_PATH nil)
  
 -      (is (thrown? HBExecutionException
 -                   (.get_worker_hb_children state "/foo" false)))))
 +      (is (thrown? RuntimeException
-                    (.get_worker_hb_children state "/foo" false)))))
++            (.get_worker_hb_children state "/foo" false)))))
  
--    (testing "get_worker_hb_children - fail (bad data)"
++  (testing "get_worker_hb_children - fail (bad data)"
      (with-mock-pacemaker-client-and-state
 -      client state
 +      client state pacefactory mock
        (HBMessage. HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE nil)
-       ;need been update due to HBExecutionException
 -      
 -      (is (thrown? HBExecutionException
 -                   (.get_worker_hb_children state "/foo" false))))))
++
 +      (is (thrown? RuntimeException
-                    (.get_worker_hb_children state "/foo" false))))))
++            (.get_worker_hb_children state "/foo" false))))))
++

http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/supervisor_test.clj
index b89b7bb,9c31ddf..3ebdbcd
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@@ -21,20 -21,20 +21,23 @@@
    (:require [clojure [string :as string] [set :as set]])
    (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestPlannerSpout])
    (:import [org.apache.storm.scheduler ISupervisor])
-   (:import [org.apache.storm.utils ConfigUtils])
+   (:import [org.apache.storm.utils Time Utils$UptimeComputer ConfigUtils])
    (:import [org.apache.storm.generated RebalanceOptions])
-   (:import [org.apache.storm.testing.staticmocking MockedConfigUtils MockedCluster])
 -  (:import [org.mockito Matchers Mockito])
++  (:import [org.apache.storm.testing.staticmocking MockedCluster])
    (:import [java.util UUID])
-   (:import [org.mockito Mockito])
++  (:import [org.mockito Mockito Matchers])
 +  (:import [org.mockito.exceptions.base MockitoAssertionError])
    (:import [java.io File])
    (:import [java.nio.file Files])
-   (:import [org.apache.storm.cluster StormClusterStateImpl ClusterStateContext ClusterUtils])
 -  (:import [org.apache.storm.utils Utils IPredicate]
++  (:import [org.apache.storm.utils Utils IPredicate])
++  (:import [org.apache.storm.cluster StormClusterStateImpl ClusterStateContext ClusterUtils]
+            [org.apache.storm.utils.staticmocking ConfigUtilsInstaller
+                                                  UtilsInstaller])
    (:import [java.nio.file.attribute FileAttribute])
 -  (:use [org.apache.storm config testing util timer log])
 +  (:use [org.apache.storm config testing util timer log converter])
    (:use [org.apache.storm.daemon common])
    (:require [org.apache.storm.daemon [worker :as worker] [supervisor :as supervisor]]
 -            [org.apache.storm [thrift :as thrift] [cluster :as cluster]])
 +            [org.apache.storm [thrift :as thrift]])
    (:use [conjure core])
    (:require [clojure.java.io :as io]))
  
@@@ -43,9 -43,10 +46,10 @@@
    [cluster supervisor-id port]
    (let [state (:storm-cluster-state cluster)
          slot-assigns (for [storm-id (.assignments state nil)]
 -                        (let [executors (-> (.assignment-info state storm-id nil)
 +                        (let [executors (-> (clojurify-assignment (.assignmentInfo state storm-id nil))
                                          :executor->node+port
-                                         reverse-map
+                                         (Utils/reverseMap)
+                                         clojurify-structure
                                          (get [supervisor-id port] ))]
                            (when executors [storm-id executors])
                            ))
@@@ -565,198 -632,203 +635,201 @@@
            fake-isupervisor (reify ISupervisor
                               (getSupervisorId [this] nil)
                               (getAssignmentId [this] nil))
+           fake-cu (proxy [ConfigUtils] []
+                     (supervisorStateImpl [conf] nil)
+                     (supervisorLocalDirImpl [conf] nil))
+           fake-utils (proxy [Utils] []
+                        (localHostnameImpl [] nil)
+                        (makeUptimeComputer [] (proxy [Utils$UptimeComputer] []
 -                                                (upTime [] 0))))]
++                                                (upTime [] 0))))
 +          cluster-utils (Mockito/mock ClusterUtils)]
-       (with-open [_ (proxy [MockedConfigUtils] []
-                       (supervisorStateImpl [conf] nil)
-                       (supervisorLocalDirImpl [conf] nil))
+       (with-open [_ (ConfigUtilsInstaller. fake-cu)
 -                  _ (UtilsInstaller. fake-utils)]
 -        (stubbing [cluster/mk-storm-cluster-state nil
 -                   mk-timer nil]
++                  _ (UtilsInstaller. fake-utils)
 +                  mocked-cluster (MockedCluster. cluster-utils)]
-         (stubbing [uptime-computer nil
-               ;   cluster/mk-storm-cluster-state nil
-                  local-hostname nil
-                  mk-timer nil]
            (supervisor/supervisor-data auth-conf nil fake-isupervisor)
-           (.mkStormClusterStateImpl (Mockito/verify cluster-utils (Mockito/times 1)) (Mockito/any) (Mockito/eq expected-acls) (Mockito/any))
-         ;  (verify-call-times-for cluster/mk-storm-cluster-state 1)
-         ;  (verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2]
-         ;                                     expected-acls)
-          )))))
 -          (verify-call-times-for cluster/mk-storm-cluster-state 1)
 -          (verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2]
 -              expected-acls)))))
++          (.mkStormClusterStateImpl (Mockito/verify cluster-utils (Mockito/times 1)) (Mockito/any) (Mockito/eq expected-acls) (Mockito/any))))))
  
- (deftest test-write-log-metadata
-   (testing "supervisor writes correct data to logs metadata file"
-     (let [exp-owner "alice"
-           exp-worker-id "42"
-           exp-storm-id "0123456789"
-           exp-port 4242
-           exp-logs-users ["bob" "charlie" "daryl"]
-           exp-logs-groups ["read-only-group" "special-group"]
-           storm-conf {TOPOLOGY-SUBMITTER-USER "alice"
-                       TOPOLOGY-USERS ["charlie" "bob"]
-                       TOPOLOGY-GROUPS ["special-group"]
-                       LOGS-GROUPS ["read-only-group"]
-                       LOGS-USERS ["daryl"]}
-           exp-data {TOPOLOGY-SUBMITTER-USER exp-owner
-                     "worker-id" exp-worker-id
-                     LOGS-USERS exp-logs-users
-                     LOGS-GROUPS exp-logs-groups}
-           conf {}]
-       (mocking [supervisor/write-log-metadata-to-yaml-file!]
-         (supervisor/write-log-metadata! storm-conf exp-owner exp-worker-id
-                                         exp-storm-id exp-port conf)
-         (verify-called-once-with-args supervisor/write-log-metadata-to-yaml-file!
-                                       exp-storm-id exp-port exp-data conf)))))
+   (deftest test-write-log-metadata
+     (testing "supervisor writes correct data to logs metadata file"
+       (let [exp-owner "alice"
+             exp-worker-id "42"
+             exp-storm-id "0123456789"
+             exp-port 4242
+             exp-logs-users ["bob" "charlie" "daryl"]
+             exp-logs-groups ["read-only-group" "special-group"]
+             storm-conf {TOPOLOGY-SUBMITTER-USER "alice"
+                         TOPOLOGY-USERS ["charlie" "bob"]
+                         TOPOLOGY-GROUPS ["special-group"]
+                         LOGS-GROUPS ["read-only-group"]
+                         LOGS-USERS ["daryl"]}
+             exp-data {TOPOLOGY-SUBMITTER-USER exp-owner
+                       "worker-id" exp-worker-id
+                       LOGS-USERS exp-logs-users
+                       LOGS-GROUPS exp-logs-groups}
+             conf {}]
+         (mocking [supervisor/write-log-metadata-to-yaml-file!]
+           (supervisor/write-log-metadata! storm-conf exp-owner exp-worker-id
+             exp-storm-id exp-port conf)
+           (verify-called-once-with-args supervisor/write-log-metadata-to-yaml-file!
+             exp-storm-id exp-port exp-data conf)))))
  
- (deftest test-worker-launcher-requires-user
-   (testing "worker-launcher throws on blank user"
-     (mocking [launch-process]
-       (is (thrown-cause-with-msg? java.lang.IllegalArgumentException
-                                   #"(?i).*user cannot be blank.*"
-                                   (supervisor/worker-launcher {} nil ""))))))
+   (deftest test-worker-launcher-requires-user
+     (testing "worker-launcher throws on blank user"
+       (let [utils-proxy (proxy [Utils] []
+                           (launchProcessImpl [& _] nil))]
+         (with-open [_ (UtilsInstaller. utils-proxy)]
+           (is (try
+                 (supervisor/worker-launcher {} nil "")
+                 false
+                 (catch Throwable t
+                   (and (re-matches #"(?i).*user cannot be blank.*" (.getMessage t))
+                        (Utils/exceptionCauseIsInstanceOf java.lang.IllegalArgumentException t)))))))))
  
- (defn found? [sub-str input-str]
-   (if (string? input-str)
-     (contrib-str/substring? sub-str (str input-str))
-     (boolean (some #(contrib-str/substring? sub-str %) input-str))))
+   (defn found? [sub-str input-str]
+     (if (string? input-str)
+       (contrib-str/substring? sub-str (str input-str))
+       (boolean (some #(contrib-str/substring? sub-str %) input-str))))
  
- (defn not-found? [sub-str input-str]
+   (defn not-found? [sub-str input-str]
      (complement (found? sub-str input-str)))
  
- (deftest test-substitute-childopts-happy-path-string
-   (testing "worker-launcher replaces ids in childopts"
-     (let [worker-id "w-01"
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m"
-           expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
+   (deftest test-substitute-childopts-happy-path-string
+     (testing "worker-launcher replaces ids in childopts"
+       (let [worker-id "w-01"
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m"
+             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
  
- (deftest test-substitute-childopts-happy-path-list
-   (testing "worker-launcher replaces ids in childopts"
-     (let [worker-id "w-01"
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m")
-           expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
+   (deftest test-substitute-childopts-happy-path-list
+     (testing "worker-launcher replaces ids in childopts"
+       (let [worker-id "w-01"
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m")
+             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
  
- (deftest test-substitute-childopts-happy-path-list-arraylist
-   (testing "worker-launcher replaces ids in childopts"
-     (let [worker-id "w-01"
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts '["-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m"]
-           expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
+   (deftest test-substitute-childopts-happy-path-list-arraylist
+     (testing "worker-launcher replaces ids in childopts"
+       (let [worker-id "w-01"
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts '["-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m"]
+             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
  
- (deftest test-substitute-childopts-topology-id-alone
-   (testing "worker-launcher replaces ids in childopts"
-     (let [worker-id "w-01"
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%TOPOLOGY-ID%.log"
-           expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-s-01.log")
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
+   (deftest test-substitute-childopts-topology-id-alone
+     (testing "worker-launcher replaces ids in childopts"
+       (let [worker-id "w-01"
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%TOPOLOGY-ID%.log"
+             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-s-01.log")
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
  
- (deftest test-substitute-childopts-no-keys
-   (testing "worker-launcher has no ids to replace in childopts"
-     (let [worker-id "w-01"
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log"
-           expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log")
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
+   (deftest test-substitute-childopts-no-keys
+     (testing "worker-launcher has no ids to replace in childopts"
+       (let [worker-id "w-01"
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log"
+             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log")
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
  
- (deftest test-substitute-childopts-nil-childopts
-   (testing "worker-launcher has nil childopts"
-     (let [worker-id "w-01"
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts nil
-           expected-childopts nil
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
+   (deftest test-substitute-childopts-nil-childopts
+     (testing "worker-launcher has nil childopts"
+       (let [worker-id "w-01"
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts nil
+             expected-childopts nil
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
  
- (deftest test-substitute-childopts-nil-ids
-   (testing "worker-launcher has nil ids"
-     (let [worker-id nil
-           topology-id "s-01"
-           port 9999
-           mem-onheap 512
-           childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log"
-           expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01--9999.log")
-           childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
-       (is (= expected-childopts childopts-with-ids)))))
+   (deftest test-substitute-childopts-nil-ids
+     (testing "worker-launcher has nil ids"
+       (let [worker-id nil
+             topology-id "s-01"
+             port 9999
+             mem-onheap 512
+             childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log"
+             expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01--9999.log")
+             childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+         (is (= expected-childopts childopts-with-ids)))))
  
- (deftest test-retry-read-assignments
-   (with-simulated-time-local-cluster [cluster
-                                       :supervisors 0
-                                       :ports-per-supervisor 2
-                                       :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
-                                                     NIMBUS-MONITOR-FREQ-SECS 10
-                                                     TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
-                                                     TOPOLOGY-ACKER-EXECUTORS 0}]
-     (letlocals
-      (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
-      (bind topology1 (thrift/mk-topology
-                       {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
-                       {}))
-      (bind topology2 (thrift/mk-topology
-                       {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
-                       {}))
-      (bind state (:storm-cluster-state cluster))
-      (bind changed (capture-changed-workers
-                     (submit-mocked-assignment
-                      (:nimbus cluster)
-                      (:storm-cluster-state cluster)
-                      "topology1"
-                      {TOPOLOGY-WORKERS 2}
-                      topology1
-                      {1 "1"
-                       2 "1"}
-                      {[1 1] ["sup1" 1]
-                       [2 2] ["sup1" 2]}
-                      {["sup1" 1] [0.0 0.0 0.0]
-                       ["sup1" 2] [0.0 0.0 0.0]
-                       })
-                     (submit-mocked-assignment
-                      (:nimbus cluster)
-                      (:storm-cluster-state cluster)
-                      "topology2"
-                      {TOPOLOGY-WORKERS 2}
-                      topology2
-                      {1 "1"
-                       2 "1"}
-                      {[1 1] ["sup1" 1]
-                       [2 2] ["sup1" 2]}
-                      {["sup1" 1] [0.0 0.0 0.0]
-                       ["sup1" 2] [0.0 0.0 0.0]
-                       })
-                     ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
-                     (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
-                     ))
-      (is (empty? (:launched changed)))
-      (bind options (RebalanceOptions.))
-      (.set_wait_secs options 0)
-      (bind changed (capture-changed-workers
-                     (.rebalance (:nimbus cluster) "topology2" options)
-                     (advance-cluster-time cluster 10)
-                     (heartbeat-workers cluster "sup1" [1 2 3 4])
-                     (advance-cluster-time cluster 10)
-                     ))
-      (validate-launched-once (:launched changed)
-                              {"sup1" [1 2]}
-                              (get-storm-id (:storm-cluster-state cluster) "topology1"))
-      (validate-launched-once (:launched changed)
-                              {"sup1" [3 4]}
-                              (get-storm-id (:storm-cluster-state cluster) "topology2"))
-      )))
+   (deftest test-retry-read-assignments
+     (with-simulated-time-local-cluster [cluster
+                                         :supervisors 0
+                                         :ports-per-supervisor 2
+                                         :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
+                                                       NIMBUS-MONITOR-FREQ-SECS 10
+                                                       TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
+                                                       TOPOLOGY-ACKER-EXECUTORS 0}]
+       (letlocals
+         (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
+         (bind topology1 (thrift/mk-topology
+                           {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
+                           {}))
+         (bind topology2 (thrift/mk-topology
+                           {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
+                           {}))
+         (bind state (:storm-cluster-state cluster))
+         (bind changed (capture-changed-workers
+                         (submit-mocked-assignment
+                           (:nimbus cluster)
+                           (:storm-cluster-state cluster)
+                           "topology1"
+                           {TOPOLOGY-WORKERS 2}
+                           topology1
+                           {1 "1"
+                            2 "1"}
+                           {[1 1] ["sup1" 1]
+                            [2 2] ["sup1" 2]}
+                           {["sup1" 1] [0.0 0.0 0.0]
+                            ["sup1" 2] [0.0 0.0 0.0]
+                            })
+                         (submit-mocked-assignment
+                           (:nimbus cluster)
+                           (:storm-cluster-state cluster)
+                           "topology2"
+                           {TOPOLOGY-WORKERS 2}
+                           topology2
+                           {1 "1"
+                            2 "1"}
+                           {[1 1] ["sup1" 1]
+                            [2 2] ["sup1" 2]}
+                           {["sup1" 1] [0.0 0.0 0.0]
+                            ["sup1" 2] [0.0 0.0 0.0]
+                            })
+                         ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
+                         (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
+                         ))
+         (is (empty? (:launched changed)))
+         (bind options (RebalanceOptions.))
+         (.set_wait_secs options 0)
+         (bind changed (capture-changed-workers
+                         (.rebalance (:nimbus cluster) "topology2" options)
+                         (advance-cluster-time cluster 10)
+                         (heartbeat-workers cluster "sup1" [1 2 3 4])
+                         (advance-cluster-time cluster 10)
+                         ))
+         (validate-launched-once (:launched changed)
+           {"sup1" [1 2]}
+           (get-storm-id (:storm-cluster-state cluster) "topology1"))
+         (validate-launched-once (:launched changed)
+           {"sup1" [3 4]}
+           (get-storm-id (:storm-cluster-state cluster) "topology2"))
 -        ))))
++        )))