You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2016/02/11 20:10:48 UTC

[05/15] storm git commit: Squashing util conversion changes.

Squashing util conversion changes.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3befae32
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3befae32
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3befae32

Branch: refs/heads/master
Commit: 3befae326c24b10cea1b1e6992ef808d481134c2
Parents: 00c18c9
Author: Kyle Nusbaum <Ky...@gmail.com>
Authored: Mon Feb 8 11:46:51 2016 -0600
Committer: Kyle Nusbaum <Ky...@gmail.com>
Committed: Mon Feb 8 11:46:51 2016 -0600

----------------------------------------------------------------------
 pom.xml                                         |    6 +
 storm-core/pom.xml                              |    4 +
 .../src/clj/org/apache/storm/LocalCluster.clj   |    2 +-
 storm-core/src/clj/org/apache/storm/clojure.clj |    8 +-
 storm-core/src/clj/org/apache/storm/cluster.clj |   25 +-
 .../cluster_state/zookeeper_state_factory.clj   |   11 +-
 .../clj/org/apache/storm/command/blobstore.clj  |    9 +-
 .../org/apache/storm/command/dev_zookeeper.clj  |    6 +-
 .../clj/org/apache/storm/command/get_errors.clj |   12 +-
 .../apache/storm/command/shell_submission.clj   |    3 +-
 storm-core/src/clj/org/apache/storm/config.clj  |   18 +-
 .../src/clj/org/apache/storm/converter.clj      |   17 +-
 .../src/clj/org/apache/storm/daemon/acker.clj   |   13 +-
 .../src/clj/org/apache/storm/daemon/common.clj  |   29 +-
 .../src/clj/org/apache/storm/daemon/drpc.clj    |   20 +-
 .../clj/org/apache/storm/daemon/executor.clj    |   80 +-
 .../clj/org/apache/storm/daemon/logviewer.clj   |   68 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  155 ++-
 .../clj/org/apache/storm/daemon/supervisor.clj  |  200 ++--
 .../src/clj/org/apache/storm/daemon/task.clj    |    2 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  |   64 +-
 .../src/clj/org/apache/storm/disruptor.clj      |   10 +-
 storm-core/src/clj/org/apache/storm/event.clj   |    2 +-
 .../src/clj/org/apache/storm/local_state.clj    |    9 +-
 .../org/apache/storm/pacemaker/pacemaker.clj    |    7 +-
 .../storm/pacemaker/pacemaker_state_factory.clj |   24 +-
 .../clj/org/apache/storm/process_simulator.clj  |    2 -
 .../apache/storm/scheduler/DefaultScheduler.clj |    7 +-
 .../apache/storm/scheduler/EvenScheduler.clj    |   23 +-
 .../storm/scheduler/IsolationScheduler.clj      |   29 +-
 storm-core/src/clj/org/apache/storm/stats.clj   |   82 +-
 storm-core/src/clj/org/apache/storm/testing.clj |   81 +-
 storm-core/src/clj/org/apache/storm/thrift.clj  |    6 +-
 storm-core/src/clj/org/apache/storm/timer.clj   |   12 +-
 .../clj/org/apache/storm/trident/testing.clj    |    9 +-
 storm-core/src/clj/org/apache/storm/ui/core.clj |   97 +-
 .../src/clj/org/apache/storm/ui/helpers.clj     |   14 +-
 storm-core/src/clj/org/apache/storm/util.clj    |  921 +--------------
 .../src/clj/org/apache/storm/zookeeper.clj      |    1 -
 .../serialization/SerializationFactory.java     |    3 +-
 .../staticmocking/MockedConfigUtils.java        |   31 -
 .../jvm/org/apache/storm/utils/ConfigUtils.java |   20 +-
 .../jvm/org/apache/storm/utils/IPredicate.java  |   22 +
 .../org/apache/storm/utils/StaticMockable.java  |   21 +
 .../jvm/org/apache/storm/utils/TestUtils.java   |   34 -
 .../src/jvm/org/apache/storm/utils/Time.java    |   25 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   | 1112 +++++++++++++++++-
 .../org/apache/storm/integration_test.clj       |   98 +-
 .../org/apache/storm/testing4j_test.clj         |   35 +-
 .../apache/storm/trident/integration_test.clj   |   12 +
 .../test/clj/org/apache/storm/cluster_test.clj  |   20 +-
 .../test/clj/org/apache/storm/drpc_test.clj     |   23 +-
 .../clj/org/apache/storm/logviewer_test.clj     |  267 +++--
 .../test/clj/org/apache/storm/nimbus_test.clj   |  131 ++-
 .../scheduler/resource_aware_scheduler_test.clj |   21 +-
 .../apache/storm/security/auth/auth_test.clj    |   11 +-
 .../BlowfishTupleSerializer_test.clj            |    1 -
 .../clj/org/apache/storm/serialization_test.clj |   23 +-
 .../clj/org/apache/storm/supervisor_test.clj    |  645 +++++-----
 .../clj/org/apache/storm/transactional_test.clj |   18 +
 .../clj/org/apache/storm/trident/state_test.clj |    3 +-
 .../clj/org/apache/storm/trident/tuple_test.clj |   12 +
 .../test/clj/org/apache/storm/utils_test.clj    |   14 +-
 .../staticmocking/ConfigUtilsInstaller.java     |   38 +
 .../utils/staticmocking/UtilsInstaller.java     |   38 +
 .../storm/utils/staticmocking/package-info.java |   95 ++
 66 files changed, 2868 insertions(+), 1993 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 831059a..37dbb19 100644
--- a/pom.xml
+++ b/pom.xml
@@ -215,6 +215,7 @@
         <jgrapht.version>0.9.0</jgrapht.version>
         <guava.version>16.0.1</guava.version>
         <netty.version>3.9.0.Final</netty.version>
+        <sysout-over-slf4j.version>1.0.2</sysout-over-slf4j.version>
         <log4j-over-slf4j.version>1.6.6</log4j-over-slf4j.version>
         <log4j.version>2.1</log4j.version>
         <slf4j.version>1.7.7</slf4j.version>
@@ -829,6 +830,11 @@
               <artifactId>jackson-databind</artifactId>
               <version>${jackson.version}</version>
             </dependency>
