You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2016/02/18 23:03:07 UTC

[1/3] storm git commit: STORM-1246: port backtype.storm.local-state to java

Repository: storm
Updated Branches:
  refs/heads/master abb1b85f2 -> 8052a8c78


STORM-1246: port backtype.storm.local-state to java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4bf331d6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4bf331d6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4bf331d6

Branch: refs/heads/master
Commit: 4bf331d668c279f2f6e462c1bfcaebffa06082f1
Parents: 561ceca
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Fri Feb 19 00:57:40 2016 +0530
Committer: Abhishek Agarwal <ab...@inmobi.com>
Committed: Fri Feb 19 00:57:40 2016 +0530

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  31 +++--
 .../clj/org/apache/storm/daemon/supervisor.clj  |  65 +++++++--
 .../src/clj/org/apache/storm/daemon/worker.clj  |  10 +-
 .../src/clj/org/apache/storm/local_state.clj    | 134 -------------------
 .../org/apache/storm/local_state_converter.clj  |  24 ++++
 storm-core/src/clj/org/apache/storm/testing.clj |  10 +-
 .../jvm/org/apache/storm/utils/LocalState.java  | 112 ++++++++++++++--
 7 files changed, 209 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4bf331d6/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index a3497d6..28a6fb8 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -46,11 +46,11 @@
             KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo TopologyHistoryInfo
             ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice SettableBlobMeta ReadableBlobMeta
             BeginDownloadResult ListBlobsResult ComponentPageInfo TopologyPageInfo LogConfig LogLevel LogLevelAction
-            ProfileRequest ProfileAction NodeInfo])
+            ProfileRequest ProfileAction NodeInfo LSTopoHistory])
   (:import [org.apache.storm.daemon Shutdownable])
   (:import [org.apache.storm.validation ConfigValidation])
   (:import [org.apache.storm.cluster ClusterStateContext DaemonType])
-  (:use [org.apache.storm util config log zookeeper local-state])
+  (:use [org.apache.storm util config log zookeeper])
   (:require [org.apache.storm [cluster :as cluster]
                             [converter :as converter]
                             [stats :as stats]])
@@ -60,7 +60,7 @@
   (:use [org.apache.storm.daemon common])
   (:use [org.apache.storm config])
   (:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms])
-  (:import [org.apache.storm.utils VersionInfo]
+  (:import [org.apache.storm.utils VersionInfo LocalState]
            [org.json.simple JSONValue])
   (:require [clj-time.core :as time])
   (:require [clj-time.coerce :as coerce])
@@ -1181,11 +1181,8 @@
   [mins nimbus]
   (locking (:topology-history-lock nimbus)
     (let [cutoff-age (- (Time/currentTimeSecs) (* mins 60))
-          topo-history-state (:topo-history-state nimbus)
-          curr-history (vec (ls-topo-hist topo-history-state))
-          new-history (vec (filter (fn [line]
-                                     (> (line :timestamp) cutoff-age)) curr-history))]
-      (ls-topo-hist! topo-history-state new-history))))
+          topo-history-state (:topo-history-state nimbus)]
+          (.filterOldTopologies ^LocalState topo-history-state cutoff-age))))
 
 (defn cleanup-corrupt-topologies! [nimbus]
   (let [storm-cluster-state (:storm-cluster-state nimbus)
@@ -1275,11 +1272,9 @@
   (locking (:topology-history-lock nimbus)
     (let [topo-history-state (:topo-history-state nimbus)
           users (ConfigUtils/getTopoLogsUsers topology-conf)
-          groups (ConfigUtils/getTopoLogsGroups topology-conf)
-          curr-history (vec (ls-topo-hist topo-history-state))
-          new-history (conj curr-history {:topoid storm-id :timestamp (Time/currentTimeSecs)
-                                          :users users :groups groups})]
-      (ls-topo-hist! topo-history-state new-history))))
+          groups (ConfigUtils/getTopoLogsGroups topology-conf)]
+      (.addTopologyHistory ^LocalState topo-history-state
+                           (LSTopoHistory. storm-id (Time/currentTimeSecs) users groups)))))
 
 (defn igroup-mapper
   [storm-conf]
@@ -1295,10 +1290,18 @@
   (let [groups (user-groups user storm-conf)]
     (> (.size (set/intersection (set groups) (set groups-to-check))) 0)))
 
