You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:08 UTC

[12/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs to org.apache.storm, but try to provide some form of backwards compatibility

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/util.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/util.clj b/storm-core/src/clj/org/apache/storm/util.clj
new file mode 100644
index 0000000..23d39f6
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/util.clj
@@ -0,0 +1,1118 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.util
+  (:import [java.net InetAddress])
+  (:import [java.util Map Map$Entry List ArrayList Collection Iterator HashMap])
+  (:import [java.io FileReader FileNotFoundException])
+  (:import [java.nio.file Paths])
+  (:import [org.apache.storm Config])
+  (:import [org.apache.storm.utils Time Container ClojureTimerTask Utils
+            MutableObject MutableInt])
+  (:import [org.apache.storm.security.auth NimbusPrincipal])
+  (:import [javax.security.auth Subject])
+  (:import [java.util UUID Random ArrayList List Collections])
+  (:import [java.util.zip ZipFile])
+  (:import [java.util.concurrent.locks ReentrantReadWriteLock])
+  (:import [java.util.concurrent Semaphore])
+  (:import [java.nio.file Files Paths])
+  (:import [java.nio.file.attribute FileAttribute])
+  (:import [java.io File FileOutputStream RandomAccessFile StringWriter
+            PrintWriter BufferedReader InputStreamReader IOException])
+  (:import [java.lang.management ManagementFactory])
+  (:import [org.apache.commons.exec DefaultExecutor CommandLine])
+  (:import [org.apache.commons.io FileUtils])
+  (:import [org.apache.storm.logging ThriftAccessLogger])
+  (:import [org.apache.commons.exec ExecuteException])
+  (:import [org.json.simple JSONValue])
+  (:import [org.yaml.snakeyaml Yaml]
+           [org.yaml.snakeyaml.constructor SafeConstructor])
+  (:require [clojure [string :as str]])
+  (:import [clojure.lang RT])
+  (:require [clojure [set :as set]])
+  (:require [clojure.java.io :as io])
+  (:use [clojure walk])
+  (:require [ring.util.codec :as codec])
+  (:use [org.apache.storm log]))
+
+(defn wrap-in-runtime
+  "Returns e itself when it already is a RuntimeException; otherwise returns
+  a new RuntimeException that has e as its cause."
+  [^Exception e]
+  (cond
+    (instance? RuntimeException e) e
+    :else (RuntimeException. e)))
+
+;; True when the OS environment variable equals "Windows_NT". Evaluated once,
+;; when this namespace is loaded.
+(def on-windows?
+  (= "Windows_NT" (System/getenv "OS")))
+
+;; Value of the JVM "file.separator" system property.
+(def file-path-separator
+  (System/getProperty "file.separator"))
+
+;; Value of the JVM "path.separator" system property; used in this file to
+;; join classpath entries.
+(def class-path-separator
+  (System/getProperty "path.separator"))
+
+(defn is-absolute-path?
+  "True when the string path denotes an absolute path according to
+  java.nio.file.Paths."
+  [path]
+  (let [no-more-parts (into-array String [])
+        p (Paths/get path no-more-parts)]
+    (.isAbsolute p)))
+
+(defmacro defalias
+  "Defines an alias for a var: a new var with the same root binding (if
+  any) and similar metadata. The metadata of the alias is its initial
+  metadata (as provided by def) merged into the metadata of the original."
+  ([name orig]
+   `(do
+      (alter-meta!
+        ;; Define the alias with the original's raw root value, or unbound
+        ;; when the original has no root binding.
+        (if (.hasRoot (var ~orig))
+          (def ~name (.getRawRoot (var ~orig)))
+          (def ~name))
+        ;; When copying metadata, disregard {:macro false}.
+        ;; Workaround for http://www.assembla.com/spaces/clojure/tickets/273
+        #(conj (dissoc % :macro)
+               (apply dissoc (meta (var ~orig)) (remove #{:macro} (keys %)))))
+      (var ~name)))
+  ([name orig doc]
+   ;; Attach doc to the alias symbol's metadata, then delegate to the 2-arity form.
+   (list `defalias (with-meta name (assoc (meta name) :doc doc)) orig)))
+
+;; name-with-attributes by Konrad Hinsen:
+(defn name-with-attributes
+  "To be used in macro definitions.
+  Handles optional docstrings and attribute maps for a name to be defined
+  in a list of macro arguments. If the first macro argument is a string,
+  it is added as a docstring to name and removed from the macro argument
+  list. If afterwards the first macro argument is a map, its entries are
+  added to the name's metadata map and the map is removed from the
+  macro argument list. The return value is a vector containing the name
+  with its extended metadata map and the list of unprocessed macro
+  arguments."
+  [name macro-args]
+  (let [;; peel off an optional leading docstring
+        [docstring macro-args] (if (string? (first macro-args))
+                                 [(first macro-args) (next macro-args)]
+                                 [nil macro-args])
+        ;; then an optional attribute map
+        [attr macro-args] (if (map? (first macro-args))
+                            [(first macro-args) (next macro-args)]
+                            [{} macro-args])
+        attr (if docstring
+               (assoc attr :doc docstring)
+               attr)
+        ;; entries in attr win over metadata already present on name
+        attr (if (meta name)
+               (conj (meta name) attr)
+               attr)]
+    [(with-meta name attr) macro-args]))
+
+(defmacro defnk
+  "Define a function accepting keyword arguments. Symbols up to the first
+  keyword in the parameter list are taken as positional arguments.  Then
+  an alternating sequence of keywords and defaults values is expected. The
+  values of the keyword arguments are available in the function body by
+  virtue of the symbol corresponding to the keyword (cf. :keys destructuring).
+  defnk accepts an optional docstring as well as an optional metadata map."
+  [fn-name & fn-tail]
+  (let [[fn-name [args & body]] (name-with-attributes fn-name fn-tail)
+        ;; leading symbols are positional; everything after is keyword/default pairs
+        [pos kw-vals] (split-with symbol? args)
+        syms (map #(-> % name symbol) (take-nth 2 kw-vals))
+        values (take-nth 2 (rest kw-vals))
+        sym-vals (apply hash-map (interleave syms values))
+        ;; destructuring map: bind each keyword's symbol, falling back to its default
+        de-map {:keys (vec syms) :or sym-vals}]
+    `(defn ~fn-name
+       [~@pos & options#]
+       (let [~de-map (apply hash-map options#)]
+         ~@body))))
+
+(defn find-first
+  "Returns the first element of coll for which (pred element) is logically
+  true, or nil when there is none. Only as much of a lazy coll is realized
+  as is needed to reach the first match."
+  [pred coll]
+  (->> coll
+       (filter pred)
+       first))
+
+(defn dissoc-in
+  "Removes the entry at the nested key path from m and returns the new
+  structure. The second argument is a sequence of keys. Intermediate maps
+  that become empty as a result are removed as well; a path whose
+  intermediate key is missing leaves m unchanged."
+  [m [k & ks :as keys]]
+  (if-not ks
+    (dissoc m k)
+    (let [child (get m k)]
+      (if-not child
+        m
+        (let [pruned (dissoc-in child ks)]
+          (if (empty? pruned)
+            (dissoc m k)
+            (assoc m k pruned)))))))
+
+(defn indexed
+  "Returns a lazy sequence of [index item] vectors for the items of s,
+  with indexes starting at zero.
+
+  (indexed '(a b c d))  =>  ([0 a] [1 b] [2 c] [3 d])"
+  [s]
+  (map-indexed vector s))
+
+(defn positions
+  "Returns a lazy sequence of the zero-based indexes of the items in coll
+  for which pred returns logical true."
+  [pred coll]
+  (keep-indexed (fn [idx elt]
+                  (when (pred elt) idx))
+                coll))
+
+(defn exception-cause?
+  "True when t, or any throwable reachable through its getCause chain, is an
+  instance of klass; false otherwise (including when t is nil)."
+  [klass ^Throwable t]
+  (loop [^Throwable curr t]
+    (cond
+      (nil? curr) false
+      (instance? klass curr) true
+      :else (recur (.getCause curr)))))
+
+;; Evaluates body. Yields false when body completes normally; when body
+;; throws, yields whether klass occurs anywhere in the thrown cause chain
+;; (see exception-cause?).
+(defmacro thrown-cause?
+  [klass & body]
+  `(try
+     ~@body
+     false
+     (catch Throwable t#
+       (exception-cause? ~klass t#))))
+
+(defmacro thrown-cause-with-msg?
+  "Evaluates body. Yields false when body completes normally. When body
+  throws, yields logical true only if the message of the thrown (outermost)
+  throwable fully matches the regex re AND klass occurs somewhere in its
+  cause chain. A throwable without a message never matches."
+  [klass re & body]
+  `(try
+     ~@body
+     false
+     (catch Throwable t#
+       ;; getMessage may be nil; passing nil to re-matches would throw a
+       ;; NullPointerException out of this handler and hide the real outcome.
+       (let [msg# (.getMessage t#)]
+         (and msg#
+              (re-matches ~re msg#)
+              (exception-cause? ~klass t#))))))
+
+;; (forcat [binding coll] body...) expands to a mapcat over coll with a
+;; one-argument fn whose parameter is binding (destructuring allowed) and
+;; whose body is body; body is expected to return a collection.
+(defmacro forcat
+  [[args aseq] & body]
+  `(mapcat (fn [~args]
+             ~@body)
+           ~aseq))
+
+;; Like try, but each (catch Klass local ...) clause matches when Klass occurs
+;; anywhere in the thrown throwable's cause chain, not just at the top.
+;; local is bound to the outermost throwable. Clauses are tried in order;
+;; when none matches, the throwable is rethrown.
+(defmacro try-cause
+  [& body]
+  (let [;; true for every form that is not a (catch ...) clause
+        checker (fn [form]
+                  (or (not (sequential? form))
+                      (not= 'catch (first form))))
+        [code guards] (split-with checker body)
+        error-local (gensym "t")
+        ;; turn each catch clause into a cond test/expression pair
+        guards (forcat [[_ klass local & guard-body] guards]
+                       `((exception-cause? ~klass ~error-local)
+                         (let [~local ~error-local]
+                           ~@guard-body
+                           )))]
+    `(try ~@code
+       (catch Throwable ~error-local
+         (cond ~@guards
+               true (throw ~error-local)
+               )))))
+
+(defn local-hostname
+  "Returns the canonical host name reported by InetAddress/getLocalHost."
+  []
+  (-> (InetAddress/getLocalHost)
+      .getCanonicalHostName))
+
+;; Memoized local-hostname: the lookup result is cached after the first call.
+(def memoized-local-hostname (memoize local-hostname))
+
+;; Looks up Config/STORM_LOCAL_HOSTNAME in conf. conf is invoked as a
+;; function, so it must be a Clojure map (or another IFn).
+;; When the key is absent, falls back to the (memoized) guess by `local-hostname`.
+(defn hostname
+  [conf]
+  (conf Config/STORM_LOCAL_HOSTNAME (memoized-local-hostname)))
+
+;; available-port returns a TCP port number that could be bound at the time
+;; of the call. With no argument the OS picks one (port 0); with a preferred
+;; port, that port is returned if it can be bound, otherwise an OS-chosen one.
+;; The probing socket is closed before returning, so the port is not reserved.
+(letfn [(try-port [port]
+                  ;; bind, read back the actual port, and close the socket again
+                  (with-open [socket (java.net.ServerSocket. port)]
+                    (.getLocalPort socket)))]
+  (defn available-port
+    ([] (try-port 0))
+    ([preferred]
+     (try
+       (try-port preferred)
+       (catch java.io.IOException e
+         (available-port))))))
+
+(defn uuid
+  "Returns the string form of a freshly generated random java.util.UUID."
+  []
+  (.toString (UUID/randomUUID)))
+
+;; Current time in seconds, as reported by Time/currentTimeSecs.
+(defn current-time-secs
+  []
+  (Time/currentTimeSecs))
+
+;; Current time in milliseconds, as reported by Time/currentTimeMillis.
+(defn current-time-millis
+  []
+  (Time/currentTimeMillis))
+
+(defn secs-to-millis-long
+  "Converts secs (possibly fractional) to milliseconds, coerced to a long
+  after the multiplication."
+  [secs]
+  (-> secs
+      (* 1000)
+      long))
+
+(defn clojurify-structure
+  "Walks s top-down (prewalk) and converts every java.util.Map into a
+  Clojure map and every java.util.List into a vector. Boolean objects are
+  passed through `boolean` because (Boolean. false) is truthy in an `if`."
+  [s]
+  (letfn [(convert [x]
+            (condp instance? x
+              Map (into {} x)
+              List (vec x)
+              Boolean (boolean x)
+              x))]
+    (prewalk convert s)))
+
+(defmacro with-file-lock
+  "Evaluates body while holding a java.nio file lock on the file at path
+  (the file is created when missing) and returns the value of body.
+  The lock is released and the file handle closed on the way out, whether
+  body returns normally or throws."
+  [path & body]
+  `(let [f# (File. ~path)
+         _# (.createNewFile f#)
+         rf# (RandomAccessFile. f# "rw")]
+     ;; The outer try closes the file even when acquiring or releasing the
+     ;; lock throws; previously a failing (lock) call leaked the open file.
+     (try
+       (let [lock# (.. rf# (getChannel) (lock))]
+         (try
+           ~@body
+           (finally
+             (.release lock#))))
+       (finally
+         (.close rf#)))))
+
+(defn tokenize-path
+  "Splits path on \"/\" and returns a vector of its non-empty segments."
+  [^String path]
+  (->> (.split path "/")
+       (remove empty?)
+       vec))
+
+(defn assoc-conj
+  "Appends v to the collection stored under k in m and returns the new map.
+  A missing key starts out as the vector [v]; for an existing key the
+  result is the (lazy) concatenation of the old value and [v]."
+  [m k v]
+  (if (contains? m k)
+    (assoc m k (concat (get m k) [v]))
+    (assoc m k [v])))
+
+;; Returns [elements only in old, elements only in curr]; both inputs are
+;; converted to sets first.
+(defn set-delta
+  [old curr]
+  (let [before (set old)
+        after (set curr)
+        removed (set/difference before after)
+        added (set/difference after before)]
+    [removed added]))
+
+(defn parent-path
+  "Returns the \"/\"-rooted path made of all segments of path except the
+  last one; a path with at most one segment yields \"/\"."
+  [path]
+  (->> (tokenize-path path)
+       butlast
+       (str/join "/")
+       (str "/")))
+
+(defn toks->path
+  "Joins the path segments toks with \"/\" and prefixes the result with \"/\"."
+  [toks]
+  (apply str "/" (interpose "/" toks)))
+
+(defn normalize-path
+  "Rebuilds path from its non-empty segments, which removes duplicate and
+  trailing slashes and guarantees a single leading \"/\"."
+  [^String path]
+  (-> path
+      tokenize-path
+      toks->path))
+
+(defn map-val
+  "Returns a map with the keys of amap and each value replaced by (afn value)."
+  [afn amap]
+  (reduce (fn [acc [k v]]
+            (assoc acc k (afn v)))
+          {}
+          amap))
+
+(defn filter-val
+  "Returns a map of the entries of amap whose value satisfies afn."
+  [afn amap]
+  (into {}
+        (for [[_ v :as entry] amap
+              :when (afn v)]
+          entry)))
+
+(defn filter-key
+  "Returns a map of the entries of amap whose key satisfies afn."
+  [afn amap]
+  (reduce (fn [acc [k _ :as entry]]
+            (if (afn k)
+              (conj acc entry)
+              acc))
+          {}
+          amap))
+
+(defn map-key
+  "Returns a map with each key of amap replaced by (afn key); values are kept."
+  [afn amap]
+  (into {}
+        (map (fn [[k v]] [(afn k) v])
+             amap)))
+
+(defn separate
+  "Returns a two-element vector of lazy seqs: the items of aseq that satisfy
+  pred, followed by the items that do not."
+  [pred aseq]
+  (let [matching (filter pred aseq)
+        others (remove pred aseq)]
+    [matching others]))
+
+(defn full-path
+  "Returns the normalized \"/\"-rooted path of name placed under parent."
+  [parent name]
+  (-> (tokenize-path parent)
+      (conj name)
+      toks->path))
+
+;; Predicate: logical negation of nil? (true for every value except nil, including false).
+(def not-nil? (complement nil?))
+
+(defn barr
+  "Builds a primitive byte array from the given numbers; each one is coerced
+  with `byte`, so values outside the byte range throw."
+  [& vals]
+  (into-array Byte/TYPE (map byte vals)))
+
+;; Logs an error (with a fresh RuntimeException for the stack trace) and then
+;; terminates the JVM with exit status val via Runtime.exit.
+;; msg is the rest-args seq, so (str msg) renders it in list form, e.g.
+;; ("Async loop died!").
+;; NOTE(review): (apply str msg) may have been intended -- confirm.
+(defn exit-process!
+  [val & msg]
+  (log-error (RuntimeException. (str msg)) "Halting process: " msg)
+  (.exit (Runtime/getRuntime) val))
+
+;; Adds up the numbers in vals with reduce; an empty collection yields 0.
+(defn sum
+  [vals]
+  (reduce + vals))
+
+(defn repeat-seq
+  "Returns a lazy seq of the items of aseq repeated end to end: endlessly
+  with one argument, amt times with two."
+  ([aseq]
+   (mapcat identity (repeat aseq)))
+  ([amt aseq]
+   (mapcat identity (repeat amt aseq))))
+
+(defn div
+  "Perform floating point division on the arguments: f is coerced to double
+  and divided by each of the remaining arguments in turn. With a single
+  argument this is (/ (double f)), i.e. the reciprocal of f."
+  [f & rest]
+  (apply / (double f) rest))
+
+(defn defaulted
+  "Returns val when it is logically true, otherwise default (so both nil and
+  false are replaced)."
+  [val default]
+  (or val default))
+
+(defn mk-counter
+  "Returns a zero-argument fn backed by an atom. Its first call returns
+  start-val (1 by default) and every later call returns one more than the
+  previous call."
+  ([] (mk-counter 1))
+  ([start-val]
+   (let [state (atom (- start-val 1))]
+     #(swap! state inc))))
+
+;; Expands to a lazy `for` over (range times) that evaluates body once per
+;; iteration; the result is a lazy seq of the body values.
+(defmacro for-times [times & body]
+  `(for [i# (range ~times)]
+     ~@body))
+
+;; Same syntax as `for`, but the resulting seq is fully realized with doall.
+(defmacro dofor [& body]
+  `(doall (for ~@body)))
+
+(defn reverse-map
+  "Inverts amap: returns a map from each value to a vector of all keys that
+  had that value. {:a 1 :b 1 :c 2} -> {1 [:a :b] 2 [:c]}"
+  [amap]
+  (reduce (fn [m [k v]]
+            ;; start a new vector the first time a value is seen
+            (let [existing (get m v [])]
+              (assoc m v (conj existing k))))
+          {} amap))
+
+;; Debug helper: for each given form, prints the form's source text followed
+;; by its value, one println per form.
+(defmacro print-vars [& vars]
+  (let [prints (for [v vars] `(println ~(str v) ~v))]
+    `(do ~@prints)))
+
+(defn process-pid
+  "Gets the pid of this JVM. Hacky because Java doesn't provide a real way to do this.
+  Takes the part before \"@\" of the RuntimeMXBean name and returns it as a
+  string; throws a RuntimeException when the name does not split into
+  exactly two parts."
+  []
+  (let [name (.getName (ManagementFactory/getRuntimeMXBean))
+        split (.split name "@")]
+    (when-not (= 2 (count split))
+      (throw (RuntimeException. (str "Got unexpected process name: " name))))
+    (first split)))
+
+;; Runs command with commons-exec's DefaultExecutor and returns the result of
+;; .execute. The command string is split on single spaces: the first token is
+;; the executable and each remaining token is added as one argument, so
+;; arguments that themselves contain spaces cannot be expressed.
+(defn exec-command! [command]
+  (let [[comm-str & args] (seq (.split command " "))
+        command (CommandLine. comm-str)]
+    (doseq [a args]
+      (.addArgument command a))
+    (.execute (DefaultExecutor.) command)))
+
+;; Copies every non-directory entry of the jar/zip at jarpath whose name
+;; starts with dir into destdir, creating parent directories as needed.
+;; An IOException anywhere in the cause chain is logged and swallowed.
+;; NOTE(review): entry names are joined to destdir unchecked, so an entry
+;; containing ../ would be written outside destdir -- confirm jars are trusted.
+(defn extract-dir-from-jar [jarpath dir destdir]
+  (try-cause
+    ;; the inner jarpath shadows the string argument with the opened ZipFile
+    (with-open [jarpath (ZipFile. jarpath)]
+      (let [entries (enumeration-seq (.entries jarpath))]
+        (doseq [file (filter (fn [entry](and (not (.isDirectory entry)) (.startsWith (.getName entry) dir))) entries)]
+          (.mkdirs (.getParentFile (File. destdir (.getName file))))
+          (with-open [out (FileOutputStream. (File. destdir (.getName file)))]
+            (io/copy (.getInputStream jarpath file) out)))))
+    (catch IOException e
+      (log-message "Could not extract " dir " from " jarpath))))
+
+;; Sleeps via Time/sleep for secs seconds; does nothing when secs is not
+;; positive. secs is coerced to a long before converting to milliseconds, so
+;; any fractional part is dropped.
+(defn sleep-secs [secs]
+  (when (pos? secs)
+    (Time/sleep (* (long secs) 1000))))
+
+;; Calls Time/sleepUntil with target-secs converted to milliseconds
+;; (target-secs is coerced to a long first).
+(defn sleep-until-secs [target-secs]
+  (Time/sleepUntil (* (long target-secs) 1000)))
+
+;; Signal number passed to `kill -<n>` for a forced kill.
+(def ^:const sig-kill 9)
+
+;; Signal number passed to `kill -<n>` for a termination request.
+(def ^:const sig-term 15)
+
+;; Sends signal signum to process pid by running an external command:
+;; `kill -<signum> <pid>` normally, or on Windows `taskkill /pid <pid>`
+;; (with /f added when signum is sig-kill). An ExecuteException anywhere in
+;; the cause chain is logged and swallowed.
+(defn send-signal-to-process
+  [pid signum]
+  (try-cause
+    (exec-command! (str (if on-windows?
+                          (if (== signum sig-kill) "taskkill /f /pid " "taskkill /pid ")
+                          (str "kill -" signum " "))
+                     pid))
+    (catch ExecuteException e
+      (log-message "Error when trying to kill " pid ". Process is probably already dead."))))
+
+;; Reads stream line by line until end of stream, logging each line at warn
+;; level as "<prefix>:<line>". An IOException is logged, not rethrown.
+;; The reader/stream is not closed here.
+(defn read-and-log-stream
+  [prefix stream]
+  (try
+    (let [reader (BufferedReader. (InputStreamReader. stream))]
+      (loop []
+        ;; readLine returns nil at end of stream, which ends the loop
+        (if-let [line (.readLine reader)]
+                (do
+                  (log-warn (str prefix ":" line))
+                  (recur)))))
+    (catch IOException e
+      (log-warn "Error while trying to log stream" e))))
+
+;; Sends sig-kill (9) to pid via send-signal-to-process.
+(defn force-kill-process
+  [pid]
+  (send-signal-to-process pid sig-kill))
+
+;; Sends sig-term (15) to pid via send-signal-to-process.
+(defn kill-process-with-sig-term
+  [pid]
+  (send-signal-to-process pid sig-term))
+
+(defn add-shutdown-hook-with-force-kill-in-1-sec
+  "Registers the user supplied zero-argument function as a JVM shutdown hook
+   for cleanup. Also registers a second hook that sleeps for a second and then
+   halts the JVM (Runtime.halt, status 20) to avoid any zombie process in case
+   the cleanup function hangs."
+  [func]
+  (.addShutdownHook (Runtime/getRuntime) (Thread. #(func)))
+  ;; This used to be #((sleep-secs 1) (.halt ...)), which *called* the nil
+  ;; returned by sleep-secs with the halt expression as its argument and only
+  ;; worked because arguments are evaluated before the call is attempted.
+  (.addShutdownHook (Runtime/getRuntime) (Thread. (fn []
+                                                    (sleep-secs 1)
+                                                    (.halt (Runtime/getRuntime) 20)))))
+
+;; Handle for a background thread, as returned by async-loop.
+(defprotocol SmartThread
+  ;; starts the underlying thread
+  (start [this])
+  ;; waits for the underlying thread to finish
+  (join [this])
+  ;; interrupts the underlying thread
+  (interrupt [this])
+  ;; whether the thread is currently waiting according to Time/isThreadWaiting
+  (sleeping? [this]))
+
+;; Runs afn repeatedly on a new thread and returns a SmartThread handle.
+;; afn returns amount of time to sleep (in seconds) before its next call;
+;; returning nil ends the loop.
+;; Options:
+;;   :daemon      daemon flag of the thread
+;;   :kill-fn     called with the throwable when the loop dies; by default
+;;                exits the process with status 1
+;;   :priority    thread priority
+;;   :factory?    when true, afn is called once on the new thread and its
+;;                result is used as the loop function
+;;   :start       when true the thread is started immediately
+;;   :thread-name appended to the default thread name after a "-"
+(defnk async-loop [afn
+                   :daemon false
+                   :kill-fn (fn [error] (exit-process! 1 "Async loop died!"))
+                   :priority Thread/NORM_PRIORITY
+                   :factory? false
+                   :start true
+                   :thread-name nil]
+  (let [thread (Thread.
+                 (fn []
+                   (try-cause
+                     (let [afn (if factory? (afn) afn)]
+                       (loop []
+                         (let [sleep-time (afn)]
+                           (when-not (nil? sleep-time)
+                             (sleep-secs sleep-time)
+                             (recur))
+                           )))
+                     ;; try-cause matches on the cause chain, so a wrapped
+                     ;; InterruptedException also ends the loop quietly
+                     (catch InterruptedException e
+                       (log-message "Async loop interrupted!")
+                       )
+                     (catch Throwable t
+                       (log-error t "Async loop died!")
+                       (kill-fn t)))))]
+    (.setDaemon thread daemon)
+    (.setPriority thread priority)
+    (when thread-name
+      (.setName thread (str (.getName thread) "-" thread-name)))
+    (when start
+      (.start thread))
+    ;; should return object that supports stop, interrupt, join, and waiting?
+    (reify SmartThread
+      (start
+        [this]
+        (.start thread))
+      (join
+        [this]
+        (.join thread))
+      (interrupt
+        [this]
+        (.interrupt thread))
+      (sleeping?
+        [this]
+        (Time/isThreadWaiting thread)))))
+
+(defn shell-cmd
+  "Turns a sequence of command words into one shell command line: every
+  word is wrapped in single quotes (embedded single quotes are escaped as
+  '\"'\"') and the words are joined with spaces."
+  [command]
+  (let [quote-word (fn [word]
+                     (str \' (clojure.string/escape word {\' "'\"'\"'"}) \'))]
+    (clojure.string/join " " (map quote-word command))))
+
+;; Path of the file named storm-worker-script.sh inside dir.
+(defn script-file-path [dir]
+  (str dir file-path-separator "storm-worker-script.sh"))
+
+;; Path of the file named launch_container.sh inside dir.
+(defn container-file-path [dir]
+  (str dir file-path-separator "launch_container.sh"))
+
+;; Writes a bash script to (script-file-path dir) and returns that path.
+;; The script exports every key/value of :environment (shell-quoted via
+;; shell-cmd) and then exec's command, a sequence of words that is also
+;; shell-quoted.
+(defnk write-script
+  [dir command :environment {}]
+  (let [script-src (str "#!/bin/bash\n" (clojure.string/join "" (map (fn [[k v]] (str (shell-cmd ["export" (str k "=" v)]) ";\n")) environment)) "\nexec " (shell-cmd command) ";")
+        script-path (script-file-path dir)
+        _ (spit script-path script-src)]
+    script-path
+  ))
+
+;; Starts command (a list of words for ProcessBuilder) and returns the Process.
+;; Options:
+;;   :environment        map of extra environment variables for the child
+;;   :directory          working directory for the child, when given
+;;   :log-prefix         when given, the child's output (stderr merged into
+;;                       stdout) is logged line by line with this prefix
+;;   :exit-code-callback when given, called with the child's exit value once
+;;                       it has been waited for
+;; Logging and the callback run on a background async-loop thread.
+(defnk launch-process
+  [command :environment {} :log-prefix nil :exit-code-callback nil :directory nil]
+  (let [builder (ProcessBuilder. command)
+        process-env (.environment builder)]
+    (when directory (.directory builder directory))
+    (.redirectErrorStream builder true)
+    (doseq [[k v] environment]
+      (.put process-env k v))
+    (let [process (.start builder)]
+      (if (or log-prefix exit-code-callback)
+        (async-loop
+         (fn []
+           (if log-prefix
+             (read-and-log-stream log-prefix (.getInputStream process)))
+           (when exit-code-callback
+             (try
+               (.waitFor process)
+               (catch InterruptedException e
+                 (log-message log-prefix " interrupted.")))
+             (exit-code-callback (.exitValue process)))
+           ;; returning nil makes async-loop run this fn exactly once
+           nil)))                    
+      process)))
+   
+(defn exists-file?
+  "True when a file or directory exists at path."
+  [path]
+  (let [f (File. path)]
+    (.exists f)))
+
+;; Deletes the file or directory at path via commons-io FileUtils/forceDelete.
+;; Does nothing when the path does not exist.
+(defn rmr
+  [path]
+  (log-debug "Rmr path " path)
+  (when (exists-file? path)
+    (try
+      (FileUtils/forceDelete (File. path))
+      ;; deliberately ignored: the path may vanish between the check and the delete
+      (catch FileNotFoundException e))))
+
+(defn rmpath
+  "Removes file or directory at the path. Not recursive. Throws exception on failure.
+  Does nothing when the path does not exist."
+  [path]
+  (log-debug "Removing path " path)
+  (when (exists-file? path)
+    (let [deleted? (.delete (File. path))]
+      (when-not deleted?
+        (throw (RuntimeException. (str "Failed to delete " path)))))))
+
+;; Creates the directory at path, including missing parents, via commons-io
+;; FileUtils/forceMkdir.
+(defn local-mkdirs
+  [path]
+  (log-debug "Making dirs at " path)
+  (FileUtils/forceMkdir (File. path)))
+
+;; Creates a new empty file at path. Throws a RuntimeException when
+;; createNewFile reports false, which includes the case where the file
+;; already exists. On Windows the parent directories are created first.
+(defn touch
+  [path]
+  (log-debug "Touching file at " path)
+  (let [success? (do (if on-windows? (.mkdirs (.getParentFile (File. path))))
+                   (.createNewFile (File. path)))]
+    (when-not success?
+      (throw (RuntimeException. (str "Failed to touch " path))))))
+
+(defn create-symlink!
+  "Creates a symbolic link at path-dir/from-file-name that points to
+  target-dir/to-file-name (both made absolute). Nothing is created when the
+  link path already exists. The 3-arity uses file-name on both sides."
+  ([path-dir target-dir file-name]
+    (create-symlink! path-dir target-dir file-name file-name))
+  ([path-dir target-dir from-file-name to-file-name]
+    (let [path (str path-dir file-path-separator from-file-name)
+          target (str target-dir file-path-separator to-file-name)
+          ;; empty arrays for the Java varargs parameters
+          empty-array (make-array String 0)
+          attrs (make-array FileAttribute 0)
+          abs-path (.toAbsolutePath (Paths/get path empty-array))
+          abs-target (.toAbsolutePath (Paths/get target empty-array))]
+      (log-debug "Creating symlink [" abs-path "] to [" abs-target "]")
+      (if (not (.exists (.toFile abs-path)))
+        (Files/createSymbolicLink abs-path abs-target attrs)))))
+
+;; Returns the names (not full paths) of the entries directly inside dir,
+;; or an empty vector when dir does not exist.
+(defn read-dir-contents
+  [dir]
+  (if (exists-file? dir)
+    (let [content-files (.listFiles (File. dir))]
+      (map #(.getName ^File %) content-files))
+    []))
+
+(defn compact
+  "Returns a lazy seq of the items of aseq with all nils removed (false is kept)."
+  [aseq]
+  (remove nil? aseq))
+
+;; Value of the "java.class.path" system property of this JVM.
+(defn current-classpath
+  []
+  (System/getProperty "java.class.path"))
+
+;; Paths (dir + separator + name) of all entries directly inside dir whose
+;; name ends with ".jar"; empty when dir does not exist.
+(defn get-full-jars
+  [dir]
+  (map #(str dir file-path-separator %) (filter #(.endsWith % ".jar") (read-dir-contents dir))))
+
+;; Builds the classpath string for worker processes.
+;; When the "storm.home" system property is unset, this JVM's own classpath
+;; is returned. Otherwise the result joins, in order: the jars in
+;; <storm.home>/lib, the jars in <storm.home>/extlib, the STORM_EXT_CLASSPATH
+;; environment variable (when set), and the conf directory (STORM_CONF_DIR
+;; when set, else <storm.home>/conf).
+(defn worker-classpath
+  []
+  (let [storm-dir (System/getProperty "storm.home")
+        storm-lib-dir (str storm-dir file-path-separator "lib")
+        storm-conf-dir (if-let [confdir (System/getenv "STORM_CONF_DIR")]
+                         confdir 
+                         (str storm-dir file-path-separator "conf"))
+        storm-extlib-dir (str storm-dir file-path-separator "extlib")
+        extcp (System/getenv "STORM_EXT_CLASSPATH")]
+    (if (nil? storm-dir) 
+      (current-classpath)
+      (str/join class-path-separator
+                (remove nil? (concat (get-full-jars storm-lib-dir) (get-full-jars storm-extlib-dir) [extcp] [storm-conf-dir]))))))
+
+;; Appends the entries in paths to the classpath string, joined with
+;; class-path-separator; returns classpath unchanged when paths is empty.
+(defn add-to-classpath
+  [classpath paths]
+  (if (empty? paths)
+    classpath
+    (str/join class-path-separator (cons classpath paths))))
+
+;; Creates a new ReentrantReadWriteLock, for use with read-locked / write-locked.
+(defn ^ReentrantReadWriteLock mk-rw-lock
+  []
+  (ReentrantReadWriteLock.))
+
+(defmacro read-locked
+  "Evaluates body while holding the read lock of rw-lock (a
+  ReentrantReadWriteLock) and returns the value of body. The lock is
+  released when body returns or throws."
+  [rw-lock & body]
+  (let [lock (with-meta rw-lock {:tag `ReentrantReadWriteLock})]
+    `(let [rlock# (.readLock ~lock)]
+       ;; Acquire before entering try: if lock itself fails, finally must not
+       ;; run unlock on a lock this thread never obtained.
+       (.lock rlock#)
+       (try
+         ~@body
+         (finally (.unlock rlock#))))))
+
+(defmacro write-locked
+  "Evaluates body while holding the write lock of rw-lock (a
+  ReentrantReadWriteLock) and returns the value of body. The lock is
+  released when body returns or throws."
+  [rw-lock & body]
+  (let [lock (with-meta rw-lock {:tag `ReentrantReadWriteLock})]
+    `(let [wlock# (.writeLock ~lock)]
+       ;; Acquire before entering try: if lock itself fails, finally must not
+       ;; run unlock on a lock this thread never obtained.
+       (.lock wlock#)
+       (try
+         ~@body
+         (finally (.unlock wlock#))))))
+
+;; Seconds elapsed since time-secs, measured with current-time-secs
+;; (i.e. Time/currentTimeSecs).
+(defn time-delta
+  [time-secs]
+  (- (current-time-secs) time-secs))
+
+;; Milliseconds elapsed since time-ms. Unlike time-delta this reads
+;; System/currentTimeMillis directly rather than going through the Time class.
+(defn time-delta-ms
+  [time-ms]
+  (- (System/currentTimeMillis) (long time-ms)))
+
+;; Returns a java.lang.Integer via Integer/valueOf. Note that the parameter
+;; name shadows clojure.core/str inside the body.
+(defn parse-int
+  [str]
+  (Integer/valueOf str))
+
+;; Clojure-friendly wrapper around Utils/integerDivided: the Java result is
+;; converted with clojurify-structure.
+(defn integer-divided
+  [sum num-pieces]
+  (clojurify-structure (Utils/integerDivided sum num-pieces)))
+
+(defn collectify
+  "Returns obj unchanged when it is sequential or a java.util.Collection;
+  anything else (including nil and maps) is wrapped in a one-element vector."
+  [obj]
+  (cond
+    (sequential? obj) obj
+    (instance? Collection obj) obj
+    :else [obj]))
+
+;; Serializes obj to a JSON string with json-simple's JSONValue/toJSONString.
+(defn to-json
+  [obj]
+  (JSONValue/toJSONString obj))
+
+;; Parses a JSON string with json-simple and converts the result to Clojure
+;; data via clojurify-structure. Returns nil when str is nil.
+(defn from-json
+  [^String str]
+  (if str
+    (clojurify-structure
+      (JSONValue/parse str))
+    nil))
+
+;; Flattens a sequence of forms into a single let. Every form except the last
+;; becomes a binding: (bind sym expr) binds sym to expr, any other form is
+;; evaluated for side effects and bound to _. The last form is the let body
+;; and supplies the result.
+(defmacro letlocals
+  [& body]
+  (let [[tobind lexpr] (split-at (dec (count body)) body)
+        binded (vec (mapcat (fn [e]
+                              (if (and (list? e) (= 'bind (first e)))
+                                [(second e) (last e)]
+                                ['_ e]
+                                ))
+                            tobind))]
+    `(let ~binded
+       ~(first lexpr))))
+
+(defn remove-first
+  "Returns aseq without the first item that satisfies pred. Throws
+  IllegalArgumentException when no item satisfies pred."
+  [pred aseq]
+  (let [[head tail] (split-with #(not (pred %)) aseq)]
+    (if (seq tail)
+      (concat head (rest tail))
+      (throw (IllegalArgumentException. "Nothing to remove")))))
+
+(defn assoc-non-nil
+  "Associates k with v in m only when v is logically true; otherwise returns
+  m unchanged (so both nil and false are skipped)."
+  [m k v]
+  (cond-> m
+    v (assoc k v)))
+
+(defn multi-set
+  "Returns a map of elem to count, i.e. how many times each distinct element
+  occurs in aseq. An empty or nil aseq yields the empty map (the previous
+  merge-with based version returned nil in that case, contradicting this
+  contract)."
+  [aseq]
+  (frequencies aseq))
+
+(defn set-var-root*
+  "Replaces the root binding of the var avar with val, ignoring its old
+  value, and returns val."
+  [avar val]
+  (alter-var-root avar (constantly val)))
+
+;; Macro form of set-var-root*: takes the var's symbol instead of the var.
+(defmacro set-var-root
+  [var-sym val]
+  `(set-var-root* (var ~var-sym) ~val))
+
+;; (with-var-roots [var-sym val ...] body...) temporarily replaces the root
+;; bindings of the given vars, runs body, and restores the previous values in
+;; a finally. Unlike `binding` this changes the root value, so the change is
+;; visible from every thread while body runs.
+(defmacro with-var-roots
+  [bindings & body]
+  (let [settings (partition 2 bindings)
+        tmpvars (repeatedly (count settings) (partial gensym "old"))
+        vars (map first settings)
+        ;; let-bindings that capture each var's current value
+        savevals (vec (mapcat (fn [t v] [t v]) tmpvars vars))
+        setters (for [[v s] settings] `(set-var-root ~v ~s))
+        restorers (map (fn [v s] `(set-var-root ~v ~s)) vars tmpvars)]
+    `(let ~savevals
+       ~@setters
+       (try
+         ~@body
+         (finally
+           ~@restorers)))))
+
+(defn map-diff
+  "Returns the entries of m2 whose value differs from the value m1 has for
+  the same key (keys missing from m1 count as nil there). m1 is called as a
+  function, so it must be a Clojure map."
+  [m1 m2]
+  (into {}
+        (remove (fn [[k v]] (= v (m1 k)))
+                m2)))
+
+(defn select-keys-pred
+  "Returns a map of the entries of amap whose key satisfies pred."
+  [pred amap]
+  (into {}
+        (for [[k _ :as entry] amap
+              :when (pred k)]
+          entry)))
+
+;; Creates the state triple used by acquire-random-range-id:
+;; [cursor (MutableInt starting at -1), shuffled ArrayList copy of choices, Random].
+(defn rotating-random-range
+  [choices]
+  (let [rand (Random.)
+        choices (ArrayList. choices)]
+    (Collections/shuffle choices rand)
+    [(MutableInt. -1) choices rand]))
+
+;; Advances the cursor of a rotating-random-range state and returns the
+;; element at the new position. When the cursor runs past the end, it is
+;; reset to 0 and the list is reshuffled in place, so every element is
+;; returned once per pass. Mutates the state.
+;; (assumes MutableInt.increment returns the incremented value -- confirm)
+(defn acquire-random-range-id
+  [[^MutableInt curr ^List state ^Random rand]]
+  (when (>= (.increment curr) (.size state))
+    (.set curr 0)
+    (Collections/shuffle state rand))
+  (.get state (.get curr)))
+
+(defn interleave-all
+  "Returns a lazy seq holding the first element of every non-empty coll, then
+  the second element of each coll that still has one, and so on until all
+  colls are exhausted. Unlike clojure.core/interleave it does not stop at the
+  shortest coll."
+  [& colls]
+  ;; Each round is wrapped in its own lazy-seq, so the stack stays flat. The
+  ;; previous eager recursion went one call deep per element of the longest
+  ;; coll and could overflow the stack on long inputs.
+  (lazy-seq
+    (when-let [live (seq (remove empty? colls))]
+      (concat (map first live)
+              (apply interleave-all (map rest live))))))
+
+;; Returns the elements that occur more than once across all given
+;; collections taken together (nil when there are none). Occurrences are
+;; counted over the concatenation, so a duplicate inside a single
+;; non-set collection counts as well.
+(defn any-intersection
+  [& sets]
+  (let [elem->count (multi-set (apply concat sets))]
+    (-> (filter-val #(> % 1) elem->count)
+        keys)))
+
+(defn between?
+  "True when lower <= val <= upper (both bounds inclusive)."
+  [val lower upper]
+  (<= lower val upper))
+
+;; Evaluates body 1,000,000 times and prints the elapsed time via `time`.
+;; The range is realized up front so its construction is not part of the
+;; measured time.
+(defmacro benchmark
+  [& body]
+  `(let [l# (doall (range 1000000))]
+     (time
+       (doseq [i# l#]
+         ~@body))))
+
+;; Returns a zero-argument fn that returns true when a fresh random int in
+;; [0, freq) equals 0, i.e. with probability 1/freq on each call.
+(defn rand-sampler
+  [freq]
+  (let [r (java.util.Random.)]
+    (fn [] (= 0 (.nextInt r freq)))))
+
+;; Returns a zero-argument fn that returns true exactly once in every window
+;; of freq consecutive calls, at a position chosen at random per window.
+;; The returned fn carries {:rate freq} as metadata (see sampler-rate) and
+;; keeps mutable, unsynchronized state.
+;; (assumes MutableInt.increment returns the incremented value -- confirm)
+(defn even-sampler
+  [freq]
+  (let [freq (int freq)
+        start (int 0)
+        r (java.util.Random.)
+        curr (MutableInt. -1)
+        target (MutableInt. (.nextInt r freq))]
+    (with-meta
+      (fn []
+        (let [i (.increment curr)]
+          ;; end of window: restart the counter and pick a new target slot
+          (when (>= i freq)
+            (.set curr start)
+            (.set target (.nextInt r freq))))
+        (= (.get curr) (.get target)))
+      {:rate freq})))
+
+(defn sampler-rate
+  "Returns the :rate stored in the metadata of sampler, or nil when absent."
+  [sampler]
+  (-> sampler
+      meta
+      :rate))
+
+;; Returns the class of the first argument and ignores all others.
+(defn class-selector
+  [obj & args]
+  (class obj))
+
+;; Returns a zero-argument fn that reports the seconds elapsed since
+;; uptime-computer was called.
+(defn uptime-computer []
+  (let [start-time (current-time-secs)]
+    (fn [] (time-delta start-time))))
+
+(defn stringify-error
+  "Returns the stack trace of error, as printed by printStackTrace, as a string."
+  [error]
+  (let [buffer (StringWriter.)]
+    (.printStackTrace error (PrintWriter. buffer))
+    (str buffer)))
+
+(defn nil-to-zero
+  "Returns v when it is logically true, otherwise 0 (so nil and false both
+  become 0)."
+  [v]
+  (if v v 0))
+
+(defn bit-xor-vals
+  "XORs all integers in vals together, starting from 0; an empty collection
+  yields 0."
+  [vals]
+  (loop [acc 0
+         remaining (seq vals)]
+    (if remaining
+      (recur (bit-xor acc (first remaining)) (next remaining))
+      acc)))
+
+;; Evaluates body; when it throws, calls afn with the Throwable and yields
+;; afn's result instead of propagating.
+(defmacro with-error-reaction
+  [afn & body]
+  `(try ~@body
+     (catch Throwable t# (~afn t#))))
+
+;; Creates a new, empty Container (a mutable single-value holder).
+(defn container
+  []
+  (Container.))
+
+;; Stores obj in the container's `object` field and returns the container.
+(defn container-set! [^Container container obj]
+  (set! (. container object) obj)
+  container)
+
+;; Reads the container's `object` field.
+(defn container-get [^Container container]
+  (. container object))
+
+(defn to-millis
+  "Converts secs to milliseconds. secs is coerced to a long first, so a
+  fractional part is dropped before multiplying."
+  [secs]
+  (-> secs
+      long
+      (* 1000)))
+
+(defn throw-runtime
+  "Throws a RuntimeException whose message is the concatenation (via str) of
+  all arguments."
+  [& strs]
+  (let [message (apply str strs)]
+    (throw (RuntimeException. message))))
+
+;; Routes standard output/error into the logging system by calling
+;; (log-capture! "STDIO"). The commented-out block below is an earlier
+;; approach kept for reference.
+(defn redirect-stdio-to-slf4j!
+  []
+  ;; set-var-root doesn't work with *out* and *err*, so digging much deeper here
+  ;; Unfortunately, this code seems to work at the REPL but not when spawned as worker processes
+  ;; it might have something to do with being a child process
+  ;; (set! (. (.getThreadBinding RT/OUT) val)
+  ;;       (java.io.OutputStreamWriter.
+  ;;         (log-stream :info "STDIO")))
+  ;; (set! (. (.getThreadBinding RT/ERR) val)
+  ;;       (PrintWriter.
+  ;;         (java.io.OutputStreamWriter.
+  ;;           (log-stream :error "STDIO"))
+  ;;         true))
+  (log-capture! "STDIO"))
+
+;; Debug helper: logs "<prefix>: <val>" and returns val unchanged, so it can
+;; be wrapped around any expression.
+(defn spy
+  [prefix val]
+  (log-message prefix ": " val)
+  val)
+
+(defn zip-contains-dir?
+  "True when the zip/jar archive at zipfile has at least one entry whose name
+  starts with target followed by \"/\"; false otherwise."
+  [zipfile target]
+  (let [prefix (str target "/")]
+    ;; with-open closes the archive, which the previous version never did.
+    ;; `some` consumes eagerly, so no lazy seq escapes the open archive.
+    (with-open [zip (ZipFile. zipfile)]
+      (->> (.entries zip)
+           enumeration-seq
+           (map (memfn getName))
+           (some #(.startsWith % prefix))
+           boolean))))
+
+;; URL-encodes s; thin wrapper around ring.util.codec/url-encode.
+(defn url-encode
+  [s]
+  (codec/url-encode s))
+
+;; URL-decodes s; thin wrapper around ring.util.codec/url-decode.
+(defn url-decode
+  [s]
+  (codec/url-decode s))
+
+(defn join-maps
+  "Returns a map from every key that occurs in any of maps to a seq holding
+  that key's value in each map, in argument order (nil where a map lacks
+  the key). Each map is called as a function, so they must be Clojure maps."
+  [& maps]
+  (let [every-key (set (mapcat keys maps))
+        values-for (fn [k] (map #(% k) maps))]
+    (into {}
+          (map (fn [k] [k (values-for k)])
+               every-key))))
+
+;; Splits aseq into consecutive chunks, at most max-num-chunks of them, and
+;; returns a vector of the chunks; returns [] when max-num-chunks is zero.
+;; Chunk sizes come from integer-divided, whose result is treated as a map of
+;; chunk-size -> number of chunks of that size; larger chunks come first and
+;; a size of 0 is dropped.
+(defn partition-fixed
+  [max-num-chunks aseq]
+  (if (zero? max-num-chunks)
+    []
+    (let [chunks (->> (integer-divided (count aseq) max-num-chunks)
+                      (#(dissoc % 0))
+                      (sort-by (comp - first))
+                      (mapcat (fn [[size amt]] (repeat amt size)))
+                      )]
+      ;; walk the size list, cutting that many items off the front each time
+      (loop [result []
+             [chunk & rest-chunks] chunks
+             data aseq]
+        (if (nil? chunk)
+          result
+          (let [[c rest-data] (split-at chunk data)]
+            (recur (conj result c)
+                   rest-chunks
+                   rest-data)))))))
+
+
+(defn assoc-apply-self
+  "Associates key in curr with the result of calling afn on curr itself."
+  [curr key afn]
+  (let [derived (afn curr)]
+    (assoc curr key derived)))
+
+;; (recursive-map k1 form1 k2 form2 ...) builds a map entry by entry,
+;; starting from {}. Inside each form the symbol <> is bound to the map
+;; built so far, so later values can be derived from earlier ones.
+(defmacro recursive-map
+  [& forms]
+  (->> (partition 2 forms)
+       (map (fn [[key form]] `(assoc-apply-self ~key (fn [~'<>] ~form))))
+       (concat `(-> {}))))
+
+(defn current-stack-trace
+  "Returns the stack trace of the calling thread as one string, one frame
+  per line."
+  []
+  (let [frames (.getStackTrace (Thread/currentThread))]
+    (str/join "\n" (map str frames))))
+
+(defn get-iterator
+  "Returns a new iterator over alist, or nil when alist is nil."
+  [^Iterable alist]
+  (when alist
+    (.iterator alist)))
+
+(defn iter-has-next?
+  "True when iter is non-nil and has another element; false for a nil iter."
+  [^Iterator iter]
+  (if-not iter
+    false
+    (.hasNext iter)))
+
+;; Returns the next element of iter. Unlike iter-has-next? this does not
+;; accept a nil iterator.
+(defn iter-next
+  [^Iterator iter]
+  (.next iter))
+
+;; (fast-list-iter [elem1 list1 elem2 list2 ...] body...) iterates the given
+;; Iterables in lockstep through java.util.Iterator, binding each elem to the
+;; current item of its list and running body for side effects. Iteration
+;; stops as soon as any list is exhausted; a nil list counts as empty.
+(defmacro fast-list-iter
+  [pairs & body]
+  (let [pairs (partition 2 pairs)
+        lists (map second pairs)
+        elems (map first pairs)
+        ;; one generated iterator local per list
+        iters (map (fn [_] (gensym)) lists)
+        bindings (->> (map (fn [i l] [i `(get-iterator ~l)]) iters lists)
+                      (apply concat))
+        tests (map (fn [i] `(iter-has-next? ~i)) iters)
+        assignments (->> (map (fn [e i] [e `(iter-next ~i)]) elems iters)
+                         (apply concat))]
+    `(let [~@bindings]
+       (while (and ~@tests)
+         (let [~@assignments]
+           ~@body)))))
+
+;; Eager map over an Iterable: returns a new java.util.ArrayList holding
+;; (afn e) for every element e of alist, in order.
+(defn fast-list-map
+  [afn alist]
+  (let [ret (ArrayList.)]
+    (fast-list-iter [e alist]
+                    (.add ret (afn e)))
+    ret))
+
+;; (fast-list-for [e alist] body...) is fast-list-map with an inline body:
+;; returns an ArrayList of the body's value for each element.
+(defmacro fast-list-for
+  [[e alist] & body]
+  `(fast-list-map (fn [~e] ~@body) ~alist))
+
+;; Returns an iterator over the entry set of amap, or nil when amap is nil.
+(defn map-iter
+  [^Map amap]
+  (if amap (-> amap .entrySet .iterator)))
+
+(defn convert-entry
+  "Turns a java.util.Map.Entry into a [key value] vector."
+  [^Map$Entry entry]
+  (vector (.getKey entry) (.getValue entry)))
+
+;; (fast-map-iter [binding amap] body...) runs body once per entry of the
+;; java.util.Map amap, with binding destructured against a [key value]
+;; vector. A nil map is treated as empty.
+(defmacro fast-map-iter
+  [[bind amap] & body]
+  `(let [iter# (map-iter ~amap)]
+     (while (iter-has-next? iter#)
+       (let [entry# (iter-next iter#)
+             ~bind (convert-entry entry#)]
+         ~@body))))
+
+;; Returns element 0 of the java.util.List alist via .get (so an empty list
+;; throws rather than yielding nil).
+(defn fast-first
+  [^List alist]
+  (.get alist 0))
+
+;; Looks up key in the mutable java.util.Map amap. When the stored value is
+;; logically true it is returned; otherwise default-val is evaluated, put
+;; into the map under key, and returned. default-val is only evaluated when
+;; needed. Note that the amap and key expressions appear more than once in
+;; the expansion, so they should be side-effect free.
+(defmacro get-with-default
+  [amap key default-val]
+  `(let [curr# (.get ~amap ~key)]
+     (if curr#
+       curr#
+       (do
+         (let [new# ~default-val]
+           (.put ~amap ~key new#)
+           new#)))))
+
+;; Groups the elements of alist by (afn element). Returns a java.util.HashMap
+;; from each key to an ArrayList of the elements with that key, in
+;; encounter order.
+(defn fast-group-by
+  [afn alist]
+  (let [ret (HashMap.)]
+    (fast-list-iter
+      [e alist]
+      (let [key (afn e)
+            ^List curr (get-with-default ret key (ArrayList.))]
+        (.add curr e)))
+    ret))
+
+;; Instantiates klass through its no-argument constructor. klass may be a
+;; Class or a fully qualified class name string (loaded with Class/forName).
+(defn new-instance
+  [klass]
+  (let [klass (if (string? klass) (Class/forName klass) klass)]
+    (.newInstance klass)))
+
+;; Reads conf via .get for config-key; when a value is present, instantiates
+;; it with new-instance, otherwise returns nil.
+(defn get-configured-class
+  [conf config-key]
+  (if (.get conf config-key) (new-instance (.get conf config-key)) nil))
+
+;; Threading macro with an explicit placeholder: x is inserted into each form
+;; at the position of the first <> symbol. A list form without <> gets x
+;; appended as its last argument; a non-list form f becomes (f x).
+;; With several forms, each result is threaded into the next.
+(defmacro -<>
+  ([x] x)
+  ([x form] (if (seq? form)
+              (with-meta
+                ;; split the form around the first <> and splice x in between
+                (let [[begin [_ & end]] (split-with #(not= % '<>) form)]
+                  (concat begin [x] end))
+                (meta form))
+              (list form x)))
+  ([x form & more] `(-<> (-<> ~x ~form) ~@more)))
+
+;; Relative path <storm-id>/<port>/worker.log, using the platform separator.
+(defn logs-filename
+  [storm-id port]
+  (str storm-id file-path-separator port file-path-separator "worker.log"))
+
+;; Regex for file names that start with "worker.log"; group 1 captures
+;; whatever follows. The "." is unescaped, so it matches any character.
+(def worker-log-filename-pattern #"^worker.log(.*)")
+
+;; Relative path <storm-id>/<port>/events.log, using the platform separator.
+(defn event-logs-filename
+  [storm-id port]
+  (str storm-id file-path-separator port file-path-separator "events.log"))
+
+;; Loads the YAML file yamlFile with SnakeYAML's SafeConstructor and returns
+;; its content as Clojure data (via clojurify-structure). Any Exception is
+;; logged instead of propagated, so callers get log-error's return value
+;; (presumably nil -- confirm) when the file is missing or malformed.
+(defn clojure-from-yaml-file [yamlFile]
+  (try
+    (with-open [reader (java.io.FileReader. yamlFile)]
+      (clojurify-structure (.load (Yaml. (SafeConstructor.)) reader)))
+    (catch Exception ex
+      (log-error ex))))
+
+(defn hashmap-to-persistent
+  "Copies the entries of the java.util.HashMap m into a Clojure persistent map."
+  [^HashMap m]
+  (reduce (fn [acc ^Map$Entry entry]
+            (assoc acc (.getKey entry) (.getValue entry)))
+          {}
+          (.entrySet m)))
+
+(defn retry-on-exception
+  "Calls (apply f args) and returns its value. When the call throws an
+  Exception and retries is positive, the failure is logged and the call is
+  repeated with retries decremented; once retries has reached zero the
+  exception is rethrown. At most (inc retries) attempts are made.
+  task-description is only used in log messages."
+  [retries task-description f & args]
+  (let [res (try {:value (apply f args)}
+              (catch Exception e
+                ;; The test used to read (<= 0 retries), which rethrew for
+                ;; every non-negative count (so nothing was ever retried) and
+                ;; retried forever for negative counts.
+                (if (<= retries 0)
+                  (throw e)
+                  {:exception e})))]
+    (if (:exception res)
+      (do
+        (log-error (:exception res) (str "Failed to " task-description ". Will make [" retries "] more attempts."))
+        (recur (dec retries) task-description f args))
+      (do
+        (log-debug (str "Successful " task-description "."))
+        (:value res)))))
+
+(defn setup-default-uncaught-exception-handler
+  "Set a default uncaught exception handler to handle exceptions not caught in other threads.
+  The handler delegates to Utils/handleUncaughtException; when that call
+  itself throws an Error, the error is logged and the JVM exits with status -2."
+  []
+  (Thread/setDefaultUncaughtExceptionHandler
+    (proxy [Thread$UncaughtExceptionHandler] []
+      (uncaughtException [thread thrown]
+        (try
+          (Utils/handleUncaughtException thrown)
+          (catch Error err
+            (do
+              (log-error err "Received error in main thread.. terminating server...")
+              (.exit (Runtime/getRuntime) -2))))))))
+
+(defn redact-value
+  "Hides value for k in coll for printing coll safely: when coll contains k,
+  its value is replaced by a string of # characters of the same length
+  (as given by count, so the value must be countable). coll is returned
+  unchanged when k is absent."
+  [coll k]
+  (if (contains? coll k)
+    (assoc coll k (apply str (repeat (count (coll k)) "#")))
+    coll))
+
+;; Writes one access-log line describing a Thrift request through a new
+;; ThriftAccessLogger and returns that logger instance (doto).
+(defn log-thrift-access
+  [request-id remoteAddress principal operation]
+  (doto
+    (ThriftAccessLogger.)
+    (.log (str "Request ID: " request-id " access from: " remoteAddress " principal: " principal " operation: " operation))))
+
+;; Substrings that must not occur anywhere in a key name (see validate-key-name!).
+(def DISALLOWED-KEY-NAME-STRS #{"/" "." ":" "\\"})
+
+(defn validate-key-name!
+  "Throws a RuntimeException when name is blank (nil, empty or only
+  whitespace) or contains any of the strings in DISALLOWED-KEY-NAME-STRS.
+  Returns nil for a valid name."
+  [name]
+  ;; Blank is checked first so that nil gets the descriptive error rather
+  ;; than a NullPointerException from .contains.
+  (when (clojure.string/blank? name)
+    ;; This used to be ("Key name cannot be blank"), i.e. a string called as
+    ;; a function, which threw ClassCastException instead of this message.
+    (throw (RuntimeException. "Key name cannot be blank")))
+  (when (some #(.contains name %) DISALLOWED-KEY-NAME-STRS)
+    (throw (RuntimeException.
+             (str "Key name cannot contain any of the following: " (pr-str DISALLOWED-KEY-NAME-STRS))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/org/apache/storm/zookeeper.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/zookeeper.clj b/storm-core/src/clj/org/apache/storm/zookeeper.clj
new file mode 100644
index 0000000..8a223cd
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/zookeeper.clj
@@ -0,0 +1,308 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns org.apache.storm.zookeeper
+  (:import [org.apache.curator.retry RetryNTimes]
+           [org.apache.storm Config])
+  (:import [org.apache.curator.framework.api CuratorEvent CuratorEventType CuratorListener UnhandledErrorListener])
+  (:import [org.apache.curator.framework.state ConnectionStateListener])
+  (:import [org.apache.curator.framework CuratorFramework CuratorFrameworkFactory])
+  (:import [org.apache.curator.framework.recipes.leader LeaderLatch LeaderLatch$State Participant LeaderLatchListener])
+  (:import [org.apache.zookeeper ZooKeeper Watcher KeeperException$NoNodeException
+            ZooDefs ZooDefs$Ids CreateMode WatchedEvent Watcher$Event Watcher$Event$KeeperState
+            Watcher$Event$EventType KeeperException$NodeExistsException])
+  (:import [org.apache.zookeeper.data Stat])
+  (:import [org.apache.zookeeper.server ZooKeeperServer NIOServerCnxnFactory])
+  (:import [java.net InetSocketAddress BindException InetAddress])
+  (:import [org.apache.storm.nimbus ILeaderElector NimbusInfo])
+  (:import [java.io File])
+  (:import [java.util List Map])
+  (:import [org.apache.storm.utils Utils ZookeeperAuthInfo])
+  (:use [org.apache.storm util log config]))
+
+;; Translation table from ZooKeeper Watcher$Event$KeeperState constants to the
+;; keywords passed to watcher callbacks (see the listener built in mk-client).
+;; Looking up a state that is not listed here yields nil.
+(def zk-keeper-states
+  {Watcher$Event$KeeperState/Disconnected :disconnected
+   Watcher$Event$KeeperState/SyncConnected :connected
+   Watcher$Event$KeeperState/AuthFailed :auth-failed
+   Watcher$Event$KeeperState/Expired :expired})
+
+;; Translation table from ZooKeeper Watcher$Event$EventType constants to the
+;; keywords passed to watcher callbacks (see the listener built in mk-client).
+;; Looking up an event type that is not listed here yields nil.
+(def zk-event-types
+  {Watcher$Event$EventType/None :none
+   Watcher$Event$EventType/NodeCreated :node-created
+   Watcher$Event$EventType/NodeDeleted :node-deleted
+   Watcher$Event$EventType/NodeDataChanged :node-data-changed
+   Watcher$Event$EventType/NodeChildrenChanged :node-children-changed})
+
+(defn- default-watcher
+  "Default value of mk-client's :watcher option: logs the state, event type
+  and path it is called with and does nothing else."
+  [state type path]
+  (log-message "Zookeeper state update: " state type path))
+
+;; Builds a Curator client via Utils/newCurator, registers a listener that
+;; forwards watch events to `watcher`, starts the client and returns it.
+;;
+;; Positional args: conf, servers, port (passed straight to Utils/newCurator).
+;; Keyword args:
+;;   :root      - passed to Utils/newCurator as the root argument (default "")
+;;   :watcher   - fn of [state type path]; state/type are the keywords from
+;;                zk-keeper-states / zk-event-types (nil when unmapped)
+;;   :auth-conf - when non-nil, wrapped in a ZookeeperAuthInfo for newCurator
+(defnk mk-client
+  [conf servers port
+   :root ""
+   :watcher default-watcher
+   :auth-conf nil]
+  (let [fk (Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)))]
+    (.. fk
+        (getCuratorListenable)
+        (addListener
+          (reify CuratorListener
+            (^void eventReceived [this ^CuratorFramework _fk ^CuratorEvent e]
+                   ;; only WATCHED events are forwarded; every other Curator event type is ignored
+                   (when (= (.getType e) CuratorEventType/WATCHED)
+                     (let [^WatchedEvent event (.getWatchedEvent e)]
+                       (watcher (zk-keeper-states (.getState event))
+                                (zk-event-types (.getType event))
+                                (.getPath event))))))))
+    ;; NOTE(review): the unhandled-error listener below is disabled; it is kept
+    ;; here as commented-out code.
+    ;;    (.. fk
+    ;;        (getUnhandledErrorListenable)
+    ;;        (addListener
+    ;;         (reify UnhandledErrorListener
+    ;;           (unhandledError [this msg error]
+    ;;             (if (or (exception-cause? InterruptedException error)
+    ;;                     (exception-cause? java.nio.channels.ClosedByInterruptException error))
+    ;;               (do (log-warn-error error "Zookeeper exception " msg)
+    ;;                   (let [to-throw (InterruptedException.)]
+    ;;                     (.initCause to-throw error)
+    ;;                     (throw to-throw)
+    ;;                     ))
+    ;;               (do (log-error error "Unrecoverable Zookeeper error " msg)
+    ;;                   (halt-process! 1 "Unrecoverable Zookeeper error")))
+    ;;             ))))
+    (.start fk)
+    fk))
+
+;; Keyword -> ZooKeeper CreateMode lookup used by create-node.
+;; Note that :sequential maps to PERSISTENT_SEQUENTIAL.
+(def zk-create-modes
+  {:ephemeral CreateMode/EPHEMERAL
+   :persistent CreateMode/PERSISTENT
+   :sequential CreateMode/PERSISTENT_SEQUENTIAL})
+
+(defn create-node
+  "Creates the node at path (normalized) holding data, creating missing
+  parents and applying acls. mode is a key of zk-create-modes; the
+  four-argument arity uses :persistent. Any Exception raised while creating
+  is rethrown wrapped by wrap-in-runtime."
+  ([^CuratorFramework zk ^String path ^bytes data acls]
+    (create-node zk path data :persistent acls))
+  ([^CuratorFramework zk ^String path ^bytes data mode acls]
+    (let [create-mode (zk-create-modes mode)]
+      (try
+        (-> zk
+            (.create)
+            (.creatingParentsIfNeeded)
+            (.withMode create-mode)
+            (.withACL acls)
+            (.forPath (normalize-path path) data))
+        (catch Exception e (throw (wrap-in-runtime e)))))))
+
+(defn exists-node?
+  "Returns true when checkExists yields a non-nil result for path
+  (normalized), false otherwise. When watch? is truthy the check is made
+  through the watched variant. Exceptions are rethrown wrapped by
+  wrap-in-runtime."
+  [^CuratorFramework zk ^String path watch?]
+  (let [stat (try
+               (if watch?
+                 (-> zk (.checkExists) (.watched) (.forPath (normalize-path path)))
+                 (-> zk (.checkExists) (.forPath (normalize-path path))))
+               (catch Exception e (throw (wrap-in-runtime e))))]
+    (not (nil? stat))))
+
+;; Deletes the node at `path` together with its children, if the node exists.
+;; Returns nil when the node is absent. A NoNodeException raised by the delete
+;; (the node disappeared after the existence check) is logged and swallowed;
+;; any other Exception is rethrown wrapped by wrap-in-runtime.
+(defnk delete-node
+  [^CuratorFramework zk ^String path]
+  ;; normalize once and reuse the result for both the check and the delete
+  (let [path (normalize-path path)]
+    (when (exists-node? zk path false)
+      (try-cause (.. zk (delete) (deletingChildrenIfNeeded) (forPath path))
+                 (catch KeeperException$NoNodeException e
+                   ;; the node is already gone, which is the desired end state; just log it
+                   (log-message "exception" e))
+                 (catch Exception e (throw (wrap-in-runtime e)))))))
+
+(defn mkdirs
+  "Ensures the node at path exists, recursively creating its ancestors first.
+  Does nothing for the root path \"/\" or for a path that already exists.
+  Each created node is persistent, gets acls, and holds (barr 7) as data."
+  [^CuratorFramework zk ^String path acls]
+  (let [target (normalize-path path)
+        root? (= target "/")]
+    (when (and (not root?)
+               (not (exists-node? zk target false)))
+      ;; build the ancestors before the node itself
+      (mkdirs zk (parent-path target) acls)
+      (try-cause
+        (create-node zk target (barr 7) :persistent acls)
+        (catch KeeperException$NodeExistsException e
+          ;; another client created the same node concurrently; nothing to do
+          )))))
+
+(defn sync-path
+  "Issues a Curator sync for path (normalized). Exceptions are rethrown
+  wrapped by wrap-in-runtime."
+  [^CuratorFramework zk ^String path]
+  (try
+    (-> zk
+        (.sync)
+        (.forPath (normalize-path path)))
+    (catch Exception e (throw (wrap-in-runtime e)))))
+
+
+(defn add-listener
+  "Registers listener on the client's connection-state listenable."
+  [^CuratorFramework zk ^ConnectionStateListener listener]
+  (-> zk
+      (.getConnectionStateListenable)
+      (.addListener listener)))
+
+(defn get-data
+  "Returns the data stored at path (normalized), or nil when the node does
+  not exist. When watch? is truthy both the existence check and the read are
+  made through their watched variants. Exceptions other than NoNodeException
+  are rethrown wrapped by wrap-in-runtime."
+  [^CuratorFramework zk ^String path watch?]
+  (let [node-path (normalize-path path)]
+    (try-cause
+      (when (exists-node? zk node-path watch?)
+        (if watch?
+          (-> zk (.getData) (.watched) (.forPath node-path))
+          (-> zk (.getData) (.forPath node-path))))
+      (catch KeeperException$NoNodeException e
+        ;; the node vanished after the exists check; the watch set by that
+        ;; check is still in place, so reporting nil is fine
+        nil)
+      (catch Exception e (throw (wrap-in-runtime e))))))
+
+(defn get-data-with-version
+  "Returns {:data <bytes> :version <int>} for the node at path (normalized),
+  with the version read from the Stat filled in by the same getData call.
+  Returns nil when the node does not exist or the read yields nil. When
+  watch? is truthy the existence check and the read are watched."
+  [^CuratorFramework zk ^String path watch?]
+  (let [stat (Stat.)
+        node-path (normalize-path path)]
+    (try-cause
+      (when (exists-node? zk node-path watch?)
+        (when-let [payload (if watch?
+                             (-> zk (.getData) (.watched) (.storingStatIn stat) (.forPath node-path))
+                             (-> zk (.getData) (.storingStatIn stat) (.forPath node-path)))]
+          {:data payload
+           :version (.getVersion stat)}))
+      (catch KeeperException$NoNodeException e
+        ;; the node vanished after the exists check; the watch set by that
+        ;; check is still in place, so reporting nil is fine
+        nil))))
+
+(defn get-version
+  "Returns the version reported by checkExists for path (normalized), or nil
+  when checkExists yields nil. When watch? is truthy the check is watched."
+  [^CuratorFramework zk ^String path watch?]
+  (let [stat (if watch?
+               (-> zk (.checkExists) (.watched) (.forPath (normalize-path path)))
+               (-> zk (.checkExists) (.forPath (normalize-path path))))]
+    (when stat
+      (.getVersion stat))))
+
+(defn get-children
+  "Returns the result of Curator's getChildren for path (normalized), using
+  the watched variant when watch? is truthy. Exceptions are rethrown wrapped
+  by wrap-in-runtime."
+  [^CuratorFramework zk ^String path watch?]
+  (try
+    (if watch?
+      (-> zk (.getChildren) (.watched) (.forPath (normalize-path path)))
+      (-> zk (.getChildren) (.forPath (normalize-path path))))
+    (catch Exception e (throw (wrap-in-runtime e)))))
+
+(defn delete-node-blobstore
+  "Deletes every child of parent-path whose name starts with host-port-info
+  (the nimbus host/port information the blobstore state is keyed by).
+  Does nothing when parent-path does not exist."
+  [^CuratorFramework zk ^String parent-path ^String host-port-info]
+  (let [parent-path (normalize-path parent-path)
+        children (if (exists-node? zk parent-path false)
+                   (vec (get-children zk parent-path false))
+                   [])]
+    (doseq [child children
+            :when (.startsWith child host-port-info)]
+      (log-debug "delete-node " "child" child)
+      (delete-node zk (str parent-path "/" child)))))
+
+(defn set-data
+  "Writes data to the node at path (normalized) through Curator's setData.
+  Exceptions are rethrown wrapped by wrap-in-runtime."
+  [^CuratorFramework zk ^String path ^bytes data]
+  (try
+    (-> zk
+        (.setData)
+        (.forPath (normalize-path path) data))
+    (catch Exception e (throw (wrap-in-runtime e)))))
+
+(defn exists
+  "Alias for exists-node?: forwards zk, path and watch? unchanged and returns
+  its boolean result."
+  [^CuratorFramework zk ^String path watch?]
+  (exists-node? zk path watch?))
+
+;; Starts a ZooKeeper server inside this JVM and returns [port factory].
+;;
+;; localdir - directory path; the same File is passed as both directory
+;;            arguments of the ZooKeeperServer constructor.
+;; :port    - when given, only that port is tried; when nil, ports are probed
+;;            upwards starting at 2000.
+;; Throws RuntimeException when no port could be bound.
+(defnk mk-inprocess-zookeeper
+  [localdir :port nil]
+  (let [localfile (File. localdir)
+        ;; NOTE(review): the 2000 here is presumably the server tick time in ms - confirm
+        zk (ZooKeeperServer. localfile localfile 2000)
+        [retport factory]
+        (loop [retport (if port port 2000)]
+          ;; factory-tmp is nil when the bind failed but more ports may still be
+          ;; tried (the catch clause's `when` returned nil), which sends us to recur
+          (if-let [factory-tmp
+                   (try-cause
+                     (doto (NIOServerCnxnFactory.)
+                       ;; NOTE(review): second argument is presumably the max client connections (0 = no limit) - confirm
+                       (.configure (InetSocketAddress. retport) 0))
+                     (catch BindException e
+                       ;; give up when the next candidate would exceed the limit:
+                       ;; the explicit port itself if one was given, 65535 otherwise
+                       (when (> (inc retport) (if port port 65535))
+                         (throw (RuntimeException.
+                                  "No port is available to launch an inprocess zookeeper.")))))]
+            [retport factory-tmp]
+            (recur (inc retport))))]
+    (log-message "Starting inprocess zookeeper at port " retport " and dir " localdir)
+    (.startup factory zk)
+    [retport factory]))
+
+(defn shutdown-inprocess-zookeeper
+  "Calls .shutdown on handle.
+  NOTE(review): handle is presumably the factory returned as the second
+  element of mk-inprocess-zookeeper's result - confirm against callers."
+  [handle]
+  (.shutdown handle))
+
+(defn- to-NimbusInfo
+  "Converts a leader-latch Participant into a NimbusInfo parsed from the
+  participant id, with its leader flag copied from the participant.
+  Throws RuntimeException when the participant id is blank."
+  [^Participant participant]
+  (let [participant-id (.getId participant)]
+    (when (clojure.string/blank? participant-id)
+      (throw (RuntimeException. "No nimbus leader participant host found, have you started your nimbus hosts?")))
+    (doto (NimbusInfo/parse participant-id)
+      (.setLeader (.isLeader participant)))))
+
+(defn leader-latch-listener-impl
+  "Builds a LeaderLatchListener that logs, tagged with the local canonical
+  host name, whenever leadership is gained or lost. The conf, zk and
+  leader-latch arguments are accepted but not used by this implementation."
+  [conf zk leader-latch]
+  (let [hostname (.getCanonicalHostName (InetAddress/getLocalHost))
+        gained-message (str hostname " gained leadership")
+        lost-message (str hostname " lost leadership.")]
+    (reify LeaderLatchListener
+      (^void isLeader [this]
+        (log-message gained-message))
+      (^void notLeader [this]
+        (log-message lost-message)))))
+
+(defn zk-leader-elector
+  "Zookeeper Implementation of ILeaderElector.
+  Opens its own client to the servers/port named in conf and contends for
+  leadership through a Curator LeaderLatch on <STORM-ZOOKEEPER-ROOT>/leader-lock,
+  identifying itself with the host:port string derived from conf. The latch
+  and its listener live in atoms so that a closed latch can be replaced when
+  the elector re-enters the queue."
+  [conf]
+  (let [servers (conf STORM-ZOOKEEPER-SERVERS)
+        zk (mk-client conf servers (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)
+        leader-lock-path (str (conf STORM-ZOOKEEPER-ROOT) "/leader-lock")
+        id (.toHostPortString (NimbusInfo/fromConf conf))
+        leader-latch (atom (LeaderLatch. zk leader-lock-path id))
+        leader-latch-listener (atom (leader-latch-listener-impl conf zk @leader-latch))]
+    (reify ILeaderElector
+      (prepare [this conf]
+        (log-message "no-op for zookeeper implementation"))
+
+      (^void addToLeaderLockQueue [this]
+        ;; A closed latch is replaced here with a new instance (and a new
+        ;; listener) before the LATENT check below decides whether to start it.
+        (when (.equals LeaderLatch$State/CLOSED (.getState @leader-latch))
+          (reset! leader-latch (LeaderLatch. zk leader-lock-path id))
+          (reset! leader-latch-listener (leader-latch-listener-impl conf zk @leader-latch))
+          (log-message "LeaderLatch was in closed state. Resetted the leaderLatch and listeners."))
+
+        ;; Only a latch that has not been started yet is started.
+        (if (.equals LeaderLatch$State/LATENT (.getState @leader-latch))
+          (do
+            (.addListener @leader-latch @leader-latch-listener)
+            (.start @leader-latch)
+            (log-message "Queued up for leader lock."))
+          (log-message "Node already in queue for leader lock.")))
+
+      (^void removeFromLeaderLockQueue [this]
+        ;; Only started latches can be closed.
+        (if (.equals LeaderLatch$State/STARTED (.getState @leader-latch))
+          (do
+            (.close @leader-latch)
+            (log-message "Removed from leader lock queue."))
+          (log-message "leader latch is not started so no removeFromLeaderLockQueue needed.")))
+
+      (^boolean isLeader [this]
+        (.hasLeadership @leader-latch))
+
+      (^NimbusInfo getLeader [this]
+        (to-NimbusInfo (.getLeader @leader-latch)))
+
+      (^List getAllNimbuses [this]
+        ;; `map` is lazy: each participant is converted (and a blank id
+        ;; reported) only when the caller walks the returned sequence.
+        (let [participants (.getParticipants @leader-latch)]
+          (map (fn [^Participant participant]
+                 (to-NimbusInfo participant))
+            participants)))
+
+      (^void close [this]
+        (log-message "closing zookeeper connection of leader elector.")
+        (.close zk)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/clj/storm/trident/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/storm/trident/testing.clj b/storm-core/src/clj/storm/trident/testing.clj
deleted file mode 100644
index ac5fcab..0000000
--- a/storm-core/src/clj/storm/trident/testing.clj
+++ /dev/null
@@ -1,79 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns storm.trident.testing
-  (:require [backtype.storm.LocalDRPC :as LocalDRPC])
-  (:import [storm.trident.testing FeederBatchSpout FeederCommitterBatchSpout MemoryMapState MemoryMapState$Factory TuplifyArgs])
-  (:require [backtype.storm [LocalDRPC]])
-  (:import [backtype.storm LocalDRPC])
-  (:import [backtype.storm.tuple Fields])
-  (:import [backtype.storm.generated KillOptions])
-  (:require [backtype.storm [testing :as t]])
-  (:use [backtype.storm util])
-  )
-
-(defn local-drpc []
-  (LocalDRPC.))
-
-(defn exec-drpc [^LocalDRPC drpc function-name args]
-  (let [res (.execute drpc function-name args)]
-    (from-json res)))
-
-(defn exec-drpc-tuples [^LocalDRPC drpc function-name tuples]
-  (exec-drpc drpc function-name (to-json tuples)))
-
-(defn feeder-spout [fields]
-  (FeederBatchSpout. fields))
-
-(defn feeder-committer-spout [fields]
-  (FeederCommitterBatchSpout. fields))
-
-(defn feed [feeder tuples]
-  (.feed feeder tuples))
-
-(defn fields [& fields]
-  (Fields. fields))
-
-(defn memory-map-state []
-  (MemoryMapState$Factory.))
-
-(defmacro with-drpc [[drpc] & body]
-  `(let [~drpc (backtype.storm.LocalDRPC.)]
-     ~@body
-     (.shutdown ~drpc)
-     ))
-
-(defn with-topology* [cluster topo body-fn]
-  (t/submit-local-topology (:nimbus cluster) "tester" {} (.build topo))
-  (body-fn)
-  (.killTopologyWithOpts (:nimbus cluster) "tester" (doto (KillOptions.) (.set_wait_secs 0)))
-  )
-
-(defmacro with-topology [[cluster topo] & body]
-  `(with-topology* ~cluster ~topo (fn [] ~@body)))
-
-(defn bootstrap-imports []
-  (import 'backtype.storm.LocalDRPC)
-  (import 'storm.trident.TridentTopology)
-  (import '[storm.trident.operation.builtin Count Sum Equals MapGet Debug FilterNull FirstN TupleCollectionGet])
-  )
-
-(defn drpc-tuples-input [topology function-name drpc outfields]
-  (-> topology
-      (.newDRPCStream function-name drpc)
-      (.each (fields "args") (TuplifyArgs.) outfields)
-      ))
-
-

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/genthrift.sh
----------------------------------------------------------------------
diff --git a/storm-core/src/genthrift.sh b/storm-core/src/genthrift.sh
index 54cd10a..eeec78a 100644
--- a/storm-core/src/genthrift.sh
+++ b/storm-core/src/genthrift.sh
@@ -16,9 +16,9 @@
 # limitations under the License.
 
 rm -rf gen-javabean gen-py py
-rm -rf jvm/backtype/storm/generated
+rm -rf jvm/org/apache/storm/generated
 thrift --gen java:beans,hashcode,nocamel,generated_annotations=undated --gen py:utf8strings storm.thrift
-for file in gen-javabean/backtype/storm/generated/* ; do
+for file in gen-javabean/org/apache/storm/generated/* ; do
   cat java_license_header.txt ${file} > ${file}.tmp
   mv -f ${file}.tmp ${file}
 done
@@ -28,6 +28,6 @@ for file in gen-py/storm/* ; do
   cat py_license_header.txt ${file} > ${file}.tmp
   mv -f ${file}.tmp ${file}
 done
-mv gen-javabean/backtype/storm/generated jvm/backtype/storm/generated
+mv gen-javabean/org/apache/storm/generated jvm/org/apache/storm/generated
 mv gen-py py
 rm -rf gen-javabean