+            <dependency>
+                <groupId>uk.org.lidalia</groupId>
+                <artifactId>sysout-over-slf4j</artifactId>
+                <version>${sysout-over-slf4j.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 

http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 8de2461..9dcad96 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -42,6 +42,10 @@
             This is here as a work around to place it at the beginning of the classpath
             even though maven does not officially support ordering of the classpath.-->
         <dependency>
+            <groupId>uk.org.lidalia</groupId>
+            <artifactId>sysout-over-slf4j</artifactId>
+        </dependency>
+        <dependency>
             <groupId>log4j</groupId>
             <artifactId>log4j</artifactId>
             <version>1.2.17</version>

http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/LocalCluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/LocalCluster.clj b/storm-core/src/clj/org/apache/storm/LocalCluster.clj
index df3c180..8397707 100644
--- a/storm-core/src/clj/org/apache/storm/LocalCluster.clj
+++ b/storm-core/src/clj/org/apache/storm/LocalCluster.clj
@@ -48,7 +48,7 @@
   [this name conf topology]
   (submit-local-topology
     (:nimbus (. this state)) name conf topology)
-  (let [hook (get-configured-class conf STORM-TOPOLOGY-SUBMISSION-NOTIFIER-PLUGIN)]
+  (let [hook (Utils/getConfiguredClass conf STORM-TOPOLOGY-SUBMISSION-NOTIFIER-PLUGIN)]
     (when hook (submit-hook hook name conf topology))))
 
 

http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/clojure.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/clojure.clj b/storm-core/src/clj/org/apache/storm/clojure.clj
index ff33829..9e1836f 100644
--- a/storm-core/src/clj/org/apache/storm/clojure.clj
+++ b/storm-core/src/clj/org/apache/storm/clojure.clj
@@ -23,7 +23,7 @@
   (:import [org.apache.storm.spout SpoutOutputCollector ISpout])
   (:import [org.apache.storm.utils Utils])
   (:import [org.apache.storm.clojure ClojureBolt ClojureSpout])
-  (:import [java.util List])
+  (:import [java.util Collection List])
   (:require [org.apache.storm [thrift :as thrift]]))
 
 (defn direct-stream [fields]
@@ -153,6 +153,12 @@
   (tuple-values [this collector stream]
     this))
 
+(defn- collectify
+  [obj]
+  (if (or (sequential? obj) (instance? Collection obj))
+    obj
+    [obj]))
+
 (defnk emit-bolt! [collector values
                    :stream Utils/DEFAULT_STREAM_ID :anchor []]
   (let [^List anchor (collectify anchor)

http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/cluster.clj b/storm-core/src/clj/org/apache/storm/cluster.clj
index 152423a..2ecae72 100644
--- a/storm-core/src/clj/org/apache/storm/cluster.clj
+++ b/storm-core/src/clj/org/apache/storm/cluster.clj
@@ -18,10 +18,11 @@
   (:import [org.apache.zookeeper.data Stat ACL Id]
            [org.apache.storm.generated SupervisorInfo Assignment StormBase ClusterWorkerHeartbeat ErrorInfo Credentials NimbusSummary
             LogConfig ProfileAction ProfileRequest NodeInfo]
-           [java.io Serializable])
+           [java.io Serializable StringWriter PrintWriter]
+           [java.net URLEncoder])
   (:import [org.apache.zookeeper KeeperException KeeperException$NoNodeException ZooDefs ZooDefs$Ids ZooDefs$Perms])
   (:import [org.apache.curator.framework CuratorFramework])
-  (:import [org.apache.storm.utils Utils])
+  (:import [org.apache.storm.utils Utils Time])
   (:import [org.apache.storm.cluster ClusterState ClusterStateContext ClusterStateListener ConnectionState])
   (:import [java.security MessageDigest])
   (:import [org.apache.zookeeper.server.auth DigestAuthenticationProvider])
@@ -176,7 +177,7 @@
 
 (defn error-path
   [storm-id component-id]
-  (str (error-storm-root storm-id) "/" (url-encode component-id)))
+  (str (error-storm-root storm-id) "/" (URLEncoder/encode component-id)))
 
 (def last-error-path-seg "last-error")
 
@@ -184,7 +185,7 @@
   [storm-id component-id]
   (str (error-storm-root storm-id)
        "/"
-       (url-encode component-id)
+       (URLEncoder/encode component-id)
        "-"
        last-error-path-seg))
 
@@ -240,6 +241,12 @@
                       :stats (get executor-stats t)}})))
          (into {}))))
 
+(defn- stringify-error [error]
+  (let [result (StringWriter.)
+        printer (PrintWriter. result)]
+    (.printStackTrace error printer)
+    (.toString result)))
+
 ;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called.
 (defnk mk-storm-cluster-state
   [cluster-state-spec :acls nil :context (ClusterStateContext.)]
@@ -259,7 +266,7 @@
         state-id (.register
                   cluster-state
                   (fn [type path]
-                    (let [[subtree & args] (tokenize-path path)]
+                    (let [[subtree & args] (Utils/tokenizePath path)]
                       (condp = subtree
                          ASSIGNMENTS-ROOT (if (empty? args)
                                              (issue-callback! assignments-callback)
@@ -274,7 +281,9 @@
                          LOGCONFIG-ROOT (issue-map-callback! log-config-callback (first args))
                          BACKPRESSURE-ROOT (issue-map-callback! backpressure-callback (first args))
                          ;; this should never happen
-                         (exit-process! 30 "Unknown callback for subtree " subtree args)))))]
+                         ;(exit-process! 30 "Unknown callback for subtree " subtree args)
+                         (Utils/exitProcess 30 ["Unknown callback for subtree " subtree args])
+                          ))))]
     (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE BLOBSTORE-SUBTREE NIMBUSES-SUBTREE
                LOGCONFIG-SUBTREE]]
       (.mkdirs cluster-state p acls))
@@ -381,7 +390,7 @@
         ;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
         ;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
         ;; we avoid situations like that