+(defn ->topo-history
+  [thrift-topo-hist]
+  {
+   :topoid (.get_topology_id thrift-topo-hist)
+   :timestamp (.get_time_stamp thrift-topo-hist)
+   :users (.get_users thrift-topo-hist)
+   :groups (.get_groups thrift-topo-hist)})
+
 (defn read-topology-history
   [nimbus user admin-users]
   (let [topo-history-state (:topo-history-state nimbus)
-        curr-history (vec (ls-topo-hist topo-history-state))
+        curr-history (vec (map ->topo-history (.getTopoHistoryList ^LocalState topo-history-state)))
         topo-user-can-access (fn [line user storm-conf]
                                (if (nil? user)
                                  (line :topoid)

http://git-wip-us.apache.org/repos/asf/storm/blob/4bf331d6/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index ad9db76..5685a09 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -24,12 +24,12 @@
            [java.net JarURLConnection]
            [java.net URI URLDecoder]
            [org.apache.commons.io FileUtils])
-  (:use [org.apache.storm config util log local-state])
+  (:use [org.apache.storm config util log local-state-converter])
   (:import [org.apache.storm.generated AuthorizationException KeyNotFoundException WorkerResources])
   (:import [org.apache.storm.utils NimbusLeaderNotFoundException VersionInfo])
   (:import [java.nio.file Files StandardCopyOption])
   (:import [org.apache.storm Config])
-  (:import [org.apache.storm.generated WorkerResources ProfileAction])
+  (:import [org.apache.storm.generated WorkerResources ProfileAction LocalAssignment])
   (:import [org.apache.storm.localizer LocalResource])
   (:use [org.apache.storm.daemon common])
   (:require [org.apache.storm.command [healthcheck :as healthcheck]])
@@ -85,6 +85,10 @@
        :profiler-actions new-profiler-actions
        :versions new-assignments})))
 
