You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/02/24 17:17:28 UTC

[18/27] storm git commit: removed any clojure internals

removed any clojure internals


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/afcd0c6c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/afcd0c6c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/afcd0c6c

Branch: refs/heads/master
Commit: afcd0c6c53aca7f99d39f10b91a7b45fda424fe5
Parents: d546387
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Fri Feb 19 15:21:48 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Fri Feb 19 15:22:22 2016 +0800

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/converter.clj      |  9 +++
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  4 +-
 .../clj/org/apache/storm/daemon/supervisor.clj  |  8 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  |  6 +-
 storm-core/src/clj/org/apache/storm/testing.clj |  2 +-
 .../org/apache/storm/cluster/ClusterUtils.java  | 41 +++-------
 .../org/apache/storm/cluster/ExecutorBeat.java  | 44 +++++++++++
 .../org/apache/storm/cluster/IStateStorage.java | 11 ++-
 .../storm/cluster/IStormClusterState.java       | 24 +++---
 .../storm/cluster/PaceMakerStateStorage.java    |  4 +-
 .../cluster/PaceMakerStateStorageFactory.java   | 12 +--
 .../storm/cluster/StateStorageFactory.java      |  6 +-
 .../storm/cluster/StormClusterStateImpl.java    | 78 ++++++++++----------
 .../apache/storm/cluster/ZKStateStorage.java    |  3 +-
 .../storm/cluster/ZKStateStorageFactory.java    |  4 +-
 .../org/apache/storm/zookeeper/Zookeeper.java   | 46 +++---------
 16 files changed, 156 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/clj/org/apache/storm/converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/converter.clj b/storm-core/src/clj/org/apache/storm/converter.clj
index 18647b1..c845cd4 100644
--- a/storm-core/src/clj/org/apache/storm/converter.clj
+++ b/storm-core/src/clj/org/apache/storm/converter.clj
@@ -18,6 +18,7 @@
             StormBase TopologyStatus ClusterWorkerHeartbeat ExecutorInfo ErrorInfo Credentials RebalanceOptions KillOptions
             TopologyActionOptions DebugOptions ProfileRequest]
            [org.apache.storm.utils Utils])
+  (:import [org.apache.storm.cluster ExecutorBeat])
   (:use [org.apache.storm util stats log])
   (:require [org.apache.storm.daemon [common :as common]]))
 
@@ -238,6 +239,14 @@
      }
     {}))
 
+(defn clojurify-zk-executor-hb [^ExecutorBeat executor-hb]
+  (if executor-hb
+    {:stats (.getStats executor-hb)
+     :uptime (.getUptime executor-hb)
+     :time-secs (.getTimeSecs executor-hb)
+     }
+    {}))
+
 (defn thriftify-zk-worker-hb [worker-hb]
   (if (not-empty (filter second (:executor-stats worker-hb)))
     (doto (ClusterWorkerHeartbeat.)

http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 6bdbdc0..beb6639 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -591,8 +591,8 @@
   (let [storm-cluster-state (:storm-cluster-state nimbus)
         executor-beats (let [executor-stats-java-map (.executorBeats storm-cluster-state storm-id (.get_executor_node_port (thriftify-assignment existing-assignment)))
                              executor-stats-clojurify (clojurify-structure executor-stats-java-map)]
-                         (->> (dofor [[^ExecutorInfo executor-info  executor-heartbeat] executor-stats-clojurify]
-                             {[(.get_task_start executor-info) (.get_task_end executor-info)] executor-heartbeat})
+                         (->> (dofor [[^ExecutorInfo executor-info  ^ExecutorBeat executor-heartbeat] executor-stats-clojurify]
+                             {[(.get_task_start executor-info) (.get_task_end executor-info)] (clojurify-zk-executor-hb executor-heartbeat)})
                            (apply merge)))
 
         cache (update-heartbeat-cache (@(:heartbeats-cache nimbus) storm-id)

http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index c1f058f..58f6291 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -20,7 +20,7 @@
                                    ConfigUtils]
            [org.apache.storm.daemon Shutdownable]
            [org.apache.storm Constants]
-           [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils]
+           [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils IStateStorage]
            [java.net JarURLConnection]
            [java.net URI URLDecoder]
            [org.apache.commons.io FileUtils])
@@ -69,8 +69,8 @@
                       (if (= assignment-version recorded-version)
                         {sid (get assignment-versions sid)}
                         (let [thriftify-assignment-version (.assignmentInfoWithVersion storm-cluster-state sid callback)
-                              assignment (clojurify-assignment (:data thriftify-assignment-version))]
-                        {sid {:data assignment :version (:version thriftify-assignment-version)}}))
+                              assignment (clojurify-assignment (.get thriftify-assignment-version (IStateStorage/DATA)))]
+                        {sid {:data assignment :version (.get thriftify-assignment-version (IStateStorage/VERSION))}}))
                       {sid nil})))
            (apply merge)
            (filter-val not-nil?))