-        (let [node+port->executors (reverse-map executor->node+port)
+        (let [node+port->executors (clojurify-structure (Utils/reverseMap executor->node+port))
               all-heartbeats (for [[[node port] executors] node+port->executors]
                                (->> (get-worker-heartbeat this storm-id node port)
                                     (convert-executor-beats executors)
@@ -580,7 +589,7 @@
          [this storm-id component-id node port error]
          (let [path (error-path storm-id component-id)
                last-error-path (last-error-path storm-id component-id)
-               data (thriftify-error {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port})
+               data (thriftify-error {:time-secs (Time/currentTimeSecs) :error (stringify-error error) :host node :port port})
                _ (.mkdirs cluster-state path acls)
                ser-data (Utils/serialize data)
                _ (.mkdirs cluster-state path acls)

http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj b/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj
index dcfa8d8..9594aab 100644
--- a/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj
+++ b/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj
@@ -16,9 +16,10 @@
 
 (ns org.apache.storm.cluster-state.zookeeper-state-factory
   (:import [org.apache.curator.framework.state ConnectionStateListener]
-           [org.apache.storm.zookeeper Zookeeper])
+           [org.apache.storm.zookeeper Zookeeper]
+           [org.apache.storm.utils Utils])
   (:import [org.apache.zookeeper KeeperException$NoNodeException CreateMode
-             Watcher$Event$EventType Watcher$Event$KeeperState]
+            Watcher$Event$EventType Watcher$Event$KeeperState]
            [org.apache.storm.cluster ClusterState DaemonType])
   (:use [org.apache.storm cluster config log util])
   (:require [org.apache.storm [zookeeper :as zk]])
@@ -63,7 +64,7 @@
 
      (register
        [this callback]
-       (let [id (uuid)]
+       (let [id (Utils/uuid)]
          (swap! callbacks assoc id callback)
          id))
 
@@ -73,7 +74,7 @@
 
      (set-ephemeral-node
        [this path data acls]
-       (Zookeeper/mkdirs zk-writer (parent-path path) acls)
+       (Zookeeper/mkdirs zk-writer (Utils/parentPath path) acls)
        (if (Zookeeper/exists zk-writer path false)
          (try-cause
            (Zookeeper/setData zk-writer path data) ; should verify that it's ephemeral
@@ -92,7 +93,7 @@
        (if (Zookeeper/exists zk-writer path false)
          (Zookeeper/setData zk-writer path data)
          (do
-           (Zookeeper/mkdirs zk-writer (parent-path path) acls)
+           (Zookeeper/mkdirs zk-writer (Utils/parentPath path) acls)
            (Zookeeper/createNode zk-writer path data CreateMode/PERSISTENT acls))))
 
      (set-worker-hb

http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/command/blobstore.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/blobstore.clj b/storm-core/src/clj/org/apache/storm/command/blobstore.clj
index b1496db..76d8afb 100644
--- a/storm-core/src/clj/org/apache/storm/command/blobstore.clj
+++ b/storm-core/src/clj/org/apache/storm/command/blobstore.clj
@@ -17,7 +17,8 @@
   (:import [java.io InputStream OutputStream]
            [org.apache.storm.generated SettableBlobMeta AccessControl AuthorizationException
             KeyNotFoundException]
-           [org.apache.storm.blobstore BlobStoreAclHandler])
+           [org.apache.storm.blobstore BlobStoreAclHandler]
+           [org.apache.storm.utils Utils])
   (:use [org.apache.storm config]
         [clojure.string :only [split]]
         [clojure.tools.cli :only [cli]]
@@ -88,10 +89,10 @@
 (defn create-cli [args]
   (let [[{file :file acl :acl replication-factor :replication-factor} [key] _] (cli args ["-f" "--file" :default nil]
                                                   ["-a" "--acl" :default [] :parse-fn as-acl]
-                                                  ["-r" "--replication-factor" :default -1 :parse-fn parse-int])
+                                                  ["-r" "--replication-factor" :default -1 :parse-fn #(Integer/parseInt %)])
         meta (doto (SettableBlobMeta. acl)
                    (.set_replication_factor replication-factor))]
-    (validate-key-name! key)
+    (Utils/validateKeyName key)
     (log-message "Creating " key " with ACL " (pr-str (map access-control-str acl)))
     (if file
       (with-open [f (input-stream file)]
@@ -140,7 +141,7 @@
                  (log-message "Current replication factor " blob-replication)
                  blob-replication)
       "--update" (let [[{replication-factor :replication-factor} [key] _]
-                        (cli new-args ["-r" "--replication-factor" :parse-fn parse-int])]
+                        (cli new-args ["-r" "--replication-factor" :parse-fn #(Integer/parseInt %)])]
                    (if (nil? replication-factor)
                      (throw (RuntimeException. (str "Please set the replication factor")))
                      (let [blob-replication (.updateBlobReplication blobstore key replication-factor)]

http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj b/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj
index ef9ecbb..657e242 100644
--- a/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj
+++ b/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj
@@ -14,6 +14,7 @@
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
 (ns org.apache.storm.command.dev-zookeeper
+  (:import [org.apache.storm.utils Utils])
   (:use [org.apache.storm zookeeper util config])
   (:import [org.apache.storm.utils ConfigUtils])
   (:import [org.apache.storm.zookeeper Zookeeper])
@@ -23,6 +24,5 @@
   (let [conf (clojurify-structure (ConfigUtils/readStormConfig))
         port (conf STORM-ZOOKEEPER-PORT)
         localpath (conf DEV-ZOOKEEPER-PATH)]
-    (rmr localpath)
-    (Zookeeper/mkInprocessZookeeper localpath port)
-    ))
+    (Utils/forceDelete localpath)
+    (Zookeeper/mkInprocessZookeeper localpath port)))

http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/command/get_errors.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/get_errors.clj b/storm-core/src/clj/org/apache/storm/command/get_errors.clj
index c267390..615a5f3 100644
--- a/storm-core/src/clj/org/apache/storm/command/get_errors.clj
+++ b/storm-core/src/clj/org/apache/storm/command/get_errors.clj
@@ -21,7 +21,8 @@
              [nimbus :as nimbus]
              [common :as common]])
   (:import [org.apache.storm.generated GetInfoOptions NumErrorsChoice
-            TopologySummary ErrorInfo])
+            TopologySummary ErrorInfo]
+           [org.json.simple JSONValue])
   (:gen-class))
 
 (defn get-topology-id [name topologies]
@@ -44,9 +45,10 @@
           topo-id (get-topology-id name topologies)
           topo-info (when (not-nil? topo-id) (.getTopologyInfoWithOpts nimbus topo-id opts))]
       (if (or (nil? topo-id) (nil? topo-info))
-        (println (to-json {"Failure" (str "No topologies running with name " name)}))
+        (println (JSONValue/toJSONString {"Failure" (str "No topologies running with name " name)}))
         (let [topology-name (.get_name topo-info)
               topology-errors (.get_errors topo-info)]
-          (println (to-json (hash-map
-                              "Topology Name" topology-name
-                              "Comp-Errors" (get-component-errors topology-errors)))))))))
+          (println (JSONValue/toJSONString
+                     (hash-map
+                       "Topology Name" topology-name
+                       "Comp-Errors" (get-component-errors topology-errors)))))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
index 8a5eb21..0d5783b 100644
--- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
+++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
@@ -15,6 +15,7 @@
 ;; limitations under the License.
 (ns org.apache.storm.command.shell-submission
   (:import [org.apache.storm StormSubmitter]
+           [org.apache.storm.utils Utils]
            [org.apache.storm.zookeeper Zookeeper])
   (:use [org.apache.storm thrift util config log zookeeper])
   (:require [clojure.string :as str])
@@ -31,5 +32,5 @@
         no-op (.close zk-leader-elector)
         jarpath (StormSubmitter/submitJar conf tmpjarpath)
         args (concat args [host port jarpath])]
-    (exec-command! (str/join " " args))
+    (Utils/execCommand (str/join " " args))
     ))

http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/config.clj b/storm-core/src/clj/org/apache/storm/config.clj
index 3666e13..e50f023 100644
--- a/storm-core/src/clj/org/apache/storm/config.clj
+++ b/storm-core/src/clj/org/apache/storm/config.clj
@@ -18,7 +18,7 @@
   (:import [java.io FileReader File IOException]
            [org.apache.storm.generated StormTopology])
   (:import [org.apache.storm Config])
-  (:import [org.apache.storm.utils Utils LocalState ConfigUtils])
+  (:import [org.apache.storm.utils Utils LocalState ConfigUtils MutableInt])
   (:import [org.apache.storm.validation ConfigValidation])
   (:import [org.apache.commons.io FileUtils])
   (:require [clojure [string :as str]])
@@ -49,6 +49,22 @@
     (/ 1)
     int))
 
+(defn- even-sampler
+  [freq]
+  (let [freq (int freq)
+        start (int 0)
+        r (java.util.Random.)
+        curr (MutableInt. -1)
+        target (MutableInt. (.nextInt r freq))]
+    (with-meta
+      (fn []
+        (let [i (.increment curr)]
+          (when (>= i freq)
+            (.set curr start)
+            (.set target (.nextInt r freq))))
+        (= (.get curr) (.get target)))
+      {:rate freq})))
+
 ;; TODO this function together with sampling-rate are to be replaced with Java version when util.clj is in
 (defn mk-stats-sampler
   [conf]

http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/converter.clj b/storm-core/src/clj/org/apache/storm/converter.clj
index bb2dc87..23e7452 100644
--- a/storm-core/src/clj/org/apache/storm/converter.clj
+++ b/storm-core/src/clj/org/apache/storm/converter.clj
@@ -16,7 +16,8 @@
 (ns org.apache.storm.converter
   (:import [org.apache.storm.generated SupervisorInfo NodeInfo Assignment WorkerResources
             StormBase TopologyStatus ClusterWorkerHeartbeat ExecutorInfo ErrorInfo Credentials RebalanceOptions KillOptions
-            TopologyActionOptions DebugOptions ProfileRequest])
+            TopologyActionOptions DebugOptions ProfileRequest]
+           [org.apache.storm.utils Utils])
   (:use [org.apache.storm util stats log])
   (:require [org.apache.storm.daemon [common :as common]]))
 
@@ -71,6 +72,8 @@
                                                           (:worker->resources assignment)))))
     thrift-assignment))
 
