You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2016/02/18 23:03:07 UTC
[1/3] storm git commit: STORM-1246: port backtype.storm.local-state
to java
Repository: storm
Updated Branches:
refs/heads/master abb1b85f2 -> 8052a8c78
STORM-1246: port backtype.storm.local-state to java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4bf331d6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4bf331d6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4bf331d6
Branch: refs/heads/master
Commit: 4bf331d668c279f2f6e462c1bfcaebffa06082f1
Parents: 561ceca
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Fri Feb 19 00:57:40 2016 +0530
Committer: Abhishek Agarwal <ab...@inmobi.com>
Committed: Fri Feb 19 00:57:40 2016 +0530
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/nimbus.clj | 31 +++--
.../clj/org/apache/storm/daemon/supervisor.clj | 65 +++++++--
.../src/clj/org/apache/storm/daemon/worker.clj | 10 +-
.../src/clj/org/apache/storm/local_state.clj | 134 -------------------
.../org/apache/storm/local_state_converter.clj | 24 ++++
storm-core/src/clj/org/apache/storm/testing.clj | 10 +-
.../jvm/org/apache/storm/utils/LocalState.java | 112 ++++++++++++++--
7 files changed, 209 insertions(+), 177 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4bf331d6/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index a3497d6..28a6fb8 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -46,11 +46,11 @@
KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo TopologyHistoryInfo
ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice SettableBlobMeta ReadableBlobMeta
BeginDownloadResult ListBlobsResult ComponentPageInfo TopologyPageInfo LogConfig LogLevel LogLevelAction
- ProfileRequest ProfileAction NodeInfo])
+ ProfileRequest ProfileAction NodeInfo LSTopoHistory])
(:import [org.apache.storm.daemon Shutdownable])
(:import [org.apache.storm.validation ConfigValidation])
(:import [org.apache.storm.cluster ClusterStateContext DaemonType])
- (:use [org.apache.storm util config log zookeeper local-state])
+ (:use [org.apache.storm util config log zookeeper])
(:require [org.apache.storm [cluster :as cluster]
[converter :as converter]
[stats :as stats]])
@@ -60,7 +60,7 @@
(:use [org.apache.storm.daemon common])
(:use [org.apache.storm config])
(:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms])
- (:import [org.apache.storm.utils VersionInfo]
+ (:import [org.apache.storm.utils VersionInfo LocalState]
[org.json.simple JSONValue])
(:require [clj-time.core :as time])
(:require [clj-time.coerce :as coerce])
@@ -1181,11 +1181,8 @@
[mins nimbus]
(locking (:topology-history-lock nimbus)
(let [cutoff-age (- (Time/currentTimeSecs) (* mins 60))
- topo-history-state (:topo-history-state nimbus)
- curr-history (vec (ls-topo-hist topo-history-state))
- new-history (vec (filter (fn [line]
- (> (line :timestamp) cutoff-age)) curr-history))]
- (ls-topo-hist! topo-history-state new-history))))
+ topo-history-state (:topo-history-state nimbus)]
+ (.filterOldTopologies ^LocalState topo-history-state cutoff-age))))
(defn cleanup-corrupt-topologies! [nimbus]
(let [storm-cluster-state (:storm-cluster-state nimbus)
@@ -1275,11 +1272,9 @@
(locking (:topology-history-lock nimbus)
(let [topo-history-state (:topo-history-state nimbus)
users (ConfigUtils/getTopoLogsUsers topology-conf)
- groups (ConfigUtils/getTopoLogsGroups topology-conf)
- curr-history (vec (ls-topo-hist topo-history-state))
- new-history (conj curr-history {:topoid storm-id :timestamp (Time/currentTimeSecs)
- :users users :groups groups})]
- (ls-topo-hist! topo-history-state new-history))))
+ groups (ConfigUtils/getTopoLogsGroups topology-conf)]
+ (.addTopologyHistory ^LocalState topo-history-state
+ (LSTopoHistory. storm-id (Time/currentTimeSecs) users groups)))))
(defn igroup-mapper
[storm-conf]
@@ -1295,10 +1290,18 @@
(let [groups (user-groups user storm-conf)]
(> (.size (set/intersection (set groups) (set groups-to-check))) 0)))
+(defn ->topo-history
+ [thrift-topo-hist]
+ {
+ :topoid (.get_topology_id thrift-topo-hist)
+ :timestamp (.get_time_stamp thrift-topo-hist)
+ :users (.get_users thrift-topo-hist)
+ :groups (.get_groups thrift-topo-hist)})
+
(defn read-topology-history
[nimbus user admin-users]
(let [topo-history-state (:topo-history-state nimbus)
- curr-history (vec (ls-topo-hist topo-history-state))
+ curr-history (vec (map ->topo-history (.getTopoHistoryList ^LocalState topo-history-state)))
topo-user-can-access (fn [line user storm-conf]
(if (nil? user)
(line :topoid)
http://git-wip-us.apache.org/repos/asf/storm/blob/4bf331d6/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index ad9db76..5685a09 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -24,12 +24,12 @@
[java.net JarURLConnection]
[java.net URI URLDecoder]
[org.apache.commons.io FileUtils])
- (:use [org.apache.storm config util log local-state])
+ (:use [org.apache.storm config util log local-state-converter])
(:import [org.apache.storm.generated AuthorizationException KeyNotFoundException WorkerResources])
(:import [org.apache.storm.utils NimbusLeaderNotFoundException VersionInfo])
(:import [java.nio.file Files StandardCopyOption])
(:import [org.apache.storm Config])
- (:import [org.apache.storm.generated WorkerResources ProfileAction])
+ (:import [org.apache.storm.generated WorkerResources ProfileAction LocalAssignment])
(:import [org.apache.storm.localizer LocalResource])
(:use [org.apache.storm.daemon common])
(:require [org.apache.storm.command [healthcheck :as healthcheck]])
@@ -85,6 +85,10 @@
:profiler-actions new-profiler-actions
:versions new-assignments})))
+(defn mk-local-assignment
+ [storm-id executors resources]
+ {:storm-id storm-id :executors executors :resources resources})
+
(defn- read-my-executors [assignments-snapshot storm-id assignment-id]
(let [assignment (get assignments-snapshot storm-id)
my-slots-resources (into {}
@@ -125,6 +129,20 @@
(defn- read-downloaded-storm-ids [conf]
(map #(URLDecoder/decode %) (Utils/readDirContents (ConfigUtils/supervisorStormDistRoot conf))))
+(defn ->executor-list
+ [executors]
+ (into []
+ (for [exec-info executors]
+ [(.get_task_start exec-info) (.get_task_end exec-info)])))
+
+(defn ls-worker-heartbeat
+ [^LocalState local-state]
+ (if-let [worker-hb (.getWorkerHeartBeat ^LocalState local-state)]
+ {:time-secs (.get_time_secs worker-hb)
+ :storm-id (.get_topology_id worker-hb)
+ :executors (->executor-list (.get_executors worker-hb))
+ :port (.get_port worker-hb)}))
+
(defn read-worker-heartbeat [conf id]
(let [local-state (ConfigUtils/workerState conf id)]
(try
@@ -172,7 +190,7 @@
(let [conf (:conf supervisor)
^LocalState local-state (:local-state supervisor)
id->heartbeat (read-worker-heartbeats conf)
- approved-ids (set (keys (ls-approved-workers local-state)))]
+ approved-ids (set (keys (clojurify-structure (.getApprovedWorkers ^LocalState local-state))))]
(into
{}
(dofor [[id hb] id->heartbeat]
@@ -198,7 +216,7 @@
(defn- wait-for-worker-launch [conf id start-time]
(let [state (ConfigUtils/workerState conf id)]
(loop []
- (let [hb (ls-worker-heartbeat state)]
+ (let [hb (.getWorkerHeartBeat state)]
(when (and
(not hb)
(<
@@ -209,7 +227,7 @@
(Time/sleep 500)
(recur)
)))
- (when-not (ls-worker-heartbeat state)
+ (when-not (.getWorkerHeartBeat state)
(log-message "Worker " id " failed to start")
)))
@@ -414,6 +432,19 @@
[pred amap]
(into {} (filter (fn [[k v]] (pred k)) amap)))
+(defn ->local-assignment
+ [^LocalAssignment thrift-local-assignment]
+ (mk-local-assignment
+ (.get_topology_id thrift-local-assignment)
+ (->executor-list (.get_executors thrift-local-assignment))
+ (.get_resources thrift-local-assignment)))
+
+;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
+(defn ls-local-assignments
+ [^LocalState local-state]
+ (if-let [thrift-local-assignments (.getLocalAssignmentsMap local-state)]
+ (map-val ->local-assignment thrift-local-assignments)))
+
;TODO: when translating this function, you should replace the filter-val with a proper for loop + if condition HERE
(defn sync-processes [supervisor]
(let [conf (:conf supervisor)
@@ -453,9 +484,9 @@
", Heartbeat: " (pr-str heartbeat))
(shutdown-worker supervisor id)))
(let [valid-new-worker-ids (get-valid-new-worker-ids conf supervisor reassign-executors new-worker-ids)]
- (ls-approved-workers! local-state
+ (.setApprovedWorkers ^LocalState local-state
(merge
- (select-keys (ls-approved-workers local-state)
+ (select-keys (clojurify-structure (.getApprovedWorkers ^LocalState local-state))
(keys keepers))
valid-new-worker-ids))
(wait-for-workers-launch conf (keys valid-new-worker-ids)))))
@@ -553,6 +584,22 @@
(rm-topo-files conf storm-id localizer false)
storm-id)))))
+(defn ->LocalAssignment
+ [{storm-id :storm-id executors :executors resources :resources}]
+ (let [assignment (LocalAssignment. storm-id (->ExecutorInfo-list executors))]
+ (if resources (.set_resources assignment
+ (doto (WorkerResources. )
+ (.set_mem_on_heap (first resources))
+ (.set_mem_off_heap (second resources))
+ (.set_cpu (last resources)))))
+ assignment))
+
+;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
+(defn ls-local-assignments!
+ [^LocalState local-state assignments]
+ (let [local-assignment-map (map-val ->LocalAssignment assignments)]
+ (.setLocalAssignmentsMap local-state local-assignment-map)))
+
(defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
(fn this []
(let [conf (:conf supervisor)
@@ -1265,10 +1312,10 @@
(prepare [this conf local-dir]
(reset! conf-atom conf)
(let [state (LocalState. local-dir)
- curr-id (if-let [id (ls-supervisor-id state)]
+ curr-id (if-let [id (.getSupervisorId state)]
id
(generate-supervisor-id))]
- (ls-supervisor-id! state curr-id)
+ (.setSupervisorId state curr-id)
(reset! id-atom curr-id))
)
(confirmAssigned [this port]
http://git-wip-us.apache.org/repos/asf/storm/blob/4bf331d6/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index c2a767a..60bc070 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -15,7 +15,7 @@
;; limitations under the License.
(ns org.apache.storm.daemon.worker
(:use [org.apache.storm.daemon common])
- (:use [org.apache.storm config log util local-state])
+ (:use [org.apache.storm config log util local-state-converter])
(:require [clj-time.core :as time])
(:require [clj-time.coerce :as coerce])
(:require [org.apache.storm.daemon [executor :as executor]])
@@ -33,7 +33,7 @@
(:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status DeserializingConnectionCallback])
(:import [org.apache.storm.daemon Shutdownable])
(:import [org.apache.storm.serialization KryoTupleSerializer])
- (:import [org.apache.storm.generated StormTopology])
+ (:import [org.apache.storm.generated StormTopology LSWorkerHeartbeat])
(:import [org.apache.storm.tuple AddressedTuple Fields])
(:import [org.apache.storm.task WorkerTopologyContext])
(:import [org.apache.storm Constants])
@@ -84,7 +84,11 @@
(let [conf (:conf worker)
state (ConfigUtils/workerState conf (:worker-id worker))]
;; do the local-file-system heartbeat.
- (ls-worker-heartbeat! state (Time/currentTimeSecs) (:storm-id worker) (:executors worker) (:port worker))
+ (.setWorkerHeartBeat state (LSWorkerHeartbeat.
+ (Time/currentTimeSecs)
+ (:storm-id worker)
+ (->ExecutorInfo-list (:executors worker))
+ (:port worker)))
(.cleanup state 60) ; this is just in case supervisor is down so that disk doesn't fill up.
; it shouldn't take supervisor 120 seconds between listing dir and reading it
http://git-wip-us.apache.org/repos/asf/storm/blob/4bf331d6/storm-core/src/clj/org/apache/storm/local_state.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/local_state.clj b/storm-core/src/clj/org/apache/storm/local_state.clj
deleted file mode 100644
index df67c5e..0000000
--- a/storm-core/src/clj/org/apache/storm/local_state.clj
+++ /dev/null
@@ -1,134 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.local-state
- (:use [org.apache.storm log util])
- (:import [org.apache.storm.generated StormTopology
- InvalidTopologyException GlobalStreamId
- LSSupervisorId LSApprovedWorkers
- LSSupervisorAssignments LocalAssignment
- ExecutorInfo LSWorkerHeartbeat
- LSTopoHistory LSTopoHistoryList
- WorkerResources]
- [org.apache.storm.utils Utils])
- (:import [org.apache.storm.utils LocalState]))
-
-(def LS-WORKER-HEARTBEAT "worker-heartbeat")
-(def LS-ID "supervisor-id")
-(def LS-LOCAL-ASSIGNMENTS "local-assignments")
-(def LS-APPROVED-WORKERS "approved-workers")
-(def LS-TOPO-HISTORY "topo-hist")
-
-(defn ->LSTopoHistory
- [{topoid :topoid timestamp :timestamp users :users groups :groups}]
- (LSTopoHistory. topoid timestamp users groups))
-
-(defn ->topo-history
- [thrift-topo-hist]
- {
- :topoid (.get_topology_id thrift-topo-hist)
- :timestamp (.get_time_stamp thrift-topo-hist)
- :users (.get_users thrift-topo-hist)
- :groups (.get_groups thrift-topo-hist)})
-
-(defn ls-topo-hist!
- [^LocalState local-state hist-list]
- (.put local-state LS-TOPO-HISTORY
- (LSTopoHistoryList. (map ->LSTopoHistory hist-list))))
-
-(defn ls-topo-hist
- [^LocalState local-state]
- (if-let [thrift-hist-list (.get local-state LS-TOPO-HISTORY)]
- (map ->topo-history (.get_topo_history thrift-hist-list))))
-
-(defn ls-supervisor-id!
- [^LocalState local-state ^String id]
- (.put local-state LS-ID (LSSupervisorId. id)))
-
-(defn ls-supervisor-id
- [^LocalState local-state]
- (if-let [super-id (.get local-state LS-ID)]
- (.get_supervisor_id super-id)))
-
-(defn ls-approved-workers!
- [^LocalState local-state workers]
- (.put local-state LS-APPROVED-WORKERS (LSApprovedWorkers. workers)))
-
-(defn ls-approved-workers
- [^LocalState local-state]
- (if-let [tmp (.get local-state LS-APPROVED-WORKERS)]
- (into {} (.get_approved_workers tmp))))
-
-(defn ->ExecutorInfo
- [[low high]] (ExecutorInfo. low high))
-
-(defn ->ExecutorInfo-list
- [executors]
- (map ->ExecutorInfo executors))
-
-(defn ->executor-list
- [executors]
- (into []
- (for [exec-info executors]
- [(.get_task_start exec-info) (.get_task_end exec-info)])))
-
-(defn ->LocalAssignment
- [{storm-id :storm-id executors :executors resources :resources}]
- (let [assignment (LocalAssignment. storm-id (->ExecutorInfo-list executors))]
- (if resources (.set_resources assignment
- (doto (WorkerResources. )
- (.set_mem_on_heap (first resources))
- (.set_mem_off_heap (second resources))
- (.set_cpu (last resources)))))
- assignment))
-
-(defn mk-local-assignment
- [storm-id executors resources]
- {:storm-id storm-id :executors executors :resources resources})
-
-(defn ->local-assignment
- [^LocalAssignment thrift-local-assignment]
- (mk-local-assignment
- (.get_topology_id thrift-local-assignment)
- (->executor-list (.get_executors thrift-local-assignment))
- (.get_resources thrift-local-assignment)))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn ls-local-assignments!
- [^LocalState local-state assignments]
- (let [local-assignment-map (map-val ->LocalAssignment assignments)]
- (.put local-state LS-LOCAL-ASSIGNMENTS
- (LSSupervisorAssignments. local-assignment-map))))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn ls-local-assignments
- [^LocalState local-state]
- (if-let [thrift-local-assignments (.get local-state LS-LOCAL-ASSIGNMENTS)]
- (map-val
- ->local-assignment
- (.get_assignments thrift-local-assignments))))
-
-(defn ls-worker-heartbeat!
- [^LocalState local-state time-secs storm-id executors port]
- (.put local-state LS-WORKER-HEARTBEAT (LSWorkerHeartbeat. time-secs storm-id (->ExecutorInfo-list executors) port) false))
-
-(defn ls-worker-heartbeat
- [^LocalState local-state]
- (if-let [worker-hb (.get local-state LS-WORKER-HEARTBEAT)]
- {:time-secs (.get_time_secs worker-hb)
- :storm-id (.get_topology_id worker-hb)
- :executors (->executor-list (.get_executors worker-hb))
- :port (.get_port worker-hb)}))
-
http://git-wip-us.apache.org/repos/asf/storm/blob/4bf331d6/storm-core/src/clj/org/apache/storm/local_state_converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/local_state_converter.clj b/storm-core/src/clj/org/apache/storm/local_state_converter.clj
new file mode 100644
index 0000000..e8eeaca
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/local_state_converter.clj
@@ -0,0 +1,24 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.local-state-converter
+ (:import [org.apache.storm.generated ExecutorInfo]))
+
+(defn ->ExecutorInfo
+ [[low high]] (ExecutorInfo. low high))
+
+(defn ->ExecutorInfo-list
+ [executors]
+ (map ->ExecutorInfo executors))
http://git-wip-us.apache.org/repos/asf/storm/blob/4bf331d6/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index 4ad5ff8..7817929 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -29,7 +29,7 @@
(:import [java.util HashMap ArrayList])
(:import [java.util.concurrent.atomic AtomicInteger])
(:import [java.util.concurrent ConcurrentHashMap])
- (:import [org.apache.storm.utils Time Utils IPredicate RegisteredGlobalState ConfigUtils])
+ (:import [org.apache.storm.utils Time Utils IPredicate RegisteredGlobalState ConfigUtils LocalState])
(:import [org.apache.storm.tuple Fields Tuple TupleImpl])
(:import [org.apache.storm.task TopologyContext])
(:import [org.apache.storm.generated GlobalStreamId Bolt KillOptions])
@@ -51,7 +51,7 @@
[org.json.simple JSONValue])
(:require [org.apache.storm [zookeeper :as zk]])
(:require [org.apache.storm.daemon.acker :as acker])
- (:use [org.apache.storm cluster util config log local-state])
+ (:use [org.apache.storm cluster util config log local-state-converter])
(:use [org.apache.storm.internal thrift]))
(defn feeder-spout
@@ -395,14 +395,14 @@
(defn find-worker-id
[supervisor-conf port]
(let [supervisor-state (ConfigUtils/supervisorState supervisor-conf)
- worker->port (ls-approved-workers supervisor-state)]
+ worker->port (.getApprovedWorkers ^LocalState supervisor-state)]
(first ((clojurify-structure (Utils/reverseMap worker->port)) port))))
(defn find-worker-port
[supervisor-conf worker-id]
(let [supervisor-state (ConfigUtils/supervisorState supervisor-conf)
- worker->port (ls-approved-workers supervisor-state)]
- (worker->port worker-id)))
+ worker->port (.getApprovedWorkers ^LocalState supervisor-state)]
+ (if worker->port (.get worker->port worker-id))))
(defn mk-capture-shutdown-fn
[capture-atom]
http://git-wip-us.apache.org/repos/asf/storm/blob/4bf331d6/storm-core/src/jvm/org/apache/storm/utils/LocalState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/LocalState.java b/storm-core/src/jvm/org/apache/storm/utils/LocalState.java
index aef1c1c..2f0bb60 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/LocalState.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/LocalState.java
@@ -18,24 +18,28 @@
package org.apache.storm.utils;
import org.apache.commons.io.FileUtils;
+import org.apache.storm.generated.LSApprovedWorkers;
+import org.apache.storm.generated.LSSupervisorAssignments;
+import org.apache.storm.generated.LSSupervisorId;
+import org.apache.storm.generated.LSTopoHistory;
+import org.apache.storm.generated.LSTopoHistoryList;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.LocalStateData;
+import org.apache.storm.generated.ThriftSerializedObject;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Map;
+import java.util.ArrayList;
import java.util.HashMap;
-import java.io.IOException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.thrift.TBase;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-
-import org.apache.storm.generated.LocalStateData;
-import org.apache.storm.generated.ThriftSerializedObject;
+import java.util.List;
+import java.util.Map;
/**
* A simple, durable, atomic K/V database. *Very inefficient*, should only be used for occasional reads/writes.
@@ -43,6 +47,11 @@ import org.apache.storm.generated.ThriftSerializedObject;
*/
public class LocalState {
public static final Logger LOG = LoggerFactory.getLogger(LocalState.class);
+ public static final String LS_WORKER_HEARTBEAT = "worker-heartbeat";
+ public static final String LS_ID = "supervisor-id";
+ public static final String LS_LOCAL_ASSIGNMENTS = "local-assignments";
+ public static final String LS_APPROVED_WORKERS = "approved-workers";
+ public static final String LS_TOPO_HISTORY = "topo-hist";
private VersionedStore _vs;
public LocalState(String backingDir) throws IOException {
@@ -157,6 +166,85 @@ public class LocalState {
_vs.cleanup(keepVersions);
}
+ public List<LSTopoHistory> getTopoHistoryList() {
+ LSTopoHistoryList lsTopoHistoryListWrapper = (LSTopoHistoryList) get(LS_TOPO_HISTORY);
+ if (null != lsTopoHistoryListWrapper) {
+ return lsTopoHistoryListWrapper.get_topo_history();
+ }
+ return null;
+ }
+
+ /**
+ * Remove topologies from local state which are older than cutOffAge.
+ * @param cutOffAge
+ */
+ public void filterOldTopologies(long cutOffAge) {
+ LSTopoHistoryList lsTopoHistoryListWrapper = (LSTopoHistoryList) get(LS_TOPO_HISTORY);
+ List<LSTopoHistory> filteredTopoHistoryList = new ArrayList<>();
+ if (null != lsTopoHistoryListWrapper) {
+ for (LSTopoHistory topoHistory : lsTopoHistoryListWrapper.get_topo_history()) {
+ if (topoHistory.get_time_stamp() > cutOffAge) {
+ filteredTopoHistoryList.add(topoHistory);
+ }
+ }
+ }
+ put(LS_TOPO_HISTORY, new LSTopoHistoryList(filteredTopoHistoryList));
+ }
+
+ public void addTopologyHistory(LSTopoHistory lsTopoHistory) {
+ LSTopoHistoryList lsTopoHistoryListWrapper = (LSTopoHistoryList) get(LS_TOPO_HISTORY);
+ List<LSTopoHistory> currentTopoHistoryList = new ArrayList<>();
+ if (null != lsTopoHistoryListWrapper) {
+ currentTopoHistoryList.addAll(lsTopoHistoryListWrapper.get_topo_history());
+ }
+ currentTopoHistoryList.add(lsTopoHistory);
+ put(LS_TOPO_HISTORY, new LSTopoHistoryList(currentTopoHistoryList));
+ }
+
+ public String getSupervisorId() {
+ LSSupervisorId lsSupervisorId = (LSSupervisorId) get(LS_ID);
+ if (null != lsSupervisorId) {
+ return lsSupervisorId.get_supervisor_id();
+ }
+ return null;
+ }
+
+ public void setSupervisorId(String supervisorId) {
+ put(LS_ID, new LSSupervisorId(supervisorId));
+ }
+
+ public Map<String, Integer> getApprovedWorkers() {
+ LSApprovedWorkers lsApprovedWorkers = (LSApprovedWorkers) get(LS_APPROVED_WORKERS);
+ if (null != lsApprovedWorkers) {
+ return lsApprovedWorkers.get_approved_workers();
+ }
+ return null;
+ }
+
+ public void setApprovedWorkers(Map<String, Integer> approvedWorkers) {
+ put(LS_APPROVED_WORKERS, new LSApprovedWorkers(approvedWorkers));
+ }
+
+ public LSWorkerHeartbeat getWorkerHeartBeat() {
+ return (LSWorkerHeartbeat) get(LS_WORKER_HEARTBEAT);
+ }
+
+ public void setWorkerHeartBeat(LSWorkerHeartbeat workerHeartBeat) {
+ put(LS_WORKER_HEARTBEAT, workerHeartBeat, false);
+ }
+
+ public Map<Integer, LocalAssignment> getLocalAssignmentsMap() {
+ LSSupervisorAssignments assignments = (LSSupervisorAssignments) get(LS_LOCAL_ASSIGNMENTS);
+ if (null != assignments) {
+ return assignments.get_assignments();
+ }
+ return null;
+ }
+
+ public void setLocalAssignmentsMap(Map<Integer, LocalAssignment> localAssignmentMap) {
+ put(LS_LOCAL_ASSIGNMENTS, new LSSupervisorAssignments(localAssignmentMap));
+ }
+
private void persistInternal(Map<String, ThriftSerializedObject> serialized, TSerializer ser, boolean cleanup) {
try {
if (ser == null) {
[3/3] storm git commit: Added STORM-1246 to CHANGELOG.
Posted by sr...@apache.org.
Added STORM-1246 to CHANGELOG.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8052a8c7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8052a8c7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8052a8c7
Branch: refs/heads/master
Commit: 8052a8c780bc7864861fbdfe70d453ed7a87d7f0
Parents: bc38f53
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Thu Feb 18 13:57:51 2016 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Thu Feb 18 13:57:51 2016 -0800
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/8052a8c7/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 00e77e8..0c48b97 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 2.0.0
+ * STORM-1246: port backtype.storm.local-state to java.
* STORM-1516: Fixed issue in writing pids with distributed cluster mode.
* STORM-1253: port backtype.storm.timer to java
* STORM-1258: port thrift.clj to Thrift.java
[2/3] storm git commit: Merge branch 'local-state2' of
http://github.com/abhishekagarwal87/storm into STORM-1246
Posted by sr...@apache.org.
Merge branch 'local-state2' of http://github.com/abhishekagarwal87/storm into STORM-1246
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bc38f530
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bc38f530
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bc38f530
Branch: refs/heads/master
Commit: bc38f530e4c192055dd4107cb92c9236e8a34b5e
Parents: abb1b85 4bf331d
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Thu Feb 18 12:58:25 2016 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Thu Feb 18 12:58:25 2016 -0800
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/nimbus.clj | 31 +++--
.../clj/org/apache/storm/daemon/supervisor.clj | 65 +++++++--
.../src/clj/org/apache/storm/daemon/worker.clj | 10 +-
.../src/clj/org/apache/storm/local_state.clj | 134 -------------------
.../org/apache/storm/local_state_converter.clj | 24 ++++
storm-core/src/clj/org/apache/storm/testing.clj | 10 +-
.../jvm/org/apache/storm/utils/LocalState.java | 112 ++++++++++++++--
7 files changed, 209 insertions(+), 177 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/bc38f530/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------