@@ -1184,7 +1184,7 @@
       (.readBlobTo blob-store (ConfigUtils/masterStormConfKey storm-id) (FileOutputStream. (ConfigUtils/supervisorStormConfPath tmproot)) nil)
       (finally
         (.shutdown blob-store)))
-    (try (FileUtils/moveDirectory (File. tmproot) (File. stormroot)) (catch Exception e))
+    (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
 
     (setup-storm-code-dir conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id)) stormroot)
     (let [classloader (.getContextClassLoader (Thread/currentThread))

http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index b80cd9e..395be23 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -38,7 +38,7 @@
   (:import [org.apache.storm.task WorkerTopologyContext])
   (:import [org.apache.storm Constants])
   (:import [org.apache.storm.security.auth AuthUtils])
-  (:import [org.apache.storm.cluster ClusterStateContext DaemonType ZKStateStorage StormClusterStateImpl ClusterUtils])
+  (:import [org.apache.storm.cluster ClusterStateContext DaemonType ZKStateStorage StormClusterStateImpl ClusterUtils IStateStorage])
   (:import [javax.security.auth Subject])
   (:import [java.security PrivilegedExceptionAction])
   (:import [org.apache.logging.log4j LogManager])
@@ -381,8 +381,8 @@
          (let [version (.assignmentVersion storm-cluster-state storm-id callback)
                assignment (if (= version (:version (get @(:assignment-versions worker) storm-id)))
                             (:data (get @(:assignment-versions worker) storm-id))
-                            (let [java-assignment (.assignmentInfoWithVersion storm-cluster-state storm-id callback)
-                              new-assignment {:data (clojurify-assignment (:data java-assignment)) :version version}]
+                            (let [thriftify-assignment-version (.assignmentInfoWithVersion storm-cluster-state storm-id callback)
+                              new-assignment {:data (clojurify-assignment (.get thriftify-assignment-version (IStateStorage/DATA))) :version version}]
                               (swap! (:assignment-versions worker) assoc storm-id new-assignment)
                               (:data new-assignment)))
               my-assignment (-> assignment

http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index eef7754..3dee54b 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -448,7 +448,7 @@
                            component->tasks)
         task-ids (apply concat (vals component->tasks))
         assignment (clojurify-assignment (.assignmentInfo state storm-id nil))
-        taskbeats (.taskbeats state storm-id (:task->node+port assignment))  ;hava question?
+        taskbeats (.taskbeats state storm-id (:task->node+port assignment))
         heartbeats (dofor [id task-ids] (get taskbeats id))
         stats (dofor [hb heartbeats] (if hb (stat-key (:stats hb)) 0))]
     (reduce + stats)))

http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java b/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java
index 0c663f0..aae4231 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java
@@ -17,9 +17,6 @@
  */
 package org.apache.storm.cluster;
 
-import clojure.lang.APersistentMap;
-import clojure.lang.PersistentArrayMap;
-import clojure.lang.RT;
 import org.apache.storm.Config;
 import org.apache.storm.generated.ClusterWorkerHeartbeat;
 import org.apache.storm.generated.ExecutorInfo;
@@ -192,14 +189,15 @@ public class ClusterUtils {
      * @param workerHeartbeat
      * @return
      */
-    public static Map<ExecutorInfo, APersistentMap> convertExecutorBeats(List<ExecutorInfo> executors, ClusterWorkerHeartbeat workerHeartbeat) {
-        Map<ExecutorInfo, APersistentMap> executorWhb = new HashMap<>();
+    public static Map<ExecutorInfo, ExecutorBeat> convertExecutorBeats(List<ExecutorInfo> executors, ClusterWorkerHeartbeat workerHeartbeat) {
+        Map<ExecutorInfo, ExecutorBeat> executorWhb = new HashMap<>();
         Map<ExecutorInfo, ExecutorStats> executorStatsMap = workerHeartbeat.get_executor_stats();
         for (ExecutorInfo executor : executors) {
             if (executorStatsMap.containsKey(executor)) {
-                APersistentMap executorBeat =
-                        new PersistentArrayMap(new Object[] { RT.keyword(null, "time-secs"), workerHeartbeat.get_time_secs(), RT.keyword(null, "uptime"),
-                                workerHeartbeat.get_uptime_secs(), RT.keyword(null, "stats"), workerHeartbeat.get_executor_stats().get(executor) });
+                int time = workerHeartbeat.get_time_secs();
+                int uptime = workerHeartbeat.get_uptime_secs();
+                ExecutorStats executorStats = workerHeartbeat.get_executor_stats().get(executor);
+                ExecutorBeat executorBeat = new ExecutorBeat(time, uptime, executorStats);
                 executorWhb.put(executor, executorBeat);
             }
         }
@@ -210,13 +208,13 @@ public class ClusterUtils {
         if (stateStorage instanceof IStateStorage) {
             return new StormClusterStateImpl((IStateStorage) stateStorage, acls, context, false);
         } else {
-            IStateStorage Storage = _instance.mkStateStorageImpl((APersistentMap) stateStorage, (APersistentMap) stateStorage, acls, context);
+            IStateStorage Storage = _instance.mkStateStorageImpl((Map) stateStorage, (Map) stateStorage, acls, context);
             return new StormClusterStateImpl(Storage, acls, context, true);
         }
 
     }
 
-    public IStateStorage mkStateStorageImpl(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context) throws Exception {
+    public IStateStorage mkStateStorageImpl(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context) throws Exception {
         String className = null;
         IStateStorage stateStorage = null;
         if (config.get(Config.STORM_CLUSTER_STATE_STORE) != null) {
@@ -230,7 +228,7 @@ public class ClusterUtils {
         return stateStorage;
     }
 
-    public static IStateStorage mkStateStorage(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context) throws Exception {
+    public static IStateStorage mkStateStorage(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context) throws Exception {
         return _instance.mkStateStorageImpl(config, auth_conf, acls, context);
     }
 
@@ -238,26 +236,7 @@ public class ClusterUtils {
         return _instance.mkStormClusterStateImpl(StateStorage, acls, context);
     }
 
-    // TO be remove
-    public static <K, V> HashMap<V, List<K>> reverseMap(Map<K, V> map) {
-        HashMap<V, List<K>> rtn = new HashMap<V, List<K>>();
-        if (map == null) {
-            return rtn;
-        }
-        for (Map.Entry<K, V> entry : map.entrySet()) {
-            K key = entry.getKey();
-            V val = entry.getValue();
-            List<K> list = rtn.get(val);
-            if (list == null) {
-                list = new ArrayList<K>();
-                rtn.put(entry.getValue(), list);
-            }
-            list.add(key);
-        }
-        return rtn;
-    }
-
-    public static String StringifyError(Throwable error) {
+    public static String stringifyError(Throwable error) {
         String errorString = null;
         StringWriter result = null;
         PrintWriter printWriter = null;

http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/jvm/org/apache/storm/cluster/ExecutorBeat.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ExecutorBeat.java b/storm-core/src/jvm/org/apache/storm/cluster/ExecutorBeat.java
new file mode 100644
index 0000000..b32615e
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ExecutorBeat.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import org.apache.storm.generated.ExecutorStats;
+
+public class ExecutorBeat {
+    private final int timeSecs;
+    private final int uptime;
+    private final ExecutorStats stats;
+
+    public ExecutorBeat(int timeSecs, int uptime, ExecutorStats stats) {
+        this.timeSecs = timeSecs;
+        this.uptime = uptime;
+        this.stats = stats;
+    }
+
+    public int getTimeSecs() {
+        return timeSecs;
+    }
+
+    public int getUptime() {
+        return uptime;
+    }
+
+    public ExecutorStats getStats() {
+        return stats;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java b/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java
index 1a2b14f..0b6f043 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java
@@ -17,8 +17,8 @@
  */
 package org.apache.storm.cluster;
 
-import clojure.lang.APersistentMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.storm.callback.ZKStateChangedCallback;
@@ -42,6 +42,9 @@ import org.apache.zookeeper.data.ACL;
  */
 public interface IStateStorage {
 
+    public static final String DATA = "data";
+    public static final String VERSION = "version";
+
     /**
      * Registers a callback function that gets called when CuratorEvents happen.
      * @param callback is a clojure IFn that accepts the type - translated to
@@ -149,14 +152,14 @@ public interface IStateStorage {
 
     /**
      * Get the data at the node along with its version. Data is returned
-     * in an APersistentMap with clojure keyword keys :data and :version.
+     * in an Map with the keys data and version.
      * @param path The path to look under
      * @param watch Whether or not to set a watch on the path. Watched paths
      * emit events which are consumed by functions registered with the
      * register method. Very useful for catching updates to nodes.
-     * @return An APersistentMap in the form {:data data :version version}
+     * @return An Map in the form {:data data :version version}
      */
-    APersistentMap get_data_with_version(String path, boolean watch);
+    Map get_data_with_version(String path, boolean watch);
 
     /**
      * Write a worker heartbeat at the path.

http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index 01cf56a..c88935e 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -17,8 +17,6 @@
  */
 package org.apache.storm.cluster;
 
-import clojure.lang.APersistentMap;
-import clojure.lang.IFn;
 import org.apache.storm.generated.*;
 import org.apache.storm.nimbus.NimbusInfo;
 
@@ -27,13 +25,13 @@ import java.util.List;
 import java.util.Map;
 
 public interface IStormClusterState {
-    public List<String> assignments(IFn callback);
+    public List<String> assignments(Runnable callback);
 
-    public Assignment assignmentInfo(String stormId, IFn callback);
+    public Assignment assignmentInfo(String stormId, Runnable callback);
 
-    public APersistentMap assignmentInfoWithVersion(String stormId, IFn callback);
+    public Map assignmentInfoWithVersion(String stormId, Runnable callback);
 
-    public Integer assignmentVersion(String stormId, IFn callback) throws Exception;
+    public Integer assignmentVersion(String stormId, Runnable callback) throws Exception;
 
     public List<String> blobstoreInfo(String blobKey);
 
@@ -43,7 +41,7 @@ public interface IStormClusterState {
 
     public List<String> activeStorms();
 
-    public StormBase stormBase(String stormId, IFn callback);
+    public StormBase stormBase(String stormId, Runnable callback);
 
     public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);
 
@@ -55,9 +53,9 @@ public interface IStormClusterState {
 
     public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest);
 
-    public Map<ExecutorInfo, APersistentMap> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort);
+    public Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort);
 
-    public List<String> supervisors(IFn callback);
+    public List<String> supervisors(Runnable callback);
 
     public SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist
 
@@ -73,7 +71,7 @@ public interface IStormClusterState {
 
     public void setTopologyLogConfig(String stormId, LogConfig logConfig);
 
-    public LogConfig topologyLogConfig(String stormId, IFn cb);
+    public LogConfig topologyLogConfig(String stormId, Runnable cb);
 
     public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info);
 
@@ -83,7 +81,7 @@ public interface IStormClusterState {
 
     public void workerBackpressure(String stormId, String node, Long port, boolean on);
 
-    public boolean topologyBackpressure(String stormId, IFn callback);
+    public boolean topologyBackpressure(String stormId, Runnable callback);
 
     public void setupBackpressure(String stormId);
 
@@ -101,7 +99,7 @@ public interface IStormClusterState {
 
     public List<String> activeKeys();
 
-    public List<String> blobstore(IFn callback);
+    public List<String> blobstore(Runnable callback);
 
     public void removeStorm(String stormId);
 
@@ -117,7 +115,7 @@ public interface IStormClusterState {
 
     public void setCredentials(String stormId, Credentials creds, Map topoConf) throws NoSuchAlgorithmException;
 
-    public Credentials credentials(String stormId, IFn callback);
+    public Credentials credentials(String stormId, Runnable callback);
 
     public void disconnect();
 

http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java b/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
index a9c4d89..c29078e 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
@@ -17,7 +17,6 @@
  */
 package org.apache.storm.cluster;
 
-import clojure.lang.APersistentMap;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.storm.callback.ZKStateChangedCallback;
 import org.apache.storm.generated.*;
@@ -28,6 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.Map;
 
 public class PaceMakerStateStorage implements IStateStorage {
 
@@ -104,7 +104,7 @@ public class PaceMakerStateStorage implements IStateStorage {
     }
 
     @Override
-    public APersistentMap get_data_with_version(String path, boolean watch) {
+    public Map get_data_with_version(String path, boolean watch) {
         return stateStorage.get_data_with_version(path, watch);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java b/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java
index eafd2e7..3111e04 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java
@@ -17,12 +17,12 @@
  */
 package org.apache.storm.cluster;
 
-import clojure.lang.APersistentMap;
 import org.apache.storm.pacemaker.PacemakerClient;
 import org.apache.storm.utils.Utils;
 import org.apache.zookeeper.data.ACL;
 
 import java.util.List;
+import java.util.Map;
 
 public class PaceMakerStateStorageFactory implements StateStorageFactory {
 
@@ -38,7 +38,7 @@ public class PaceMakerStateStorageFactory implements StateStorageFactory {
     }
 
     @Override
-    public IStateStorage mkStore(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context) {
+    public IStateStorage mkStore(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context) {
         try {
             return new PaceMakerStateStorage(initMakeClient(config), initZKstate(config, auth_conf, acls, context));
         } catch (Exception e) {
@@ -46,19 +46,19 @@ public class PaceMakerStateStorageFactory implements StateStorageFactory {
         }
     }
 
-    public static IStateStorage initZKstate(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context) throws Exception {
+    public static IStateStorage initZKstate(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context) throws Exception {
         return _instance.initZKstateImpl(config, auth_conf, acls, context);
     }
 
-    public static PacemakerClient initMakeClient(APersistentMap config) {
+    public static PacemakerClient initMakeClient(Map config) {
         return _instance.initMakeClientImpl(config);
     }
 
-    public IStateStorage initZKstateImpl(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context) throws Exception {
+    public IStateStorage initZKstateImpl(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context) throws Exception {
         return ClusterUtils.mkStateStorage(config, auth_conf, acls, context);
     }
 
-    public PacemakerClient initMakeClientImpl(APersistentMap config) {
+    public PacemakerClient initMakeClientImpl(Map config) {
         return new PacemakerClient(config);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/StateStorageFactory.java b/storm-core/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
index 110da41..0929750 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
@@ -17,12 +17,12 @@
  */
 package org.apache.storm.cluster;
 
-import clojure.lang.APersistentMap;
 import java.util.List;
+import java.util.Map;
+
 import org.apache.zookeeper.data.ACL;
 
 public interface StateStorageFactory {
 
-    IStateStorage mkStore(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context);
-
+    IStateStorage mkStore(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 17c8641..5fa586a 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -34,8 +34,6 @@ import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.PrintWriter;
-import java.io.StringWriter;
 import java.security.NoSuchAlgorithmException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
@@ -47,17 +45,17 @@ public class StormClusterStateImpl implements IStormClusterState {
 
     private IStateStorage stateStorage;
 
-    private ConcurrentHashMap<String, IFn> assignmentInfoCallback;
-    private ConcurrentHashMap<String, IFn> assignmentInfoWithVersionCallback;
-    private ConcurrentHashMap<String, IFn> assignmentVersionCallback;
-    private AtomicReference<IFn> supervisorsCallback;
+    private ConcurrentHashMap<String, Runnable> assignmentInfoCallback;
+    private ConcurrentHashMap<String, Runnable> assignmentInfoWithVersionCallback;
+    private ConcurrentHashMap<String, Runnable> assignmentVersionCallback;
+    private AtomicReference<Runnable> supervisorsCallback;
     // we want to reigister a topo directory getChildren callback for all workers of this dir
-    private ConcurrentHashMap<String, IFn> backPressureCallback;
-    private AtomicReference<IFn> assignmentsCallback;
-    private ConcurrentHashMap<String, IFn> stormBaseCallback;
-    private AtomicReference<IFn> blobstoreCallback;
-    private ConcurrentHashMap<String, IFn> credentialsCallback;
-    private ConcurrentHashMap<String, IFn> logConfigCallback;
+    private ConcurrentHashMap<String, Runnable> backPressureCallback;
+    private AtomicReference<Runnable> assignmentsCallback;
+    private ConcurrentHashMap<String, Runnable> stormBaseCallback;
+    private AtomicReference<Runnable> blobstoreCallback;
+    private ConcurrentHashMap<String, Runnable> credentialsCallback;
+    private ConcurrentHashMap<String, Runnable> logConfigCallback;
 
     private List<ACL> acls;
     private String stateId;
@@ -129,20 +127,20 @@ public class StormClusterStateImpl implements IStormClusterState {
 
     }
 
-    protected void issueCallback(AtomicReference<IFn> cb) {
-        IFn callback = cb.getAndSet(null);
+    protected void issueCallback(AtomicReference<Runnable> cb) {
+        Runnable callback = cb.getAndSet(null);
         if (callback != null)
-            callback.invoke();
+            callback.run();
     }
 
-    protected void issueMapCallback(ConcurrentHashMap<String, IFn> callbackConcurrentHashMap, String key) {
-        IFn callback = callbackConcurrentHashMap.remove(key);
+    protected void issueMapCallback(ConcurrentHashMap<String, Runnable> callbackConcurrentHashMap, String key) {
+        Runnable callback = callbackConcurrentHashMap.remove(key);
         if (callback != null)
-            callback.invoke();
+            callback.run();
     }
 
     @Override
-    public List<String> assignments(IFn callback) {
+    public List<String> assignments(Runnable callback) {
         if (callback != null) {
             assignmentsCallback.set(callback);
         }
@@ -150,7 +148,7 @@ public class StormClusterStateImpl implements IStormClusterState {
     }
 
     @Override
-    public Assignment assignmentInfo(String stormId, IFn callback) {
+    public Assignment assignmentInfo(String stormId, Runnable callback) {
         if (callback != null) {
             assignmentInfoCallback.put(stormId, callback);
         }
@@ -159,23 +157,25 @@ public class StormClusterStateImpl implements IStormClusterState {
     }
 
     @Override
-    public APersistentMap assignmentInfoWithVersion(String stormId, IFn callback) {
+    public Map assignmentInfoWithVersion(String stormId, Runnable callback) {
+        Map map = new HashMap();
         if (callback != null) {
             assignmentInfoWithVersionCallback.put(stormId, callback);
         }
         Assignment assignment = null;
         Integer version = 0;
-        APersistentMap aPersistentMap = stateStorage.get_data_with_version(ClusterUtils.assignmentPath(stormId), callback != null);
-        if (aPersistentMap != null) {
-            assignment = ClusterUtils.maybeDeserialize((byte[]) aPersistentMap.get(RT.keyword(null, "data")), Assignment.class);
-            version = (Integer) aPersistentMap.get(RT.keyword(null, "version"));
+        Map dataWithVersionMap = stateStorage.get_data_with_version(ClusterUtils.assignmentPath(stormId), callback != null);
+        if (dataWithVersionMap != null) {
+            assignment = ClusterUtils.maybeDeserialize((byte[]) dataWithVersionMap.get(IStateStorage.DATA), Assignment.class);
+            version = (Integer) dataWithVersionMap.get(IStateStorage.VERSION);
         }
-        APersistentMap map = new PersistentArrayMap(new Object[] { RT.keyword(null, "data"), assignment, RT.keyword(null, "version"), version });
+        map.put(IStateStorage.DATA, assignment);
+        map.put(IStateStorage.VERSION, version);
         return map;
     }
 
     @Override
-    public Integer assignmentVersion(String stormId, IFn callback) throws Exception {
+    public Integer assignmentVersion(String stormId, Runnable callback) throws Exception {
         if (callback != null) {
             assignmentVersionCallback.put(stormId, callback);
         }
@@ -227,7 +227,7 @@ public class StormClusterStateImpl implements IStormClusterState {
     }
 
     @Override
-    public StormBase stormBase(String stormId, IFn callback) {
+    public StormBase stormBase(String stormId, Runnable callback) {
         if (callback != null) {
             stormBaseCallback.put(stormId, callback);
         }
@@ -298,10 +298,10 @@ public class StormClusterStateImpl implements IStormClusterState {
      * @return
      */
     @Override
-    public Map<ExecutorInfo, APersistentMap> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort) {
-        Map<ExecutorInfo, APersistentMap> executorWhbs = new HashMap<>();
+    public Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort) {
+        Map<ExecutorInfo, ExecutorBeat> executorWhbs = new HashMap<>();
 
-        Map<NodeInfo, List<List<Long>>> nodePortExecutors = ClusterUtils.reverseMap(executorNodePort);
+        Map<NodeInfo, List<List<Long>>> nodePortExecutors = Utils.reverseMap(executorNodePort);
 
         for (Map.Entry<NodeInfo, List<List<Long>>> entry : nodePortExecutors.entrySet()) {
 
@@ -319,7 +319,7 @@ public class StormClusterStateImpl implements IStormClusterState {
     }
 
     @Override
-    public List<String> supervisors(IFn callback) {
+    public List<String> supervisors(Runnable callback) {
         if (callback != null) {
             supervisorsCallback.set(callback);
         }
@@ -342,7 +342,7 @@ public class StormClusterStateImpl implements IStormClusterState {
         try {
             stateStorage.delete_worker_hb(ClusterUtils.workerbeatStormRoot(stormId));
         } catch (Exception e) {
-            if (Zookeeper.exceptionCause(KeeperException.class, e)) {
+            if (Utils.exceptionCauseIsInstanceOf(KeeperException.class, e)) {
                 // do nothing
                 LOG.warn("Could not teardown heartbeats for {}.", stormId);
             } else {
@@ -356,7 +356,7 @@ public class StormClusterStateImpl implements IStormClusterState {
         try {
             stateStorage.delete_node(ClusterUtils.errorStormRoot(stormId));
         } catch (Exception e) {
-            if (Zookeeper.exceptionCause(KeeperException.class, e)) {
+            if (Utils.exceptionCauseIsInstanceOf(KeeperException.class, e)) {
                 // do nothing
                 LOG.warn("Could not teardown errors for {}.", stormId);
             } else {
@@ -381,7 +381,7 @@ public class StormClusterStateImpl implements IStormClusterState {
     }
 
     @Override
-    public LogConfig topologyLogConfig(String stormId, IFn cb) {
+    public LogConfig topologyLogConfig(String stormId, Runnable cb) {
         String path = ClusterUtils.logConfigPath(stormId);
         return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, cb != null), LogConfig.class);
     }
@@ -437,7 +437,7 @@ public class StormClusterStateImpl implements IStormClusterState {
      * @return
      */
     @Override
-    public boolean topologyBackpressure(String stormId, IFn callback) {
+    public boolean topologyBackpressure(String stormId, Runnable callback) {
         if (callback != null) {
             backPressureCallback.put(stormId, callback);
         }
@@ -568,7 +568,7 @@ public class StormClusterStateImpl implements IStormClusterState {
 
     // blobstore state
     @Override
-    public List<String> blobstore(IFn callback) {
+    public List<String> blobstore(Runnable callback) {
         if (callback != null) {
             blobstoreCallback.set(callback);
         }
@@ -602,7 +602,7 @@ public class StormClusterStateImpl implements IStormClusterState {
 
         String path = ClusterUtils.errorPath(stormId, componentId);
         String lastErrorPath = ClusterUtils.lastErrorPath(stormId, componentId);
-        ErrorInfo errorInfo = new ErrorInfo(ClusterUtils.StringifyError(error), Time.currentTimeSecs());
+        ErrorInfo errorInfo = new ErrorInfo(ClusterUtils.stringifyError(error), Time.currentTimeSecs());
         errorInfo.set_host(node);
         errorInfo.set_port(port.intValue());
         byte[] serData = Utils.serialize(errorInfo);
@@ -669,7 +669,7 @@ public class StormClusterStateImpl implements IStormClusterState {
     }
 
     @Override
-    public Credentials credentials(String stormId, IFn callback) {
+    public Credentials credentials(String stormId, Runnable callback) {
         if (callback != null) {
             credentialsCallback.put(stormId, callback);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
index b277751..56115ce 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
@@ -17,7 +17,6 @@
  */
 package org.apache.storm.cluster;
 
-import clojure.lang.APersistentMap;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.state.*;
 import org.apache.curator.framework.state.ConnectionState;
@@ -220,7 +219,7 @@ public class ZKStateStorage implements IStateStorage {
     }
 
     @Override
-    public APersistentMap get_data_with_version(String path, boolean watch) {
+    public Map get_data_with_version(String path, boolean watch) {
         return Zookeeper.getDataWithVersion(zkReader, path, watch);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
index 956c20e..232488b 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
@@ -17,16 +17,16 @@
  */
 package org.apache.storm.cluster;
 
-import clojure.lang.APersistentMap;
 import org.apache.storm.utils.Utils;
 import org.apache.zookeeper.data.ACL;
 
 import java.util.List;
+import java.util.Map;
 
 public class ZKStateStorageFactory implements StateStorageFactory {
 
     @Override
-    public IStateStorage mkStore(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context) {
+    public IStateStorage mkStore(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context) {
         try {
             return new ZKStateStorage(config, auth_conf, acls, context);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
index f80b0a4..e5b2666 100644
--- a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
@@ -17,15 +17,11 @@
  */
 package org.apache.storm.zookeeper;
 
-import clojure.lang.APersistentMap;
-import clojure.lang.PersistentArrayMap;
-import clojure.lang.RT;
 import org.apache.commons.lang.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.framework.api.CuratorListener;
-import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.recipes.leader.LeaderLatch;
 import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
 import org.apache.curator.framework.recipes.leader.Participant;
@@ -33,6 +29,7 @@ import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.storm.Config;
 import org.apache.storm.callback.DefaultWatcherCallBack;
 import org.apache.storm.callback.WatcherCallBack;
+import org.apache.storm.cluster.IStateStorage;
 import org.apache.storm.nimbus.ILeaderElector;
 import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.storm.utils.Utils;
@@ -47,17 +44,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.IOException;
 import java.net.BindException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.Vector;
+
 
 public class Zookeeper {
     private static Logger LOG = LoggerFactory.getLogger(Zookeeper.class);
@@ -169,7 +162,7 @@ public class Zookeeper {
                 zk.delete().deletingChildrenIfNeeded().forPath(normalizePath(path));
             }
         } catch (Exception e) {
-            if (exceptionCause(KeeperException.NodeExistsException.class, e)) {
+            if (Utils.exceptionCauseIsInstanceOf(KeeperException.NodeExistsException.class, e)) {
                 // do nothing
                 LOG.info("delete {} failed.", path, e);
             } else {
@@ -195,7 +188,7 @@ public class Zookeeper {
         try {
             createNode(zk, npath, byteArray, org.apache.zookeeper.CreateMode.PERSISTENT, acls);
         } catch (Exception e) {
-            if (exceptionCause(KeeperException.NodeExistsException.class, e)) {
+            if (Utils.exceptionCauseIsInstanceOf(KeeperException.NodeExistsException.class, e)) {
                 // this can happen when multiple clients doing mkdir at same time
             }
         }
@@ -224,7 +217,7 @@ public class Zookeeper {
                 }
             }
         } catch (Exception e) {
-            if (exceptionCause(KeeperException.NoNodeException.class, e)) {
+            if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
                 // this is fine b/c we still have a watch from the successful exists call
             } else {
                 throw Utils.wrapInRuntime(e);
@@ -312,7 +305,7 @@ public class Zookeeper {
         }
         LOG.info("Starting inprocess zookeeper at port {} and dir {}", report, localdir);
         factory.startup(zk);
-        return Arrays.asList((Object)new Long(report), (Object)factory);
+        return Arrays.asList((Object) new Long(report), (Object) factory);
     }
 
     public static void shutdownInprocessZookeeper(NIOServerCnxnFactory handle) {
@@ -361,9 +354,8 @@ public class Zookeeper {
         return new LeaderElectorImp(conf, servers, zk, leaderLockPath, id, leaderLatchAtomicReference, leaderLatchListenerAtomicReference);
     }
 
-    // To update @return to be a Map
-    public static APersistentMap getDataWithVersion(CuratorFramework zk, String path, boolean watch) {
-        APersistentMap map = null;
+    public static Map getDataWithVersion(CuratorFramework zk, String path, boolean watch) {
+        Map map = new HashMap();
         try {
             byte[] bytes = null;
             Stat stats = new Stat();
@@ -376,11 +368,12 @@ public class Zookeeper {
                 }
                 if (bytes != null) {
                     int version = stats.getVersion();
-                    map = new PersistentArrayMap(new Object[] { RT.keyword(null, "data"), bytes, RT.keyword(null, "version"), version });
+                    map.put(IStateStorage.DATA, bytes);
+                    map.put(IStateStorage.VERSION, version);
                 }
             }
         } catch (Exception e) {
-            if (exceptionCause(KeeperException.NoNodeException.class, e)) {
+            if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
                 // this is fine b/c we still have a watch from the successful exists call
             } else {
                 Utils.wrapInRuntime(e);
@@ -423,19 +416,4 @@ public class Zookeeper {
         String rtn = toksToPath(tokenizePath(path));
         return rtn;
     }
-
-    // To remove exceptionCause if port Utils.try-cause to java
-    public static boolean exceptionCause(Class klass, Throwable t) {
-        boolean ret = false;
-        Throwable throwable = t;
-        while (throwable != null) {
-            if (throwable.getClass() == klass) {
-                ret = true;
-                break;
-            }
-            throwable = throwable.getCause();
-        }
-        return ret;
-    }
-
 }