+;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
+;TODO: when translating this function, you should replace the map-key with a proper for loop HERE
 (defn clojurify-executor->node_port [executor->node_port]
   (into {}
     (map-val
@@ -90,6 +93,7 @@
                 [(.get_mem_on_heap resources) (.get_mem_off_heap resources) (.get_cpu resources)]])
              worker->resources)))
 
+;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
 (defn clojurify-assignment [^Assignment assignment]
   (if assignment
     (org.apache.storm.daemon.common.Assignment.
@@ -117,12 +121,17 @@
       :killed TopologyStatus/KILLED
       nil)))
 
+(defn assoc-non-nil
+  [m k v]
+  (if v (assoc m k v) m))
+
 (defn clojurify-rebalance-options [^RebalanceOptions rebalance-options]
   (-> {:action :rebalance}
     (assoc-non-nil :delay-secs (if (.is_set_wait_secs rebalance-options) (.get_wait_secs rebalance-options)))
     (assoc-non-nil :num-workers (if (.is_set_num_workers rebalance-options) (.get_num_workers rebalance-options)))
     (assoc-non-nil :component->executors (if (.is_set_num_executors rebalance-options) (into {} (.get_num_executors rebalance-options))))))
 
+;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
 (defn thriftify-rebalance-options [rebalance-options]
   (if rebalance-options
     (let [thrift-rebalance-options (RebalanceOptions.)]
@@ -178,6 +187,7 @@
     (.set_enable (get options :enable false))
     (.set_samplingpct (get options :samplingpct 10))))
 
+;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
 (defn thriftify-storm-base [storm-base]
   (doto (StormBase.)
     (.set_name (:storm-name storm-base))
@@ -190,6 +200,7 @@
     (.set_prev_status (convert-to-status-from-symbol (:prev-status storm-base)))
     (.set_component_debug (map-val thriftify-debugoptions (:component->debug storm-base)))))
 
+;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
 (defn clojurify-storm-base [^StormBase storm-base]
   (if storm-base
     (org.apache.storm.daemon.common.StormBase.
@@ -203,6 +214,8 @@
       (convert-to-symbol-from-status (.get_prev_status storm-base))
       (map-val clojurify-debugoptions (.get_component_debug storm-base)))))
 
+;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
+;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
 (defn thriftify-stats [stats]
   (if stats
     (map-val thriftify-executor-stats
@@ -210,6 +223,8 @@
         stats))
     {}))
 