+(defn mk-local-assignment
+  [storm-id executors resources]
+  {:storm-id storm-id :executors executors :resources resources})
+
 (defn- read-my-executors [assignments-snapshot storm-id assignment-id]
   (let [assignment (get assignments-snapshot storm-id)
         my-slots-resources (into {}
@@ -125,6 +129,20 @@
 (defn- read-downloaded-storm-ids [conf]
   (map #(URLDecoder/decode %) (Utils/readDirContents (ConfigUtils/supervisorStormDistRoot conf))))
 
+(defn ->executor-list
+  [executors]
+  (into []
+        (for [exec-info executors]
+          [(.get_task_start exec-info) (.get_task_end exec-info)])))
+
+(defn ls-worker-heartbeat
+  [^LocalState local-state]
+  (if-let [worker-hb (.getWorkerHeartBeat ^LocalState local-state)]
+    {:time-secs (.get_time_secs worker-hb)
+     :storm-id (.get_topology_id worker-hb)
+     :executors (->executor-list (.get_executors worker-hb))
+     :port (.get_port worker-hb)}))
+
 (defn read-worker-heartbeat [conf id]
   (let [local-state (ConfigUtils/workerState conf id)]
     (try
@@ -172,7 +190,7 @@
   (let [conf (:conf supervisor)
         ^LocalState local-state (:local-state supervisor)
         id->heartbeat (read-worker-heartbeats conf)
-        approved-ids (set (keys (ls-approved-workers local-state)))]
+        approved-ids (set (keys (clojurify-structure (.getApprovedWorkers ^LocalState local-state))))]
     (into
      {}
      (dofor [[id hb] id->heartbeat]
@@ -198,7 +216,7 @@
 (defn- wait-for-worker-launch [conf id start-time]
   (let [state (ConfigUtils/workerState conf id)]
     (loop []
-      (let [hb (ls-worker-heartbeat state)]
+      (let [hb (.getWorkerHeartBeat state)]
         (when (and
                (not hb)
                (<
@@ -209,7 +227,7 @@
           (Time/sleep 500)
           (recur)
           )))
-    (when-not (ls-worker-heartbeat state)
+    (when-not (.getWorkerHeartBeat state)
       (log-message "Worker " id " failed to start")
       )))
 
@@ -414,6 +432,19 @@
   [pred amap]
   (into {} (filter (fn [[k v]] (pred k)) amap)))
 
+(defn ->local-assignment
+  [^LocalAssignment thrift-local-assignment]
+  (mk-local-assignment
+    (.get_topology_id thrift-local-assignment)
+    (->executor-list (.get_executors thrift-local-assignment))
+    (.get_resources thrift-local-assignment)))
+
+;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
+(defn ls-local-assignments
+  [^LocalState local-state]
+  (if-let [thrift-local-assignments (.getLocalAssignmentsMap local-state)]
+    (map-val ->local-assignment thrift-local-assignments)))
+
 ;TODO: when translating this function, you should replace the filter-val with a proper for loop + if condition HERE
 (defn sync-processes [supervisor]
   (let [conf (:conf supervisor)
@@ -453,9 +484,9 @@
          ", Heartbeat: " (pr-str heartbeat))
         (shutdown-worker supervisor id)))
     (let [valid-new-worker-ids (get-valid-new-worker-ids conf supervisor reassign-executors new-worker-ids)]
-      (ls-approved-workers! local-state
+      (.setApprovedWorkers ^LocalState local-state
                         (merge
-                          (select-keys (ls-approved-workers local-state)
+                          (select-keys (clojurify-structure (.getApprovedWorkers ^LocalState local-state))
                             (keys keepers))
                           valid-new-worker-ids))
       (wait-for-workers-launch conf (keys valid-new-worker-ids)))))
@@ -553,6 +584,22 @@
           (rm-topo-files conf storm-id localizer false)
           storm-id)))))
 
+(defn ->LocalAssignment
+  [{storm-id :storm-id executors :executors resources :resources}]
+  (let [assignment (LocalAssignment. storm-id (->ExecutorInfo-list executors))]
+    (if resources (.set_resources assignment
+                                  (doto (WorkerResources. )
+                                    (.set_mem_on_heap (first resources))
+                                    (.set_mem_off_heap (second resources))
+                                    (.set_cpu (last resources)))))
+    assignment))
+
+;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
+(defn ls-local-assignments!
+  [^LocalState local-state assignments]
+  (let [local-assignment-map (map-val ->LocalAssignment assignments)]
+    (.setLocalAssignmentsMap local-state local-assignment-map)))
+
 (defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
   (fn this []
     (let [conf (:conf supervisor)
@@ -1265,10 +1312,10 @@
       (prepare [this conf local-dir]
         (reset! conf-atom conf)
         (let [state (LocalState. local-dir)
-              curr-id (if-let [id (ls-supervisor-id state)]
+              curr-id (if-let [id (.getSupervisorId state)]
                         id
                         (generate-supervisor-id))]
-          (ls-supervisor-id! state curr-id)
+          (.setSupervisorId state curr-id)
           (reset! id-atom curr-id))
         )
       (confirmAssigned [this port]

http://git-wip-us.apache.org/repos/asf/storm/blob/4bf331d6/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index c2a767a..60bc070 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -15,7 +15,7 @@
 ;; limitations under the License.
 (ns org.apache.storm.daemon.worker
   (:use [org.apache.storm.daemon common])
-  (:use [org.apache.storm config log util local-state])
+  (:use [org.apache.storm config log util local-state-converter])
   (:require [clj-time.core :as time])
   (:require [clj-time.coerce :as coerce])
   (:require [org.apache.storm.daemon [executor :as executor]])
@@ -33,7 +33,7 @@
   (:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status DeserializingConnectionCallback])
   (:import [org.apache.storm.daemon Shutdownable])
   (:import [org.apache.storm.serialization KryoTupleSerializer])
-  (:import [org.apache.storm.generated StormTopology])
+  (:import [org.apache.storm.generated StormTopology LSWorkerHeartbeat])
   (:import [org.apache.storm.tuple AddressedTuple Fields])
   (:import [org.apache.storm.task WorkerTopologyContext])
   (:import [org.apache.storm Constants])
@@ -84,7 +84,11 @@
   (let [conf (:conf worker)
         state (ConfigUtils/workerState conf (:worker-id worker))]
     ;; do the local-file-system heartbeat.
-    (ls-worker-heartbeat! state (Time/currentTimeSecs) (:storm-id worker) (:executors worker) (:port worker))
+    (.setWorkerHeartBeat state (LSWorkerHeartbeat.
+                                 (Time/currentTimeSecs)
+                                 (:storm-id worker)
+                                 (->ExecutorInfo-list (:executors worker))
+                                 (:port worker)))
     (.cleanup state 60) ; this is just in case supervisor is down so that disk doesn't fill up.
                          ; it shouldn't take supervisor 120 seconds between listing dir and reading it
 

http://git-wip-us.apache.org/repos/asf/storm/blob/4bf331d6/storm-core/src/clj/org/apache/storm/local_state.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/local_state.clj b/storm-core/src/clj/org/apache/storm/local_state.clj
deleted file mode 100644
index df67c5e..0000000
--- a/storm-core/src/clj/org/apache/storm/local_state.clj
+++ /dev/null
@@ -1,134 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.local-state
-  (:use [org.apache.storm log util])
-  (:import [org.apache.storm.generated StormTopology
-            InvalidTopologyException GlobalStreamId
-            LSSupervisorId LSApprovedWorkers
-            LSSupervisorAssignments LocalAssignment
-            ExecutorInfo LSWorkerHeartbeat
-            LSTopoHistory LSTopoHistoryList
-            WorkerResources]
-           [org.apache.storm.utils Utils])
-  (:import [org.apache.storm.utils LocalState]))
-
-(def LS-WORKER-HEARTBEAT "worker-heartbeat")
-(def LS-ID "supervisor-id")
-(def LS-LOCAL-ASSIGNMENTS "local-assignments")
-(def LS-APPROVED-WORKERS "approved-workers")
-(def LS-TOPO-HISTORY "topo-hist")
-
-(defn ->LSTopoHistory
-  [{topoid :topoid timestamp :timestamp users :users groups :groups}]
-  (LSTopoHistory. topoid timestamp users groups))
-
-(defn ->topo-history
-  [thrift-topo-hist]
-  {
-    :topoid (.get_topology_id thrift-topo-hist)
-    :timestamp (.get_time_stamp thrift-topo-hist)
-    :users (.get_users thrift-topo-hist)
-    :groups (.get_groups thrift-topo-hist)})
-
-(defn ls-topo-hist!
-  [^LocalState local-state hist-list]
-  (.put local-state LS-TOPO-HISTORY
-    (LSTopoHistoryList. (map ->LSTopoHistory hist-list))))
-
-(defn ls-topo-hist
-  [^LocalState local-state]
-  (if-let [thrift-hist-list (.get local-state LS-TOPO-HISTORY)]
-    (map ->topo-history (.get_topo_history thrift-hist-list))))
-
-(defn ls-supervisor-id!
-  [^LocalState local-state ^String id]
-    (.put local-state LS-ID (LSSupervisorId. id)))
-
-(defn ls-supervisor-id
-  [^LocalState local-state]
-  (if-let [super-id (.get local-state LS-ID)]
-    (.get_supervisor_id super-id)))
-
-(defn ls-approved-workers!
-  [^LocalState local-state workers]
-    (.put local-state LS-APPROVED-WORKERS (LSApprovedWorkers. workers)))
-
-(defn ls-approved-workers
-  [^LocalState local-state]
-  (if-let [tmp (.get local-state LS-APPROVED-WORKERS)]
-    (into {} (.get_approved_workers tmp))))
-
-(defn ->ExecutorInfo
-  [[low high]] (ExecutorInfo. low high))
-
-(defn ->ExecutorInfo-list
-  [executors]
-  (map ->ExecutorInfo executors))
-
-(defn ->executor-list
-  [executors]
-  (into [] 
-    (for [exec-info executors] 
-      [(.get_task_start exec-info) (.get_task_end exec-info)])))
-
-(defn ->LocalAssignment
-  [{storm-id :storm-id executors :executors resources :resources}]
-  (let [assignment (LocalAssignment. storm-id (->ExecutorInfo-list executors))]
-    (if resources (.set_resources assignment
-                                  (doto (WorkerResources. )
-                                    (.set_mem_on_heap (first resources))
-                                    (.set_mem_off_heap (second resources))
-                                    (.set_cpu (last resources)))))
-    assignment))
-
-(defn mk-local-assignment
-  [storm-id executors resources]
-  {:storm-id storm-id :executors executors :resources resources})
-
-(defn ->local-assignment
-  [^LocalAssignment thrift-local-assignment]
-    (mk-local-assignment
-      (.get_topology_id thrift-local-assignment)
-      (->executor-list (.get_executors thrift-local-assignment))
-      (.get_resources thrift-local-assignment)))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn ls-local-assignments!
-  [^LocalState local-state assignments]
-  (let [local-assignment-map (map-val ->LocalAssignment assignments)]
-    (.put local-state LS-LOCAL-ASSIGNMENTS
-          (LSSupervisorAssignments. local-assignment-map))))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn ls-local-assignments
-  [^LocalState local-state]
-    (if-let [thrift-local-assignments (.get local-state LS-LOCAL-ASSIGNMENTS)]
-      (map-val
-        ->local-assignment
-        (.get_assignments thrift-local-assignments))))
-
-(defn ls-worker-heartbeat!
-  [^LocalState local-state time-secs storm-id executors port]
-  (.put local-state LS-WORKER-HEARTBEAT (LSWorkerHeartbeat. time-secs storm-id (->ExecutorInfo-list executors) port) false))
-
-(defn ls-worker-heartbeat 
-  [^LocalState local-state]
-  (if-let [worker-hb (.get local-state LS-WORKER-HEARTBEAT)]
-    {:time-secs (.get_time_secs worker-hb)
-     :storm-id (.get_topology_id worker-hb)
-     :executors (->executor-list (.get_executors worker-hb))
-     :port (.get_port worker-hb)}))
-

http://git-wip-us.apache.org/repos/asf/storm/blob/4bf331d6/storm-core/src/clj/org/apache/storm/local_state_converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/local_state_converter.clj b/storm-core/src/clj/org/apache/storm/local_state_converter.clj
new file mode 100644
index 0000000..e8eeaca
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/local_state_converter.clj
@@ -0,0 +1,24 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.local-state-converter
+  (:import [org.apache.storm.generated ExecutorInfo]))
+
+(defn ->ExecutorInfo
+  [[low high]] (ExecutorInfo. low high))
+
+(defn ->ExecutorInfo-list
+  [executors]
+  (map ->ExecutorInfo executors))

http://git-wip-us.apache.org/repos/asf/storm/blob/4bf331d6/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index 4ad5ff8..7817929 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -29,7 +29,7 @@
   (:import [java.util HashMap ArrayList])
   (:import [java.util.concurrent.atomic AtomicInteger])
   (:import [java.util.concurrent ConcurrentHashMap])
-  (:import [org.apache.storm.utils Time Utils IPredicate RegisteredGlobalState ConfigUtils])
+  (:import [org.apache.storm.utils Time Utils IPredicate RegisteredGlobalState ConfigUtils LocalState])
   (:import [org.apache.storm.tuple Fields Tuple TupleImpl])
   (:import [org.apache.storm.task TopologyContext])
   (:import [org.apache.storm.generated GlobalStreamId Bolt KillOptions])
@@ -51,7 +51,7 @@
            [org.json.simple JSONValue])
   (:require [org.apache.storm [zookeeper :as zk]])
   (:require [org.apache.storm.daemon.acker :as acker])
-  (:use [org.apache.storm cluster util config log local-state])
+  (:use [org.apache.storm cluster util config log local-state-converter])
   (:use [org.apache.storm.internal thrift]))
 
 (defn feeder-spout
@@ -395,14 +395,14 @@
 (defn find-worker-id
   [supervisor-conf port]
   (let [supervisor-state (ConfigUtils/supervisorState supervisor-conf)
-        worker->port (ls-approved-workers supervisor-state)]
+        worker->port (.getApprovedWorkers ^LocalState supervisor-state)]
     (first ((clojurify-structure (Utils/reverseMap worker->port)) port))))
 
 (defn find-worker-port
   [supervisor-conf worker-id]
   (let [supervisor-state (ConfigUtils/supervisorState supervisor-conf)
-        worker->port (ls-approved-workers supervisor-state)]
-    (worker->port worker-id)))
+        worker->port (.getApprovedWorkers ^LocalState supervisor-state)]
+    (if worker->port (.get worker->port worker-id))))
 
 (defn mk-capture-shutdown-fn
   [capture-atom]

http://git-wip-us.apache.org/repos/asf/storm/blob/4bf331d6/storm-core/src/jvm/org/apache/storm/utils/LocalState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/LocalState.java b/storm-core/src/jvm/org/apache/storm/utils/LocalState.java
index aef1c1c..2f0bb60 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/LocalState.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/LocalState.java
@@ -18,24 +18,28 @@
 package org.apache.storm.utils;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.storm.generated.LSApprovedWorkers;
+import org.apache.storm.generated.LSSupervisorAssignments;
+import org.apache.storm.generated.LSSupervisorId;
+import org.apache.storm.generated.LSTopoHistory;
+import org.apache.storm.generated.LSTopoHistoryList;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.LocalStateData;
+import org.apache.storm.generated.ThriftSerializedObject;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Map;
+import java.util.ArrayList;
 import java.util.HashMap;
-import java.io.IOException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.thrift.TBase;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-
-import org.apache.storm.generated.LocalStateData;
-import org.apache.storm.generated.ThriftSerializedObject;
+import java.util.List;
+import java.util.Map;
 
 /**
  * A simple, durable, atomic K/V database. *Very inefficient*, should only be used for occasional reads/writes.
@@ -43,6 +47,11 @@ import org.apache.storm.generated.ThriftSerializedObject;
  */
 public class LocalState {
     public static final Logger LOG = LoggerFactory.getLogger(LocalState.class);
+    public static final String LS_WORKER_HEARTBEAT = "worker-heartbeat";
+    public static final String LS_ID = "supervisor-id";
+    public static final String LS_LOCAL_ASSIGNMENTS = "local-assignments";
+    public static final String LS_APPROVED_WORKERS = "approved-workers";
+    public static final String LS_TOPO_HISTORY = "topo-hist";
     private VersionedStore _vs;
     
     public LocalState(String backingDir) throws IOException {
@@ -157,6 +166,85 @@ public class LocalState {
         _vs.cleanup(keepVersions);
     }
 
+    public List<LSTopoHistory> getTopoHistoryList() {
+        LSTopoHistoryList lsTopoHistoryListWrapper = (LSTopoHistoryList) get(LS_TOPO_HISTORY);
+        if (null != lsTopoHistoryListWrapper) {
+            return lsTopoHistoryListWrapper.get_topo_history();
+        }
+        return null;
+    }
+
+    /**
+     * Remove topologies from local state which are older than cutOffAge.
+     * @param cutOffAge
+     */
+    public void filterOldTopologies(long cutOffAge) {
+        LSTopoHistoryList lsTopoHistoryListWrapper = (LSTopoHistoryList) get(LS_TOPO_HISTORY);
+        List<LSTopoHistory> filteredTopoHistoryList = new ArrayList<>();
+        if (null != lsTopoHistoryListWrapper) {
+            for (LSTopoHistory topoHistory : lsTopoHistoryListWrapper.get_topo_history()) {
+                if (topoHistory.get_time_stamp() > cutOffAge) {
+                    filteredTopoHistoryList.add(topoHistory);
+                }
+            }
+        }
+        put(LS_TOPO_HISTORY, new LSTopoHistoryList(filteredTopoHistoryList));
+    }
+
+    public void addTopologyHistory(LSTopoHistory lsTopoHistory) {
+        LSTopoHistoryList lsTopoHistoryListWrapper = (LSTopoHistoryList) get(LS_TOPO_HISTORY);
+        List<LSTopoHistory> currentTopoHistoryList = new ArrayList<>();
+        if (null != lsTopoHistoryListWrapper) {
+            currentTopoHistoryList.addAll(lsTopoHistoryListWrapper.get_topo_history());
+        }
+        currentTopoHistoryList.add(lsTopoHistory);
+        put(LS_TOPO_HISTORY, new LSTopoHistoryList(currentTopoHistoryList));
+    }
+
+    public String getSupervisorId() {
+        LSSupervisorId lsSupervisorId = (LSSupervisorId) get(LS_ID);
+        if (null != lsSupervisorId) {
+            return lsSupervisorId.get_supervisor_id();
+        }
+        return null;
+    }
+
+    public void setSupervisorId(String supervisorId) {
+        put(LS_ID, new LSSupervisorId(supervisorId));
+    }
+
+    public Map<String, Integer> getApprovedWorkers() {
+        LSApprovedWorkers lsApprovedWorkers = (LSApprovedWorkers) get(LS_APPROVED_WORKERS);
+        if (null != lsApprovedWorkers) {
+            return lsApprovedWorkers.get_approved_workers();
+        }
+        return null;
+    }
+
+    public void setApprovedWorkers(Map<String, Integer> approvedWorkers) {
+        put(LS_APPROVED_WORKERS, new LSApprovedWorkers(approvedWorkers));
+    }
+
+    public LSWorkerHeartbeat getWorkerHeartBeat() {
+        return (LSWorkerHeartbeat) get(LS_WORKER_HEARTBEAT);
+    }
+
+    public void setWorkerHeartBeat(LSWorkerHeartbeat workerHeartBeat) {
+        put(LS_WORKER_HEARTBEAT, workerHeartBeat, false);
+    }
+
+    public Map<Integer, LocalAssignment> getLocalAssignmentsMap() {
+        LSSupervisorAssignments assignments = (LSSupervisorAssignments) get(LS_LOCAL_ASSIGNMENTS);
+        if (null != assignments) {
+            return assignments.get_assignments();
+        }
+        return null;
+    }
+
+    public void setLocalAssignmentsMap(Map<Integer, LocalAssignment> localAssignmentMap) {
+        put(LS_LOCAL_ASSIGNMENTS, new LSSupervisorAssignments(localAssignmentMap));
+    }
+
     private void persistInternal(Map<String, ThriftSerializedObject> serialized, TSerializer ser, boolean cleanup) {
         try {
             if (ser == null) {


[3/3] storm git commit: Added STORM-1246 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-1246 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8052a8c7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8052a8c7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8052a8c7

Branch: refs/heads/master
Commit: 8052a8c780bc7864861fbdfe70d453ed7a87d7f0
Parents: bc38f53
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Thu Feb 18 13:57:51 2016 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Thu Feb 18 13:57:51 2016 -0800

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8052a8c7/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 00e77e8..0c48b97 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 2.0.0
+ * STORM-1246: port backtype.storm.local-state to java.
  * STORM-1516: Fixed issue in writing pids with distributed cluster mode.
  * STORM-1253: port backtype.storm.timer to java
  * STORM-1258: port thrift.clj to Thrift.java


[2/3] storm git commit: Merge branch 'local-state2' of http://github.com/abhishekagarwal87/storm into STORM-1246

Posted by sr...@apache.org.
Merge branch 'local-state2' of http://github.com/abhishekagarwal87/storm into STORM-1246


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bc38f530
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bc38f530
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bc38f530

Branch: refs/heads/master
Commit: bc38f530e4c192055dd4107cb92c9236e8a34b5e
Parents: abb1b85 4bf331d
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Thu Feb 18 12:58:25 2016 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Thu Feb 18 12:58:25 2016 -0800

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  31 +++--
 .../clj/org/apache/storm/daemon/supervisor.clj  |  65 +++++++--
 .../src/clj/org/apache/storm/daemon/worker.clj  |  10 +-
 .../src/clj/org/apache/storm/local_state.clj    | 134 -------------------
 .../org/apache/storm/local_state_converter.clj  |  24 ++++
 storm-core/src/clj/org/apache/storm/testing.clj |  10 +-
 .../jvm/org/apache/storm/utils/LocalState.java  | 112 ++++++++++++++--
 7 files changed, 209 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bc38f530/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------