+;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
+;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
 (defn clojurify-stats [stats]
   (if stats
     (map-val clojurify-executor-stats

http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/daemon/acker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/acker.clj b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
index 7c4d614..58d8e7a 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/acker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
@@ -14,9 +14,10 @@
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
 (ns org.apache.storm.daemon.acker
-  (:import [org.apache.storm.task OutputCollector TopologyContext IBolt])
+  (:import [org.apache.storm.task OutputCollector TopologyContext IBolt]
+           [org.apache.storm.utils Utils])
   (:import [org.apache.storm.tuple Tuple Fields])
-  (:import [org.apache.storm.utils RotatingMap MutableObject])
+  (:import [org.apache.storm.utils Container RotatingMap MutableObject])
   (:import [java.util List Map])
   (:import [org.apache.storm Constants])
   (:use [org.apache.storm config util log])
@@ -88,20 +89,20 @@
       )))
 
 (defn -init []
-  [[] (container)])
+  [[] (Container.)])
 
 (defn -prepare [this conf context collector]
   (let [^IBolt ret (mk-acker-bolt)]
-    (container-set! (.state ^org.apache.storm.daemon.acker this) ret)
+    (Utils/containerSet (.state ^org.apache.storm.daemon.acker this) ret)
     (.prepare ret conf context collector)
     ))
 
 (defn -execute [this tuple]
-  (let [^IBolt delegate (container-get (.state ^org.apache.storm.daemon.acker this))]
+  (let [^IBolt delegate (Utils/containerGet (.state ^org.apache.storm.daemon.acker this))]
     (.execute delegate tuple)
     ))
 
 (defn -cleanup [this]
-  (let [^IBolt delegate (container-get (.state ^org.apache.storm.daemon.acker this))]
+  (let [^IBolt delegate (Utils/containerGet (.state ^org.apache.storm.daemon.acker this))]
     (.cleanup delegate)
     ))

http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index d0f8dd9..3dc2ee5 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -17,17 +17,17 @@
   (:use [org.apache.storm log config util])
   (:import [org.apache.storm.generated StormTopology
             InvalidTopologyException GlobalStreamId]
-           [org.apache.storm.utils ThriftTopologyUtils]
+           [org.apache.storm.utils Utils ConfigUtils IPredicate ThriftTopologyUtils]
            [org.apache.storm.daemon.metrics.reporters PreparableReporter]
            [com.codahale.metrics MetricRegistry])
-  (:import [org.apache.storm.utils Utils ConfigUtils])
   (:import [org.apache.storm.daemon.metrics MetricsUtils])
   (:import [org.apache.storm.task WorkerTopologyContext])
   (:import [org.apache.storm Constants])
   (:import [org.apache.storm.metric SystemBolt])
   (:import [org.apache.storm.metric EventLoggerBolt])
-  (:import [org.apache.storm.security.auth IAuthorizer]) 
-  (:import [java.io InterruptedIOException])
+  (:import [org.apache.storm.security.auth IAuthorizer])
+  (:import [java.io InterruptedIOException]
+           [org.json.simple JSONValue])
   (:require [clojure.set :as set])  
   (:require [org.apache.storm.daemon.acker :as acker])
   (:require [org.apache.storm.thrift :as thrift])
@@ -84,10 +84,9 @@
   (ExecutorStats. 0 0 0 0 0))
 
 (defn get-storm-id [storm-cluster-state storm-name]
-  (let [active-storms (.active-storms storm-cluster-state)]
-    (find-first
-      #(= storm-name (:storm-name (.storm-base storm-cluster-state % nil)))
-      active-storms)
+  (let [active-storms (.active-storms storm-cluster-state)
+        pred  (reify IPredicate (test [this x] (= storm-name (:storm-name (.storm-base storm-cluster-state x nil)))))]
+    (Utils/findFirst pred active-storms)
     ))
 
 (defn topology-bases [storm-cluster-state]
@@ -114,12 +113,12 @@
         (throw e#))
       (catch Throwable t#
         (log-error t# "Error on initialization of server " ~(str name))
-        (exit-process! 13 "Error on initialization")
+        (Utils/exitProcess 13 "Error on initialization")
         )))))
 
 (defn- validate-ids! [^StormTopology topology]
   (let [sets (map #(.getFieldValue topology %) thrift/STORM-TOPOLOGY-FIELDS)
-        offending (apply any-intersection sets)]
+        offending (apply set/intersection sets)]
     (if-not (empty? offending)
       (throw (InvalidTopologyException.
               (str "Duplicate component ids: " offending))))
@@ -145,9 +144,10 @@
 
 (defn component-conf [component]
   (->> component
-      .get_common
-      .get_json_conf
-      from-json))
+       .get_common
+       .get_json_conf
+       (#(if % (JSONValue/parse %)))
+       clojurify-structure))
 
 (defn validate-basic! [^StormTopology topology]
   (validate-ids! topology)
@@ -238,7 +238,7 @@
                                {TOPOLOGY-TICK-TUPLE-FREQ-SECS (storm-conf TOPOLOGY-MESSAGE-TIMEOUT-SECS)})]]
       (do
         ;; this set up tick tuples to cause timeouts to be triggered
-        (.set_json_conf common (to-json spout-conf))
+        (.set_json_conf common (JSONValue/toJSONString spout-conf))
         (.put_to_streams common ACKER-INIT-STREAM-ID (thrift/output-fields ["id" "init-val" "spout-task"]))
         (.put_to_inputs common
                         (GlobalStreamId. ACKER-COMPONENT-ID ACKER-ACK-STREAM-ID)
@@ -363,6 +363,7 @@
 (defn num-start-executors [component]
   (thrift/parallelism-hint (.get_common component)))
 
+;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
 (defn storm-task-info
   "Returns map from task -> component id"
   [^StormTopology user-topology storm-conf]

http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/drpc.clj b/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
index a07b9ef..7e5965b 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/drpc.clj
@@ -17,12 +17,14 @@
 (ns org.apache.storm.daemon.drpc
   (:import [org.apache.storm.security.auth AuthUtils ThriftServer ThriftConnectionType ReqContext])
   (:import [org.apache.storm.security.auth.authorizer DRPCAuthorizerBase])
+  (:import [org.apache.storm.utils Utils])
   (:import [org.apache.storm.generated DistributedRPC DistributedRPC$Iface DistributedRPC$Processor
             DRPCRequest DRPCExecutionException DistributedRPCInvocations DistributedRPCInvocations$Iface
             DistributedRPCInvocations$Processor])
   (:import [java.util.concurrent Semaphore ConcurrentLinkedQueue
             ThreadPoolExecutor ArrayBlockingQueue TimeUnit])
-  (:import [org.apache.storm.daemon Shutdownable])
+  (:import [org.apache.storm.daemon Shutdownable]
+           [org.apache.storm.utils Time])
   (:import [java.net InetAddress])
   (:import [org.apache.storm.generated AuthorizationException]
            [org.apache.storm.utils VersionInfo ConfigUtils])
@@ -57,7 +59,7 @@
 (defn check-authorization
   ([aclHandler mapping operation context]
     (if (not-nil? context)
-      (log-thrift-access (.requestID context) (.remoteAddress context) (.principal context) operation))
+      (Utils/logThriftAccess (.requestID context) (.remoteAddress context) (.principal context) operation))
     (if aclHandler
       (let [context (or context (ReqContext/context))]
         (if-not (.permit aclHandler context operation mapping)
@@ -85,10 +87,10 @@
                   (swap! id->request dissoc id)
                   (swap! id->start dissoc id))
         my-ip (.getHostAddress (InetAddress/getLocalHost))
-        clear-thread (async-loop
+        clear-thread (Utils/asyncLoop
                        (fn []
                          (doseq [[id start] @id->start]
-                           (when (> (time-delta start) (conf DRPC-REQUEST-TIMEOUT-SECS))
+                           (when (> (Time/delta start) (conf DRPC-REQUEST-TIMEOUT-SECS))
                              (when-let [sem (@id->sem id)]
                                (.remove (acquire-queue request-queues (@id->function id)) (@id->request id))
                                (log-warn "Timeout DRPC request id: " id " start at " start)
@@ -107,7 +109,7 @@
               ^Semaphore sem (Semaphore. 0)
               req (DRPCRequest. args id)
               ^ConcurrentLinkedQueue queue (acquire-queue request-queues function)]
-          (swap! id->start assoc id (current-time-secs))
+          (swap! id->start assoc id (Time/currentTimeSecs))
           (swap! id->sem assoc id sem)
           (swap! id->function assoc id function)
           (swap! id->request assoc id req)
@@ -227,9 +229,9 @@
                           (DistributedRPCInvocations$Processor. drpc-service-handler)
                           ThriftConnectionType/DRPC_INVOCATIONS)
           http-creds-handler (AuthUtils/GetDrpcHttpCredentialsPlugin conf)]
-      (add-shutdown-hook-with-force-kill-in-1-sec (fn []
-                                                    (if handler-server (.stop handler-server))
-                                                    (.stop invoke-server)))
+      (Utils/addShutdownHookWithForceKillIn1Sec (fn []
+                                            (if handler-server (.stop handler-server))
+                                            (.stop invoke-server)))
       (log-message "Starting Distributed RPC servers...")
       (future (.serve invoke-server))
       (when (> drpc-http-port 0)
@@ -270,5 +272,5 @@
         (.serve handler-server)))))
 
 (defn -main []
-  (setup-default-uncaught-exception-handler)
+  (Utils/setupDefaultUncaughtExceptionHandler)
   (launch-server!))

http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index ab0c8aa..2415d5b 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -36,7 +36,9 @@
   (:import [org.apache.storm Config Constants])
   (:import [org.apache.storm.cluster ClusterStateContext DaemonType])
   (:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping LoadAwareShuffleGrouping LoadMapping ShuffleGrouping])
-  (:import [java.util.concurrent ConcurrentLinkedQueue])
+  (:import [java.lang Thread Thread$UncaughtExceptionHandler]
+           [java.util.concurrent ConcurrentLinkedQueue]
+           [org.json.simple JSONValue])
   (:require [org.apache.storm [thrift :as thrift]
              [cluster :as cluster] [disruptor :as disruptor] [stats :as stats]])
   (:require [org.apache.storm.daemon [task :as task]])
@@ -109,6 +111,7 @@
         :direct
       )))
 
+;TODO: when translating this function, you should replace the filter-val with a proper for loop + if condition HERE
 (defn- outbound-groupings
   [^WorkerTopologyContext worker-context this-component-id stream-id out-fields component->grouping topo-conf]
   (->> component->grouping
@@ -151,7 +154,7 @@
         bolts (.get_bolts topology)]
     (cond (contains? spouts component-id) :spout
           (contains? bolts component-id) :bolt
-          :else (throw-runtime "Could not find " component-id " in topology " topology))))
+          :else (Utils/throwRuntime ["Could not find " component-id " in topology " topology]))))
 
 (defn executor-selector [executor-data & _] (:type executor-data))
 
@@ -181,7 +184,8 @@
         spec-conf (-> general-context
                       (.getComponentCommon component-id)
                       .get_json_conf
-                      from-json)]
+                      (#(if % (JSONValue/parse %)))
+                      clojurify-structure)]
     (merge storm-conf (apply dissoc spec-conf to-remove))
     ))
 
@@ -195,20 +199,20 @@
   (let [storm-conf (:storm-conf executor)
         error-interval-secs (storm-conf TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS)
         max-per-interval (storm-conf TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL)
-        interval-start-time (atom (current-time-secs))
+        interval-start-time (atom (Time/currentTimeSecs))
         interval-errors (atom 0)
         ]
     (fn [error]
       (log-error error)
-      (when (> (time-delta @interval-start-time)
+      (when (> (Time/delta @interval-start-time)
                error-interval-secs)
         (reset! interval-errors 0)
-        (reset! interval-start-time (current-time-secs)))
+        (reset! interval-start-time (Time/currentTimeSecs)))
       (swap! interval-errors inc)
 
       (when (<= @interval-errors max-per-interval)
         (cluster/report-error (:storm-cluster-state executor) (:storm-id executor) (:component-id executor)
-                              (hostname storm-conf)
+                              (Utils/hostname storm-conf)
                               (.getThisWorkerPort (:worker-context executor)) error)
         ))))
 
@@ -262,13 +266,16 @@
      :task->component (:task->component worker)
      :stream->component->grouper (outbound-components worker-context component-id storm-conf)
      :report-error (throttled-report-error-fn <>)
-     :report-error-and-die (fn [error]
-                             ((:report-error <>) error)
-                             (if (or
-                                    (exception-cause? InterruptedException error)
-                                    (exception-cause? java.io.InterruptedIOException error))
-                               (log-message "Got interrupted excpetion shutting thread down...")
-                               ((:suicide-fn <>))))
+     :report-error-and-die (reify
+                             Thread$UncaughtExceptionHandler
+                             (uncaughtException [this _ error]
+                               ;; Report the error, then call :suicide-fn unless the cause is an interrupt. Must run inline: returning a fn from this void callback would do nothing.
+                               ((:report-error <>) error)
+                               (if (or
+                                     (Utils/exceptionCauseIsInstanceOf InterruptedException error)
+                                     (Utils/exceptionCauseIsInstanceOf java.io.InterruptedIOException error))
+                                 (log-message "Got interrupted excpetion shutting thread down...")
+                                 ((:suicide-fn <>)))))
      :sampler (mk-stats-sampler storm-conf)
      :backpressure (atom false)
      :spout-throttling-metrics (if (= executor-type :spout) 
@@ -329,7 +336,7 @@
          task-id (:task-id task-data)
          name->imetric (-> interval->task->metric-registry (get interval) (get task-id))
          task-info (IMetricsConsumer$TaskInfo.
-                     (hostname (:storm-conf executor-data))
+                     (Utils/hostname (:storm-conf executor-data))
                      (.getThisWorkerPort worker-context)
                      (:component-id executor-data)
                      task-id
@@ -386,8 +393,9 @@
         ;; doesn't block (because it's a single threaded queue and the caching/consumer started
         ;; trick isn't thread-safe)
         system-threads [(start-batch-transfer->worker-handler! worker executor-data)]
-        handlers (with-error-reaction report-error-and-die
-                   (mk-threads executor-data task-datas initial-credentials))
+        handlers (try
+                   (mk-threads executor-data task-datas initial-credentials)
+                   (catch Throwable t (.uncaughtException report-error-and-die (Thread/currentThread) t))) ; NOTE(review): assumes report-error-and-die is the reified Thread$UncaughtExceptionHandler from executor-data (not an IFn) -- confirm
         threads (concat handlers system-threads)]    
     (setup-ticks! worker executor-data)
 
@@ -472,7 +480,7 @@
     (if p (* p num-tasks))))
 
 (defn init-spout-wait-strategy [storm-conf]
-  (let [ret (-> storm-conf (get TOPOLOGY-SPOUT-WAIT-STRATEGY) new-instance)]
+  (let [ret (-> storm-conf (get TOPOLOGY-SPOUT-WAIT-STRATEGY) Utils/newInstance)]
     (.prepare ret storm-conf)
     ret
     ))
@@ -491,6 +499,10 @@
           EVENTLOGGER-STREAM-ID
           [component-id message-id (System/currentTimeMillis) values]))))
 
+(defn- bit-xor-vals
+  [vals]
+  (reduce bit-xor 0 vals))
+
 (defmethod mk-threads :spout [executor-data task-datas initial-credentials]
   (let [{:keys [storm-conf component-id worker-context transfer-fn report-error sampler open-or-prepare-was-called?]} executor-data
         ^ISpoutWaitStrategy spout-wait-strategy (init-spout-wait-strategy storm-conf)
@@ -506,7 +518,7 @@
                  2 ;; microoptimize for performance of .size method
                  (reify RotatingMap$ExpiredCallback
                    (expire [this id [task-id spout-id tuple-info start-time-ms]]
-                     (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
+                     (let [time-delta (if start-time-ms (Time/deltaMs start-time-ms))]
                        (fail-spout-msg executor-data (get task-datas task-id) spout-id tuple-info time-delta "TIMEOUT" id)
                        ))))
         tuple-action-fn (fn [task-id ^TupleImpl tuple]
@@ -523,8 +535,8 @@
                                     [stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)]
                                 (when spout-id
                                   (when-not (= stored-task-id task-id)
-                                    (throw-runtime "Fatal error, mismatched task ids: " task-id " " stored-task-id))
-                                  (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
+                                    (Utils/throwRuntime ["Fatal error, mismatched task ids: " task-id " " stored-task-id]))
+                                  (let [time-delta (if start-time-ms (Time/deltaMs start-time-ms))]
                                     (condp = stream-id
                                       ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id)
                                                                          spout-id tuple-finished-info time-delta id)
@@ -540,7 +552,7 @@
         emitted-count (MutableLong. 0)
         empty-emit-streak (MutableLong. 0)]
    
-    [(async-loop
+    [(Utils/asyncLoop
       (fn []
         ;; If topology was started in inactive state, don't call (.open spout) until it's activated first.
         (while (not @(:storm-active-atom executor-data))
@@ -661,19 +673,22 @@
               (.set empty-emit-streak 0)
               ))
           0))
-      :kill-fn (:report-error-and-die executor-data)
-      :factory? true
-      :thread-name (str component-id "-executor" (:executor-id executor-data)))]))
+      false ; isDaemon
+      (:report-error-and-die executor-data)
+      Thread/NORM_PRIORITY
+      true ; isFactory
+      true ; startImmediately
+      (str component-id "-executor" (:executor-id executor-data)))]))
 
 (defn- tuple-time-delta! [^TupleImpl tuple]
   (let [ms (.getProcessSampleStartTime tuple)]
     (if ms
-      (time-delta-ms ms))))
+      (Time/deltaMs ms))))
       
 (defn- tuple-execute-time-delta! [^TupleImpl tuple]
   (let [ms (.getExecuteSampleStartTime tuple)]
     (if ms
-      (time-delta-ms ms))))
+      (Time/deltaMs ms))))
 
 (defn put-xor! [^Map pending key id]
   (let [curr (or (.get pending key) (long 0))]
@@ -738,7 +753,7 @@
     
     ;; TODO: can get any SubscribedState objects out of the context now
 
-    [(async-loop
+    [(Utils/asyncLoop
       (fn []
         ;; If topology was started in inactive state, don't call prepare bolt until it's activated first.
         (while (not @(:storm-active-atom executor-data))          
@@ -840,9 +855,12 @@
           (fn []            
             (disruptor/consume-batch-when-available receive-queue event-handler)
             0)))
-      :kill-fn (:report-error-and-die executor-data)
-      :factory? true
-      :thread-name (str component-id "-executor" (:executor-id executor-data)))]))
+      false ; isDaemon
+      (:report-error-and-die executor-data)
+      Thread/NORM_PRIORITY
+      true ; isFactory
+      true ; startImmediately
+      (str component-id "-executor" (:executor-id executor-data)))]))
 
 (defmethod close-component :spout [executor-data spout]
   (.close spout))

http://git-wip-us.apache.org/repos/asf/storm/blob/3befae32/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
index 0edfe08..6ca1759 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/logviewer.clj
@@ -20,7 +20,7 @@
   (:use [hiccup core page-helpers form-helpers])
   (:use [org.apache.storm config util log timer])
   (:use [org.apache.storm.ui helpers])
-  (:import [org.apache.storm.utils Utils VersionInfo ConfigUtils])
+  (:import [org.apache.storm.utils Utils Time VersionInfo ConfigUtils])
   (:import [org.slf4j LoggerFactory])
   (:import [java.util Arrays ArrayList HashSet])
   (:import [java.util.zip GZIPInputStream])
@@ -28,10 +28,10 @@
   (:import [org.apache.logging.log4j.core Appender LoggerContext])
   (:import [org.apache.logging.log4j.core.appender RollingFileAppender])
   (:import [java.io BufferedInputStream File FileFilter FileInputStream
-            InputStream InputStreamReader])
+            InputStream InputStreamReader]
+           [java.net URLDecoder])
   (:import [java.nio.file Files Path Paths DirectoryStream])
   (:import [java.nio ByteBuffer])
-  (:import [org.apache.storm.utils Utils])
   (:import [org.apache.storm.daemon DirectoryCleaner])
   (:import [org.yaml.snakeyaml Yaml]
            [org.yaml.snakeyaml.constructor SafeConstructor])
@@ -51,6 +51,8 @@
 (def ^:dynamic *STORM-CONF* (clojurify-structure (ConfigUtils/readStormConfig)))
 (def STORM-VERSION (VersionInfo/getVersion))
 
+(def worker-log-filename-pattern  #"^worker\.log(.*)") ; names starting with the literal "worker.log"; group 1 captures whatever follows
+
 (defmeter logviewer:num-log-page-http-requests)
 (defmeter logviewer:num-daemonlog-page-http-requests)
 (defmeter logviewer:num-download-log-file-http-requests)
@@ -117,9 +119,9 @@
 (defn get-topo-port-workerlog
   "Return the path of the worker log with the format of topoId/port/worker.log.*"
   [^File file]
-  (clojure.string/join file-path-separator
+  (clojure.string/join Utils/FILE_PATH_SEPARATOR
                        (take-last 3
-                                  (split (.getCanonicalPath file) (re-pattern file-path-separator)))))
+                                  (split (.getCanonicalPath file) (re-pattern Utils/FILE_PATH_SEPARATOR)))))
 
 (defn get-metadata-file-for-log-root-name [root-name root-dir]
   (let [metaFile (clojure.java.io/file root-dir "metadata"
@@ -141,10 +143,10 @@
         nil))))
 
 (defn get-worker-id-from-metadata-file [metaFile]
-  (get (clojure-from-yaml-file metaFile) "worker-id"))
+  (get (clojurify-structure (Utils/readYamlFile metaFile)) "worker-id"))
 
 (defn get-topo-owner-from-metadata-file [metaFile]
-  (get (clojure-from-yaml-file metaFile) TOPOLOGY-SUBMITTER-USER))
+  (get (clojurify-structure (Utils/readYamlFile  metaFile)) TOPOLOGY-SUBMITTER-USER))
 
 (defn identify-worker-log-dirs [log-dirs]
   "return the workerid to worker-log-dir map"
@@ -188,7 +190,7 @@
   "Return a sorted set of java.io.Files that were written by workers that are
   now active"
   [conf root-dir]
-  (let [alive-ids (get-alive-ids conf (current-time-secs))
+  (let [alive-ids (get-alive-ids conf (Time/currentTimeSecs))
         log-dirs (get-all-worker-dirs root-dir)
         id->dir (identify-worker-log-dirs log-dirs)]
     (apply sorted-set
@@ -227,12 +229,12 @@
   [^File dir]
   (let [topodir (.getParentFile dir)]
     (if (empty? (.listFiles topodir))
-      (rmr (.getCanonicalPath topodir)))))
+      (Utils/forceDelete (.getCanonicalPath topodir)))))
 
 (defn cleanup-fn!
   "Delete old log dirs for which the workers are no longer alive"
   [log-root-dir]
-  (let [now-secs (current-time-secs)
+  (let [now-secs (Time/currentTimeSecs)
         old-log-dirs (select-dirs-for-cleanup *STORM-CONF*
                                               (* now-secs 1000)
                                               log-root-dir)
@@ -250,7 +252,7 @@
     (dofor [dir dead-worker-dirs]
            (let [path (.getCanonicalPath dir)]
              (log-message "Cleaning up: Removing " path)
-             (try (rmr path)
+             (try (Utils/forceDelete path)
                   (cleanup-empty-topodir! dir)
                   (catch Exception ex (log-error ex)))))
     (per-workerdir-cleanup! (File. log-root-dir) (* per-dir-size (* 1024 1024)) cleaner)
@@ -264,7 +266,7 @@
       (schedule-recurring (mk-timer :thread-name "logviewer-cleanup"
                                     :kill-fn (fn [t]
                                                (log-error t "Error when doing logs cleanup")
-                                               (exit-process! 20 "Error when doing log cleanup")))
+                                               (Utils/exitProcess 20 "Error when doing log cleanup")))
                           0 ;; Start immediately.
                           interval-secs
                           (fn [] (cleanup-fn! log-root-dir))))))
@@ -309,7 +311,7 @@
 
 (defn get-log-user-group-whitelist [fname]
   (let [wl-file (ConfigUtils/getLogMetaDataFile fname)
-        m (clojure-from-yaml-file wl-file)]
+        m (clojurify-structure (Utils/readYamlFile wl-file))]
     (if (not-nil? m)
       (do
         (let [user-wl (.get m LOGS-USERS)
@@ -514,9 +516,9 @@
 
 (defn url-to-match-centered-in-log-page
   [needle fname offset port]
-  (let [host (local-hostname)
+  (let [host (Utils/localHostname)
         port (logviewer-port)
-        fname (clojure.string/join file-path-separator (take-last 3 (split fname (re-pattern file-path-separator))))]
+        fname (clojure.string/join Utils/FILE_PATH_SEPARATOR (take-last 3 (split fname (re-pattern Utils/FILE_PATH_SEPARATOR))))]
     (url (str "http://" host ":" port "/log")
       {:file fname
        :start (max 0
@@ -851,7 +853,7 @@
               new-matches (conj matches
                             (merge these-matches
                               { "fileName" file-name
-                                "port" (first (take-last 2 (split (.getCanonicalPath (first logs)) (re-pattern file-path-separator))))}))
+                                "port" (first (take-last 2 (split (.getCanonicalPath (first logs)) (re-pattern Utils/FILE_PATH_SEPARATOR))))}))
               new-count (+ match-count (count (these-matches "matches")))]
           (if (empty? these-matches)
             (recur matches (rest logs) 0 (+ file-offset 1) match-count)
@@ -874,12 +876,12 @@
 (defn deep-search-logs-for-topology
   [topology-id user ^String root-dir search num-matches port file-offset offset search-archived? callback origin]
   (json-response
-    (if (or (not search) (not (.exists (File. (str root-dir file-path-separator topology-id)))))
+    (if (or (not search) (not (.exists (File. (str root-dir Utils/FILE_PATH_SEPARATOR topology-id)))))
       []
       (let [file-offset (if file-offset (Integer/parseInt file-offset) 0)
             offset (if offset (Integer/parseInt offset) 0)
             num-matches (or (Integer/parseInt num-matches) 1)
-            port-dirs (vec (.listFiles (File. (str root-dir file-path-separator topology-id))))
+            port-dirs (vec (.listFiles (File. (str root-dir Utils/FILE_PATH_SEPARATOR topology-id))))
             logs-for-port-fn (partial logs-for-port user)]
         (if (or (not port) (= "*" port))
           ;; Check for all ports
@@ -892,7 +894,7 @@
           ;; Check just the one port
           (if (not (contains? (into #{} (map str (*STORM-CONF* SUPERVISOR-SLOTS-PORTS))) port))
             []
-            (let [port-dir (File. (str root-dir file-path-separator topology-id file-path-separator port))]
+            (let [port-dir (File. (str root-dir Utils/FILE_PATH_SEPARATOR topology-id Utils/FILE_PATH_SEPARATOR port))]
               (if (or (not (.exists port-dir)) (empty? (logs-for-port user port-dir)))
                 []
                 (let [filtered-logs (logs-for-port user port-dir)]
@@ -945,7 +947,7 @@
                     (if (= (str port) (.getName port-dir))
                       (into [] (DirectoryCleaner/getFilesForDir port-dir))))))))
           (if (nil? port)
-            (let [topo-dir (File. (str log-root file-path-separator topoId))]
+            (let [topo-dir (File. (str log-root Utils/FILE_PATH_SEPARATOR topoId))]
               (if (.exists topo-dir)
                 (reduce concat
                   (for [port-dir (.listFiles topo-dir)]
@@ -982,7 +984,7 @@
             user (.getUserName http-creds-handler servlet-request)
             start (if (:start m) (parse-long-from-map m :start))
             length (if (:length m) (parse-long-from-map m :length))
-            file (url-decode (:file m))]
+            file (URLDecoder/decode (:file m) "UTF-8")] ; explicit charset: the one-arg decode is deprecated and uses the platform default encoding
         (log-template (log-page file start length (:grep m) user log-root)
           file user))
       (catch InvalidRequestException ex
@@ -993,21 +995,21 @@
      (let [user (.getUserName http-creds-handler servlet-request)
            port (second (split host-port #":"))
            dir (File. (str log-root
-                           file-path-separator
+                           Utils/FILE_PATH_SEPARATOR
                            topo-id
-                           file-path-separator
+                           Utils/FILE_PATH_SEPARATOR
                            port))
            file (File. (str log-root
-                            file-path-separator
+                            Utils/FILE_PATH_SEPARATOR
                             topo-id
-                            file-path-separator
+                            Utils/FILE_PATH_SEPARATOR
                             port
-                            file-path-separator
+                            Utils/FILE_PATH_SEPARATOR
                             filename))]
        (if (and (.exists dir) (.exists file))
          (if (or (blank? (*STORM-CONF* UI-FILTER))
                (authorized-log-user? user 
-                                     (str topo-id file-path-separator port file-path-separator "worker.log")
+                                     (str topo-id Utils/FILE_PATH_SEPARATOR port Utils/FILE_PATH_SEPARATOR "worker.log")
                                      *STORM-CONF*))
            (-> (resp/response file)
                (resp/content-type "application/octet-stream"))
@@ -1019,14 +1021,14 @@
      (let [user (.getUserName http-creds-handler servlet-request)
            port (second (split host-port #":"))
            dir (File. (str log-root
-                           file-path-separator
+                           Utils/FILE_PATH_SEPARATOR
                            topo-id
-                           file-path-separator
+                           Utils/FILE_PATH_SEPARATOR
                            port))]
        (if (.exists dir)
          (if (or (blank? (*STORM-CONF* UI-FILTER))
                (authorized-log-user? user 
-                                     (str topo-id file-path-separator port file-path-separator "worker.log")
+                                     (str topo-id Utils/FILE_PATH_SEPARATOR port Utils/FILE_PATH_SEPARATOR "worker.log")
                                      *STORM-CONF*))
            (html4
              [:head
@@ -1050,7 +1052,7 @@
             user (.getUserName http-creds-handler servlet-request)
             start (if (:start m) (parse-long-from-map m :start))
             length (if (:length m) (parse-long-from-map m :length))
-            file (url-decode (:file m))]
+            file (URLDecoder/decode (:file m) "UTF-8")] ; explicit charset: the one-arg decode is deprecated and uses the platform default encoding
         (log-template (daemonlog-page file start length (:grep m) user daemonlog-root)
           file user))
       (catch InvalidRequestException ex
@@ -1078,7 +1080,7 @@
     ;; filter is configured.
     (try
       (let [user (.getUserName http-creds-handler servlet-request)]
-        (search-log-file (url-decode file)
+        (search-log-file (URLDecoder/decode file "UTF-8") ; explicit charset: the one-arg decode is deprecated and uses the platform default encoding
           user
           (if (= (:is-daemon m) "yes") daemonlog-root log-root)
           (:search-string m)
@@ -1192,7 +1194,7 @@
   (let [conf (clojurify-structure (ConfigUtils/readStormConfig))
         log-root (ConfigUtils/workerArtifactsRoot conf)
         daemonlog-root (log-root-dir (conf LOGVIEWER-APPENDER-NAME))]
-    (setup-default-uncaught-exception-handler)
+    (Utils/setupDefaultUncaughtExceptionHandler)
     (start-log-cleaner! conf log-root)
     (log-message "Starting logviewer server for storm version '"
                  STORM-VERSION