You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/12/04 16:04:05 UTC
[10/17] storm git commit: Blobstore API STORM-876
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index d659d57..e066269 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -14,19 +14,21 @@
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.daemon.supervisor
- (:import [java.io OutputStreamWriter BufferedWriter IOException])
+ (:import [java.io File IOException FileOutputStream])
(:import [backtype.storm.scheduler ISupervisor]
[backtype.storm.utils LocalState Time Utils]
[backtype.storm.daemon Shutdownable]
[backtype.storm Constants]
[java.net JarURLConnection]
[java.net URI]
- [org.apache.commons.io FileUtils]
- [java.io File])
+ [org.apache.commons.io FileUtils])
(:use [backtype.storm config util log timer local-state])
+ (:import [backtype.storm.generated AuthorizationException KeyNotFoundException WorkerResources])
(:import [backtype.storm.utils VersionInfo])
+ (:import [java.nio.file Files StandardCopyOption])
(:import [backtype.storm Config])
(:import [backtype.storm.generated WorkerResources ProfileAction])
+ (:import [backtype.storm.localizer LocalResource])
(:use [backtype.storm.daemon common])
(:require [backtype.storm.command [healthcheck :as healthcheck]])
(:require [backtype.storm.daemon [worker :as worker]]
@@ -44,7 +46,6 @@
(defmulti download-storm-code cluster-mode)
(defmulti launch-worker (fn [supervisor & _] (cluster-mode (:conf supervisor))))
-(defmulti mk-code-distributor cluster-mode)
(defprotocol SupervisorDaemon
(get-id [this])
@@ -238,20 +239,21 @@
(defn- rmr-as-user
"Launches a process owned by the given user that deletes the given path
recursively. Throws RuntimeException if the directory is not removed."
- [conf id user path]
- (worker-launcher-and-wait conf
- user
- ["rmr" path]
- :log-prefix (str "rmr " id))
- (if (exists-file? path)
- (throw (RuntimeException. (str path " was not deleted")))))
-
-(defn try-cleanup-worker [conf id user]
+ [conf id path]
+ (let [user (Utils/getFileOwner path)]
+ (worker-launcher-and-wait conf
+ user
+ ["rmr" path]
+ :log-prefix (str "rmr " id))
+ (if (exists-file? path)
+ (throw (RuntimeException. (str path " was not deleted"))))))
+
+(defn try-cleanup-worker [conf id]
(try
(if (.exists (File. (worker-root conf id)))
(do
(if (conf SUPERVISOR-RUN-WORKER-AS-USER)
- (rmr-as-user conf id user (worker-root conf id))
+ (rmr-as-user conf id (worker-root conf id))
(do
(rmr (worker-heartbeats-root conf id))
;; this avoids a race condition with worker or subprocess writing pid around same time
@@ -290,11 +292,11 @@
(worker-launcher-and-wait conf user ["signal" pid "9"] :log-prefix (str "kill -9 " pid))
(force-kill-process pid))
(if as-user
- (rmr-as-user conf id user (worker-pid-path conf id pid))
+ (rmr-as-user conf id (worker-pid-path conf id pid))
(try
(rmpath (worker-pid-path conf id pid))
(catch Exception e)))) ;; on windows, the supervisor may still holds the lock on the worker directory
- (try-cleanup-worker conf id user))
+ (try-cleanup-worker conf id))
(log-message "Shut down " (:supervisor-id supervisor) ":" id))
(def SUPERVISOR-ZK-ACLS
@@ -326,16 +328,62 @@
(log-error t "Error when processing event")
(exit-process! 20 "Error when processing an event")
))
+ :blob-update-timer (mk-timer :kill-fn (fn [t]
+ ;; anonymous fn: a defn here would (re)define a global var each time this map is built
+ (log-error t "Error when processing event")
+ (exit-process! 20 "Error when processing a event"))
+ :timer-name "blob-update-timer")
+ :localizer (Utils/createLocalizer conf (supervisor-local-dir conf))
:assignment-versions (atom {})
:sync-retry (atom 0)
- :code-distributor (mk-code-distributor conf)
:download-lock (Object.)
:stormid->profiler-actions (atom {})
})
+(defn required-topo-files-exist?
+ [conf storm-id]
+ (let [stormroot (supervisor-stormdist-root conf storm-id)
+ stormjarpath (supervisor-stormjar-path stormroot)
+ stormcodepath (supervisor-stormcode-path stormroot)
+ stormconfpath (supervisor-stormconf-path stormroot)]
+ (and (every? exists-file? [stormroot stormconfpath stormcodepath])
+ (or (local-mode? conf)
+ (exists-file? stormjarpath)))))
+
+(defn get-worker-assignment-helper-msg
+ [assignment supervisor port id]
+ (str (pr-str assignment) " for this supervisor " (:supervisor-id supervisor) " on port "
+ port " with id " id))
+
+(defn get-valid-new-worker-ids
+ [conf supervisor reassign-executors new-worker-ids]
+ (into {}
+ (remove nil?
+ (dofor [[port assignment] reassign-executors]
+ (let [id (new-worker-ids port)
+ storm-id (:storm-id assignment)
+ ^WorkerResources resources (:resources assignment)
+ mem-onheap (.get_mem_on_heap resources)]
+ ;; This condition checks for required files exist before launching the worker
+ (if (required-topo-files-exist? conf storm-id)
+ (do
+ (log-message "Launching worker with assignment "
+ (get-worker-assignment-helper-msg assignment supervisor port id))
+ (local-mkdirs (worker-pids-root conf id))
+ (local-mkdirs (worker-heartbeats-root conf id))
+ (launch-worker supervisor
+ (:storm-id assignment)
+ port
+ id
+ mem-onheap)
+ [id port])
+ (do
+ (log-message "Missing topology storm code, so can't launch worker with assignment "
+ (get-worker-assignment-helper-msg assignment supervisor port id))
+ nil)))))))
+
(defn sync-processes [supervisor]
(let [conf (:conf supervisor)
- download-lock (:download-lock supervisor)
^LocalState local-state (:local-state supervisor)
storm-cluster-state (:storm-cluster-state supervisor)
assigned-executors (defaulted (ls-local-assignments local-state) {})
@@ -349,8 +397,7 @@
new-worker-ids (into
{}
(for [port (keys reassign-executors)]
- [port (uuid)]))
- ]
+ [port (uuid)]))]
;; 1. to kill are those in allocated that are dead or disallowed
;; 2. kill the ones that should be dead
;; - read pids, kill -9 and individually remove file
@@ -371,67 +418,14 @@
". Current supervisor time: " now
". State: " state
", Heartbeat: " (pr-str heartbeat))
- (shutdown-worker supervisor id)
- (if (:code-distributor supervisor)
- (.cleanup (:code-distributor supervisor) id))
- ))
-
- (doseq [id (vals new-worker-ids)]
- (local-mkdirs (worker-pids-root conf id))
- (local-mkdirs (worker-heartbeats-root conf id)))
- (ls-approved-workers! local-state
- (merge
- (select-keys (ls-approved-workers local-state)
- (keys keepers))
- (zipmap (vals new-worker-ids) (keys new-worker-ids))
- ))
-
- ;; check storm topology code dir exists before launching workers
- (doseq [[port assignment] reassign-executors]
- (let [downloaded-storm-ids (set (read-downloaded-storm-ids conf))
- storm-id (:storm-id assignment)
- cached-assignment-info @(:assignment-versions supervisor)
- assignment-info (if (and (not-nil? cached-assignment-info) (contains? cached-assignment-info storm-id ))
- (get cached-assignment-info storm-id)
- (.assignment-info-with-version storm-cluster-state storm-id nil))
- storm-code-map (read-storm-code-locations assignment-info)
- master-code-dir (if (contains? storm-code-map :data) (storm-code-map :data))
- stormroot (supervisor-stormdist-root conf storm-id)]
- (if-not (or (contains? downloaded-storm-ids storm-id) (.exists (File. stormroot)) (nil? master-code-dir))
- (download-storm-code conf storm-id master-code-dir supervisor download-lock))
- ))
-
- (wait-for-workers-launch
- conf
- (dofor [[port assignment] reassign-executors]
- (let [id (new-worker-ids port)
- storm-id (:storm-id assignment)
- ^WorkerResources resources (:resources assignment)
- mem-onheap (.get_mem_on_heap resources)]
- (try
- (log-message "Launching worker with assignment "
- (pr-str assignment)
- " for this supervisor "
- (:supervisor-id supervisor)
- " on port "
- port
- " with id "
- id
- )
- (launch-worker supervisor
- (:storm-id assignment)
- port
- id
- mem-onheap)
- (mark! supervisor:num-workers-launched)
- (catch java.io.FileNotFoundException e
- (log-message "Unable to launch worker due to "
- (.getMessage e)))
- (catch java.io.IOException e
- (log-message "Unable to launch worker due to "
- (.getMessage e))))
- id)))
- ))
+ (shutdown-worker supervisor id)))
+ (let [valid-new-worker-ids (get-valid-new-worker-ids conf supervisor reassign-executors new-worker-ids)]
+ (ls-approved-workers! local-state
+ (merge
+ (select-keys (ls-approved-workers local-state)
+ (keys keepers))
+ valid-new-worker-ids))
+ (wait-for-workers-launch conf (keys valid-new-worker-ids)))))
(defn assigned-storm-ids-from-port-assignments [assignment]
(->> assignment
@@ -454,10 +448,80 @@
(shutdown-worker supervisor id))
))
+(defn get-blob-localname
+ "Returns the \"localname\" entry of the blob information map if it is set,
+ else the default value passed in."
+ [blob-info defaultValue]
+ (or (get blob-info "localname") defaultValue))
+
+(defn should-uncompress-blob?
+ "Returns canonical true/false for the blob's \"uncompress\" field, which may be a string or a
+ boolean; a missing field yields false. (Boolean. x) is avoided: a non-canonical false is truthy."
+ [blob-info]
+ (Boolean/parseBoolean (str (get blob-info "uncompress"))))
+
+(defn remove-blob-references
+ "Remove a reference to a blob when its no longer needed."
+ [localizer storm-id conf]
+ (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+ blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+ user (storm-conf TOPOLOGY-SUBMITTER-USER)
+ topo-name (storm-conf TOPOLOGY-NAME)]
+ (if blobstore-map
+ (doseq [[k, v] blobstore-map]
+ (.removeBlobReference localizer
+ k
+ user
+ topo-name
+ (should-uncompress-blob? v))))))
+
+(defn blobstore-map-to-localresources
+ "Returns a list of LocalResources based on the blobstore-map passed in."
+ [blobstore-map]
+ (if blobstore-map
+ (for [[k, v] blobstore-map] (LocalResource. k (should-uncompress-blob? v)))
+ ()))
+
+(defn add-blob-references
+ "For each of the downloaded topologies, adds references to the blobs that the topologies are
+ using. This is used to reconstruct the cache on restart."
+ [localizer storm-id conf]
+ (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+ blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+ user (storm-conf TOPOLOGY-SUBMITTER-USER)
+ topo-name (storm-conf TOPOLOGY-NAME)
+ localresources (blobstore-map-to-localresources blobstore-map)]
+ (if blobstore-map
+ (.addReferences localizer localresources user topo-name))))
+
+(defn rm-topo-files
+ "Deletes storm-id's local code directory (as its owner when running workers as a user), first dropping its blob references when rm-blob-refs? is true; errors are only logged."
+ [conf storm-id localizer rm-blob-refs?]
+ (let [path (supervisor-stormdist-root conf storm-id)]
+ (try
+ (when rm-blob-refs? (remove-blob-references localizer storm-id conf))
+ (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+ (rmr-as-user conf storm-id path)
+ (rmr path))
+ (catch Exception e
+ (log-error e "Exception removing: " storm-id)))))
+
+(defn verify-downloaded-files
+ "Check for the files exists to avoid supervisor crashing
+ Also makes sure there is no necessity for locking"
+ [conf localizer assigned-storm-ids all-downloaded-storm-ids]
+ (remove nil?
+ (into #{}
+ (for [storm-id all-downloaded-storm-ids
+ :when (contains? assigned-storm-ids storm-id)]
+ (when-not (required-topo-files-exist? conf storm-id)
+ (log-debug "Files not present in topology directory")
+ (rm-topo-files conf storm-id localizer false)
+ storm-id)))))
+
(defn mk-synchronize-supervisor [supervisor sync-processes event-manager processes-event-manager]
(fn this []
(let [conf (:conf supervisor)
- download-lock (:download-lock supervisor)
storm-cluster-state (:storm-cluster-state supervisor)
^ISupervisor isupervisor (:isupervisor supervisor)
^LocalState local-state (:local-state supervisor)
@@ -468,7 +532,7 @@
versions :versions}
(assignments-snapshot storm-cluster-state sync-callback assignment-versions)
storm-code-map (read-storm-code-locations assignments-snapshot)
- downloaded-storm-ids (set (read-downloaded-storm-ids conf))
+ all-downloaded-storm-ids (set (read-downloaded-storm-ids conf))
existing-assignment (ls-local-assignments local-state)
all-assignment (read-assignments assignments-snapshot
(:assignment-id supervisor)
@@ -476,14 +540,20 @@
(:sync-retry supervisor))
new-assignment (->> all-assignment
(filter-key #(.confirmAssigned isupervisor %)))
- assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)]
+ assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)
+ localizer (:localizer supervisor)
+ checked-downloaded-storm-ids (set (verify-downloaded-files conf localizer assigned-storm-ids all-downloaded-storm-ids))
+ downloaded-storm-ids (set/difference all-downloaded-storm-ids checked-downloaded-storm-ids)]
+
(log-debug "Synchronizing supervisor")
(log-debug "Storm code map: " storm-code-map)
- (log-debug "Downloaded storm ids: " downloaded-storm-ids)
(log-debug "All assignment: " all-assignment)
(log-debug "New assignment: " new-assignment)
- (log-debug "Storm Ids Profiler Actions" storm-id->profiler-actions)
-
+ (log-debug "Assigned Storm Ids " assigned-storm-ids)
+ (log-debug "All Downloaded Ids " all-downloaded-storm-ids)
+ (log-debug "Checked Downloaded Ids " checked-downloaded-storm-ids)
+ (log-debug "Downloaded Ids " downloaded-storm-ids)
+ (log-debug "Storm Ids Profiler Actions " storm-id->profiler-actions)
;; download code first
;; This might take awhile
;; - should this be done separately from usual monitoring?
@@ -491,7 +561,9 @@
(doseq [[storm-id master-code-dir] storm-code-map]
(when (and (not (downloaded-storm-ids storm-id))
(assigned-storm-ids storm-id))
- (download-storm-code conf storm-id master-code-dir supervisor download-lock)))
+ (log-message "Downloading code for storm id " storm-id)
+ (download-storm-code conf storm-id master-code-dir localizer)
+ (log-message "Finished downloading code for storm id " storm-id)))
(log-debug "Writing new assignment "
(pr-str new-assignment))
@@ -510,22 +582,52 @@
;; synchronize-supervisor doesn't try to launch workers for which the
;; resources don't exist
(if on-windows? (shutdown-disallowed-workers supervisor))
- (doseq [storm-id downloaded-storm-ids]
+ (doseq [storm-id all-downloaded-storm-ids]
(when-not (storm-code-map storm-id)
(log-message "Removing code for storm id "
storm-id)
- (try
- (rmr (supervisor-stormdist-root conf storm-id))
- (catch Exception e (log-message (.getMessage e))))
- ))
- (.add processes-event-manager sync-processes)
- )))
+ (rm-topo-files conf storm-id localizer true)))
+ (.add processes-event-manager sync-processes))))
(defn mk-supervisor-capacities
[conf]
{Config/SUPERVISOR_MEMORY_CAPACITY_MB (double (conf SUPERVISOR-MEMORY-CAPACITY-MB))
Config/SUPERVISOR_CPU_CAPACITY (double (conf SUPERVISOR-CPU-CAPACITY))})
+(defn update-blobs-for-topology!
+ "Update each blob listed in the topology configuration if the latest version of the blob
+ has not been downloaded."
+ [conf storm-id localizer]
+ (let [storm-conf (read-supervisor-storm-conf conf storm-id)
+ blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+ user (storm-conf TOPOLOGY-SUBMITTER-USER)
+ localresources (blobstore-map-to-localresources blobstore-map)]
+ (try
+ (.updateBlobs localizer localresources user)
+ (catch AuthorizationException authExp
+ (log-error authExp))
+ (catch KeyNotFoundException knf
+ (log-error knf)))))
+
+(defn update-blobs-for-all-topologies-fn
+ "Returns a function that downloads all blobs listed in the topology configuration for all topologies assigned
+ to this supervisor, and creates version files with a suffix. The returned function is intended to be run periodically
+ by a timer, created elsewhere."
+ [supervisor]
+ (fn []
+ (try
+ (let [conf (:conf supervisor)
+ downloaded-storm-ids (set (read-downloaded-storm-ids conf))
+ new-assignment @(:curr-assignment supervisor)
+ assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)]
+ (doseq [topology-id downloaded-storm-ids]
+ (let [storm-root (supervisor-stormdist-root conf topology-id)]
+ (when (assigned-storm-ids topology-id)
+ (log-debug "Checking Blob updates for storm topology id " topology-id " With target_dir: " storm-root)
+ (update-blobs-for-topology! conf topology-id (:localizer supervisor))))))
+ (catch Exception e
+ (log-error e "Error updating blobs, will retry again later")))))
+
(defn jvm-cmd [cmd]
(let [java-home (.get (System/getenv) "JAVA_HOME")]
(if (nil? java-home)
@@ -650,6 +752,8 @@
[event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
sync-processes (partial sync-processes supervisor)
synchronize-supervisor (mk-synchronize-supervisor supervisor sync-processes event-manager processes-event-manager)
+ synchronize-blobs-fn (update-blobs-for-all-topologies-fn supervisor)
+ downloaded-storm-ids (set (read-downloaded-storm-ids conf))
run-profiler-actions-fn (mk-run-profiler-actions-for-all-topologies supervisor)
heartbeat-fn (fn [] (.supervisor-heartbeat!
(:storm-cluster-state supervisor)
@@ -671,6 +775,12 @@
0
(conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
heartbeat-fn)
+ (doseq [storm-id downloaded-storm-ids]
+ (add-blob-references (:localizer supervisor) storm-id
+ conf))
+ ;; do this after adding the references so we don't try to clean things being used
+ (.startCleaner (:localizer supervisor))
+
(when (conf SUPERVISOR-ENABLE)
;; This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
;; to date even if callbacks don't all work exactly right
@@ -679,6 +789,13 @@
0
(conf SUPERVISOR-MONITOR-FREQUENCY-SECS)
(fn [] (.add processes-event-manager sync-processes)))
+
+ ;; Blob update thread. Starts with 30 seconds delay, every 30 seconds
+ (schedule-recurring (:blob-update-timer supervisor)
+ 30
+ 30
+ (fn [] (.add event-manager synchronize-blobs-fn)))
+
(schedule-recurring (:event-timer supervisor)
(* 60 5)
(* 60 5)
@@ -689,6 +806,7 @@
(doseq [id ids]
(shutdown-worker supervisor id))
(throw (RuntimeException. "Supervisor failed health check. Exiting.")))))))
+
;; Launch a thread that Runs profiler commands . Starts with 30 seconds delay, every 30 seconds
(schedule-recurring (:event-timer supervisor)
30
@@ -702,8 +820,10 @@
(reset! (:active supervisor) false)
(cancel-timer (:heartbeat-timer supervisor))
(cancel-timer (:event-timer supervisor))
+ (cancel-timer (:blob-update-timer supervisor))
(.shutdown event-manager)
(.shutdown processes-event-manager)
+ (.shutdown (:localizer supervisor))
(.disconnect (:storm-cluster-state supervisor)))
SupervisorDaemon
(get-conf [this]
@@ -728,29 +848,92 @@
(.shutdown supervisor)
)
-(defn setup-storm-code-dir [conf storm-conf dir]
+(defn setup-storm-code-dir
+ [conf storm-conf dir]
(if (conf SUPERVISOR-RUN-WORKER-AS-USER)
(worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) ["code-dir" dir] :log-prefix (str "setup conf for " dir))))
+(defn setup-blob-permission
+ [conf storm-conf path]
+ (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+ (worker-launcher-and-wait conf (storm-conf TOPOLOGY-SUBMITTER-USER) ["blob" path] :log-prefix (str "setup blob permissions for " path))))
+
+(defn download-blobs-for-topology!
+ "Download all blobs listed in the topology configuration for a given topology."
+ [conf stormconf-path localizer tmproot]
+ (let [storm-conf (read-supervisor-storm-conf-given-path conf stormconf-path)
+ blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+ user (storm-conf TOPOLOGY-SUBMITTER-USER)
+ topo-name (storm-conf TOPOLOGY-NAME)
+ user-dir (.getLocalUserFileCacheDir localizer user)
+ localresources (blobstore-map-to-localresources blobstore-map)]
+ (when localresources
+ (when-not (.exists user-dir)
+ (FileUtils/forceMkdir user-dir)
+ (setup-blob-permission conf storm-conf (.toString user-dir)))
+ (try
+ (let [localized-resources (.getBlobs localizer localresources user topo-name user-dir)]
+ (setup-blob-permission conf storm-conf (.toString user-dir))
+ (doseq [local-rsrc localized-resources]
+ (let [rsrc-file-path (File. (.getFilePath local-rsrc))
+ key-name (.getName rsrc-file-path)
+ blob-symlink-target-name (.getName (File. (.getCurrentSymlinkPath local-rsrc)))
+ symlink-name (get-blob-localname (get blobstore-map key-name) key-name)]
+ (create-symlink! tmproot (.getParent rsrc-file-path) symlink-name
+ blob-symlink-target-name))))
+ (catch AuthorizationException authExp
+ (log-error authExp))
+ (catch KeyNotFoundException knf
+ (log-error knf))))))
+
+(defn get-blob-file-names
+ [blobstore-map]
+ (if blobstore-map
+ (for [[k, data] blobstore-map]
+ (get-blob-localname data k))))
+
+(defn download-blobs-for-topology-succeed?
+ "Assert if all blobs are downloaded for the given topology"
+ [stormconf-path target-dir]
+ (let [storm-conf (clojurify-structure (Utils/fromCompressedJsonConf (FileUtils/readFileToByteArray (File. stormconf-path))))
+ blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+ file-names (get-blob-file-names blobstore-map)]
+ (if-not (empty? file-names)
+ (every? #(Utils/checkFileExists target-dir %) file-names)
+ true)))
+
;; distributed implementation
(defmethod download-storm-code
- :distributed [conf storm-id master-code-dir supervisor download-lock]
- ;; Downloading to permanent location is atomic
- (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
- stormroot (supervisor-stormdist-root conf storm-id)
- master-meta-file-path (master-storm-metafile-path master-code-dir)
- supervisor-meta-file-path (supervisor-storm-metafile-path tmproot)]
- (locking download-lock
- (log-message "Downloading code for storm id " storm-id " from " master-code-dir)
- (FileUtils/forceMkdir (File. tmproot))
- (Utils/downloadFromMaster conf master-meta-file-path supervisor-meta-file-path)
- (if (:code-distributor supervisor)
- (.download (:code-distributor supervisor) storm-id (File. supervisor-meta-file-path)))
- (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
- (if (.exists (File. stormroot)) (FileUtils/forceDelete (File. stormroot)))
- (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
- (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot)
- (log-message "Finished downloading code for storm id " storm-id " from " master-code-dir))))
+ :distributed [conf storm-id master-code-dir localizer]
+ ;; Downloading to permanent location is atomic
+ (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
+ stormroot (supervisor-stormdist-root conf storm-id)
+ blobstore (Utils/getClientBlobStoreForSupervisor conf)]
+ ;; Release the blobstore client even if directory setup or any download throws.
+ (try
+ (FileUtils/forceMkdir (File. tmproot))
+ (if-not on-windows?
+ (Utils/restrictPermissions tmproot)
+ (if (conf SUPERVISOR-RUN-WORKER-AS-USER)
+ (throw-runtime (str "ERROR: Windows doesn't implement setting the correct permissions"))))
+ (Utils/downloadResourcesAsSupervisor (master-stormjar-key storm-id) (supervisor-stormjar-path tmproot) blobstore)
+ (Utils/downloadResourcesAsSupervisor (master-stormcode-key storm-id) (supervisor-stormcode-path tmproot) blobstore)
+ (Utils/downloadResourcesAsSupervisor (master-stormconf-key storm-id) (supervisor-stormconf-path tmproot) blobstore)
+ (finally
+ (.shutdown blobstore)))
+ (extract-dir-from-jar (supervisor-stormjar-path tmproot) RESOURCES-SUBDIR tmproot)
+ (download-blobs-for-topology! conf (supervisor-stormconf-path tmproot) localizer
+ tmproot)
+ (if (download-blobs-for-topology-succeed? (supervisor-stormconf-path tmproot) tmproot)
+ (do
+ (log-message "Successfully downloaded blob resources for storm-id " storm-id)
+ (FileUtils/forceMkdir (File. stormroot))
+ (Files/move (.toPath (File. tmproot)) (.toPath (File. stormroot))
+ (doto (make-array StandardCopyOption 1) (aset 0 StandardCopyOption/ATOMIC_MOVE)))
+ (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot))
+ (do
+ (log-message "Failed to download blob resources for storm-id " storm-id)
+ (rmr tmproot)))))
(defn write-log-metadata-to-yaml-file! [storm-id port data conf]
(let [file (get-log-metadata-file conf storm-id port)]
@@ -781,11 +964,6 @@
(storm-conf TOPOLOGY-USERS)))))}]
(write-log-metadata-to-yaml-file! storm-id port data conf)))
-(defmethod mk-code-distributor :distributed [conf]
- (let [code-distributor (new-instance (conf STORM-CODE-DISTRIBUTOR-CLASS))]
- (.prepare code-distributor conf)
- code-distributor))
-
(defn jlp [stormroot conf]
(let [resource-root (str stormroot File/separator RESOURCES-SUBDIR)
os (clojure.string/replace (System/getProperty "os.name") #"\s+" "_")
@@ -811,6 +989,21 @@
:else (-> value sub-fn (clojure.string/split #"\s+")))))
+(defn create-blobstore-links
+ "Create symlinks in worker launch directory for all blobs"
+ [conf storm-id worker-id]
+ (let [stormroot (supervisor-stormdist-root conf storm-id)
+ storm-conf (read-supervisor-storm-conf conf storm-id)
+ workerroot (worker-root conf worker-id)
+ blobstore-map (storm-conf TOPOLOGY-BLOBSTORE-MAP)
+ blob-file-names (get-blob-file-names blobstore-map)
+ resource-file-names (cons RESOURCES-SUBDIR blob-file-names)]
+ (log-message "Creating symlinks for worker-id: " worker-id " storm-id: "
+ storm-id " for files(" (count resource-file-names) "): " (pr-str resource-file-names))
+ (create-symlink! workerroot stormroot RESOURCES-SUBDIR)
+ (doseq [file-name blob-file-names]
+ (create-symlink! workerroot stormroot file-name file-name))))
+
(defn create-artifacts-link
"Create a symlink from workder directory to its port artifacts directory"
[conf storm-id port worker-id]
@@ -913,6 +1106,7 @@
(add-dead-worker worker-id))
worker-dir (worker-root conf worker-id)]
(remove-dead-worker worker-id)
+ (create-blobstore-links conf storm-id worker-id)
(if run-worker-as-user
(worker-launcher conf user ["worker" worker-dir (write-script worker-dir command :environment topology-worker-environment)] :log-prefix log-prefix :exit-code-callback callback :directory (File. worker-dir))
(launch-process command :environment topology-worker-environment :log-prefix log-prefix :exit-code-callback callback :directory (File. worker-dir)))
@@ -927,31 +1121,31 @@
first ))
(defmethod download-storm-code
- :local [conf storm-id master-code-dir supervisor download-lock]
- (let [stormroot (supervisor-stormdist-root conf storm-id)]
- (locking download-lock
- (FileUtils/copyDirectory (File. master-code-dir) (File. stormroot))
- (let [classloader (.getContextClassLoader (Thread/currentThread))
- resources-jar (resources-jar)
- url (.getResource classloader RESOURCES-SUBDIR)
- target-dir (str stormroot file-path-separator RESOURCES-SUBDIR)]
- (cond
- resources-jar
- (do
- (log-message "Extracting resources from jar at " resources-jar " to " target-dir)
- (extract-dir-from-jar resources-jar RESOURCES-SUBDIR stormroot))
- url
- (do
- (log-message "Copying resources at " (URI. (str url)) " to " target-dir)
- (if (= (.getProtocol url) "jar" )
- (extract-dir-from-jar (.getFile (.getJarFileURL (.openConnection url))) RESOURCES-SUBDIR stormroot)
- (FileUtils/copyDirectory (File. (.getPath (URI. (str url)))) (File. target-dir)))
- )
- )
- )
- )))
-
-(defmethod mk-code-distributor :local [conf] nil)
+ :local [conf storm-id master-code-dir localizer]
+ (let [tmproot (str (supervisor-tmp-dir conf) file-path-separator (uuid))
+ stormroot (supervisor-stormdist-root conf storm-id)
+ blob-store (Utils/getNimbusBlobStore conf master-code-dir nil)]
+ (try ;; with-open closes each output stream so no file handle is left open across moveDirectory
+ (FileUtils/forceMkdir (File. tmproot))
+ (with-open [out (FileOutputStream. (supervisor-stormcode-path tmproot))] (.readBlobTo blob-store (master-stormcode-key storm-id) out nil))
+ (with-open [out (FileOutputStream. (supervisor-stormconf-path tmproot))] (.readBlobTo blob-store (master-stormconf-key storm-id) out nil))
+ (finally
+ (.shutdown blob-store)))
+ (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
+ (setup-storm-code-dir conf (read-supervisor-storm-conf conf storm-id) stormroot)
+ (let [classloader (.getContextClassLoader (Thread/currentThread))
+ resources-jar (resources-jar)
+ url (.getResource classloader RESOURCES-SUBDIR)
+ target-dir (str stormroot file-path-separator RESOURCES-SUBDIR)]
+ (cond
+ resources-jar
+ (do
+ (log-message "Extracting resources from jar at " resources-jar " to " target-dir)
+ (extract-dir-from-jar resources-jar RESOURCES-SUBDIR stormroot))
+ url
+ (do
+ (log-message "Copying resources at " (str url) " to " target-dir)
+ (FileUtils/copyDirectory (File. (.getFile url)) (File. target-dir)))))))
(defmethod launch-worker
:local [supervisor storm-id port worker-id mem-onheap]
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj
index 2c98b07..c552519 100644
--- a/storm-core/src/clj/backtype/storm/testing.clj
+++ b/storm-core/src/clj/backtype/storm/testing.clj
@@ -126,7 +126,8 @@
ZMQ-LINGER-MILLIS 0
TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false
TOPOLOGY-TRIDENT-BATCH-EMIT-INTERVAL-MILLIS 50
- STORM-CLUSTER-MODE "local"}
+ STORM-CLUSTER-MODE "local"
+ BLOBSTORE-SUPERUSER (System/getProperty "user.name")}
(if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS)
{STORM-ZOOKEEPER-PORT zk-port
STORM-ZOOKEEPER-SERVERS ["localhost"]})
@@ -628,7 +629,7 @@
track-id (-> tracked-topology :cluster ::track-id)
waiting? (fn []
(or (not= target (global-amt track-id "spout-emitted"))
- (not= (global-amt track-id "transferred")
+ (not= (global-amt track-id "transferred")
(global-amt track-id "processed"))))]
(while-timeout timeout-ms (waiting?)
;; (println "Spout emitted: " (global-amt track-id "spout-emitted"))
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/util.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/util.clj b/storm-core/src/clj/backtype/storm/util.clj
index 9b22e70..cbe5bf9 100644
--- a/storm-core/src/clj/backtype/storm/util.clj
+++ b/storm-core/src/clj/backtype/storm/util.clj
@@ -22,6 +22,8 @@
(:import [backtype.storm Config])
(:import [backtype.storm.utils Time Container ClojureTimerTask Utils
MutableObject MutableInt])
+ (:import [backtype.storm.security.auth NimbusPrincipal])
+ (:import [javax.security.auth Subject])
(:import [java.util UUID Random ArrayList List Collections])
(:import [java.util.zip ZipFile])
(:import [java.util.concurrent.locks ReentrantReadWriteLock])
@@ -1099,7 +1101,22 @@
(assoc coll k (apply str (repeat (count (coll k)) "#")))
coll))
-(defn log-thrift-access [request-id remoteAddress principal operation]
+(defn log-thrift-access
+ [request-id remoteAddress principal operation]
(doto
(ThriftAccessLogger.)
(.log (str "Request ID: " request-id " access from: " remoteAddress " principal: " principal " operation: " operation))))
+
+(def DISALLOWED-KEY-NAME-STRS #{"/" "." ":" "\\"})
+
+(defn validate-key-name!
+ "Throws a RuntimeException if the blob key name is blank (nil, empty or whitespace-only)
+ or contains any of the strings in DISALLOWED-KEY-NAME-STRS; otherwise returns nil."
+ [name]
+ ;; Check blank first so a nil name gets a clear error instead of an NPE from .contains
+ (if (clojure.string/blank? name)
+ (throw (RuntimeException.
+ "Key name cannot be blank"))
+ (if (some #(.contains name %) DISALLOWED-KEY-NAME-STRS)
+ (throw (RuntimeException.
+ (str "Key name cannot contain any of the following: " (pr-str DISALLOWED-KEY-NAME-STRS)))))))
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/clj/backtype/storm/zookeeper.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/zookeeper.clj b/storm-core/src/clj/backtype/storm/zookeeper.clj
index 26def33..c91ffa4 100644
--- a/storm-core/src/clj/backtype/storm/zookeeper.clj
+++ b/storm-core/src/clj/backtype/storm/zookeeper.clj
@@ -114,6 +114,7 @@
(try-cause (.. zk (delete) (deletingChildrenIfNeeded) (forPath (normalize-path path)))
(catch KeeperException$NoNodeException e
;; do nothing
+ (log-debug "Node already deleted, nothing to do: " (normalize-path path))
)
(catch Exception e (throw (wrap-in-runtime e)))))))
@@ -129,7 +130,6 @@
))
)))
-
(defn sync-path
[^CuratorFramework zk ^String path]
(try
@@ -186,6 +186,19 @@
(.. zk (getChildren) (forPath (normalize-path path))))
(catch Exception e (throw (wrap-in-runtime e)))))
+(defn delete-node-blobstore
+  "Deletes every child znode directly under `parent-path` whose node name
+  starts with `host-port-info` (the nimbus host/port information). Does
+  nothing when `parent-path` does not exist."
+  [^CuratorFramework zk ^String parent-path ^String host-port-info]
+  (let [normalized (normalize-path parent-path)
+        children (when (exists-node? zk normalized false)
+                   (get-children zk normalized false))
+        matching (filter #(.startsWith ^String % host-port-info) children)]
+    (doseq [child matching]
+      (log-debug "delete-node " "child" child)
+      (delete-node zk (str normalized "/" child)))))
+
(defn set-data
[^CuratorFramework zk ^String path ^bytes data]
(try
@@ -232,22 +245,10 @@
(defn leader-latch-listener-impl
"Leader latch listener that will be invoked when we either gain or lose leadership"
[conf zk leader-latch]
- (let [hostname (.getCanonicalHostName (InetAddress/getLocalHost))
- STORMS-ROOT (str (conf STORM-ZOOKEEPER-ROOT) "/storms")]
+ (let [hostname (.getCanonicalHostName (InetAddress/getLocalHost))]
+ ;; NOTE(review): conf, zk and leader-latch are no longer referenced in this
+ ;; body; presumably kept for caller compatibility - confirm before removing.
(reify LeaderLatchListener
(^void isLeader[this]
- (log-message (str hostname " gained leadership, checking if it has all the topology code locally."))
- (let [active-topology-ids (set (get-children zk STORMS-ROOT false))
- local-topology-ids (set (.list (File. (master-stormdist-root conf))))
- diff-topology (first (set-delta active-topology-ids local-topology-ids))]
- (log-message "active-topology-ids [" (clojure.string/join "," active-topology-ids)
- "] local-topology-ids [" (clojure.string/join "," local-topology-ids)
- "] diff-topology [" (clojure.string/join "," diff-topology) "]")
- (if (empty? diff-topology)
- (log-message "Accepting leadership, all active topology found localy.")
- (do
- (log-message "code for all active topologies not available locally, giving up leadership.")
- (.close leader-latch)))))
+ ;; Leadership is accepted unconditionally: this callback only logs; it no
+ ;; longer checks for local topology code or closes the latch.
+ (log-message (str hostname " gained leadership")))
(^void notLeader[this]
(log-message (str hostname " lost leadership."))))))
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 89422f6..b663dcb 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -1056,6 +1056,122 @@ public class Config extends HashMap<String, Object> {
public static final String SUPERVISOR_SLOTS_PORTS = "supervisor.slots.ports";
/**
+ * What blobstore implementation the supervisor should use.
+ */
+ @isString
+ public static final String SUPERVISOR_BLOBSTORE = "supervisor.blobstore.class";
+
+ /**
+ * The distributed cache target size in MB. This is a soft limit to the size of the distributed
+ * cache contents.
+ */
+ @isPositiveNumber
+ @isInteger
+ public static final String SUPERVISOR_LOCALIZER_CACHE_TARGET_SIZE_MB = "supervisor.localizer.cache.target.size.mb";
+
+ /**
+ * The distributed cache cleanup interval. Controls how often it scans to attempt to cleanup
+ * anything over the cache target size.
+ */
+ @isPositiveNumber
+ @isInteger
+ public static final String SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS = "supervisor.localizer.cleanup.interval.ms";
+
+ /**
+ * What blobstore implementation the storm client should use.
+ */
+ @isString
+ public static final String CLIENT_BLOBSTORE = "client.blobstore.class";
+
+ /**
+ * What blobstore download parallelism the supervisor should use.
+ */
+ @isPositiveNumber
+ @isInteger
+ public static final String SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT = "supervisor.blobstore.download.thread.count";
+
+ /**
+ * Maximum number of retries a supervisor is allowed to make for downloading a blob.
+ */
+ @isPositiveNumber
+ @isInteger
+ public static final String SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES = "supervisor.blobstore.download.max_retries";
+
+ /**
+ * The blobstore super user has all read/write/admin permissions to all blobs - user running
+ * the blobstore.
+ */
+ @isString
+ public static final String BLOBSTORE_SUPERUSER = "blobstore.superuser";
+
+ /**
+ * What directory to use for the blobstore. The directory is expected to be an
+ * absolute path when using HDFS blobstore, for LocalFsBlobStore it could be either
+ * absolute or relative.
+ */
+ @isString
+ public static final String BLOBSTORE_DIR = "blobstore.dir";
+
+ /**
+ * What buffer size to use for the blobstore uploads.
+ */
+ @isPositiveNumber
+ @isInteger
+ public static final String STORM_BLOBSTORE_INPUTSTREAM_BUFFER_SIZE_BYTES = "storm.blobstore.inputstream.buffer.size.bytes";
+
+ /**
+ * Enable the blobstore cleaner. Certain blobstores may only want to run the cleaner
+ * on one daemon. Currently Nimbus handles setting this.
+ */
+ @isBoolean
+ public static final String BLOBSTORE_CLEANUP_ENABLE = "blobstore.cleanup.enable";
+
+ /**
+ * principal for nimbus/supervisor to use to access secure hdfs for the blobstore.
+ */
+ @isString
+ public static final String BLOBSTORE_HDFS_PRINCIPAL = "blobstore.hdfs.principal";
+
+ /**
+ * keytab for nimbus/supervisor to use to access secure hdfs for the blobstore.
+ */
+ @isString
+ public static final String BLOBSTORE_HDFS_KEYTAB = "blobstore.hdfs.keytab";
+
+ /**
+ * Set replication factor for a blob in HDFS Blobstore Implementation
+ */
+ @isPositiveNumber
+ @isInteger
+ public static final String STORM_BLOBSTORE_REPLICATION_FACTOR = "storm.blobstore.replication.factor";
+
+ /**
+ * What blobstore implementation nimbus should use.
+ */
+ @isString
+ public static final String NIMBUS_BLOBSTORE = "nimbus.blobstore.class";
+
+ /**
+ * During operations with the blob store, via master, how long a connection
+ * is idle before nimbus considers it dead and drops the session and any
+ * associated connections.
+ */
+ @isPositiveNumber
+ @isInteger
+ public static final String NIMBUS_BLOBSTORE_EXPIRATION_SECS = "nimbus.blobstore.expiration.secs";
+
+ /**
+ * A map from blobstore key to the options used when making that blob available to the
+ * worker in its launch directory: the local file name ("localname") and whether the blob
+ * should be uncompressed ("uncompress"). Both options are optional; if "localname" is not
+ * specified, the key itself is used as the local file name. Each topology has its own map
+ * of blobs. Example: topology.blobstore.map: {"blobstorekey" :
+ * {"localname": "myblob", "uncompress": false}, "blobstorearchivekey" :
+ * {"localname": "myarchive", "uncompress": true}}
+ */
+ @CustomValidator(validatorClass = MapOfStringToMapOfStringToObjectValidator.class)
+ public static final String TOPOLOGY_BLOBSTORE_MAP = "topology.blobstore.map";
+
+ /**
* A number representing the maximum number of workers any single topology can acquire.
*/
@isInteger
@@ -1847,13 +1963,6 @@ public class Config extends HashMap<String, Object> {
public static final String TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS="topology.disruptor.batch.timeout.millis";
/**
- * Which implementation of {@link backtype.storm.codedistributor.ICodeDistributor} should be used by storm for code
- * distribution.
- */
- @isString
- public static final String STORM_CODE_DISTRIBUTOR_CLASS = "storm.codedistributor.class";
-
- /**
* Minimum number of nimbus hosts where the code must be replicated before leader nimbus
* is allowed to perform topology activation tasks like setting up heartbeats/assignments
* and marking the topology as active. default is 0.
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/AtomicOutputStream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/AtomicOutputStream.java b/storm-core/src/jvm/backtype/storm/blobstore/AtomicOutputStream.java
new file mode 100644
index 0000000..f35b7a7
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/blobstore/AtomicOutputStream.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Base class for output streams with all-or-nothing semantics: everything
+ * written is committed when {@link #close()} is called, while
+ * {@link #cancel()} discards all of it instead.
+ */
+public abstract class AtomicOutputStream extends OutputStream {
+
+    /**
+     * Abandons every write made through this stream and closes it, so that
+     * none of the written data is committed.
+     *
+     * @throws IOException if the stream cannot be cancelled or closed
+     */
+    public abstract void cancel() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/BlobKeySequenceInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobKeySequenceInfo.java b/storm-core/src/jvm/backtype/storm/blobstore/BlobKeySequenceInfo.java
new file mode 100644
index 0000000..53cfa15
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/blobstore/BlobKeySequenceInfo.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+/**
+ * Mutable holder that pairs a nimbus host/port string with a sequence
+ * number; both values are kept as strings and default to {@code null}.
+ */
+public class BlobKeySequenceInfo {
+    private String nimbusHostPort;
+    private String sequenceNumber;
+
+    public String getNimbusHostPort() {
+        return this.nimbusHostPort;
+    }
+
+    public void setNimbusHostPort(String nimbusHostPort) {
+        this.nimbusHostPort = nimbusHostPort;
+    }
+
+    public String getSequenceNumber() {
+        return this.sequenceNumber;
+    }
+
+    public void setSequenceNumber(String sequenceNumber) {
+        this.sequenceNumber = sequenceNumber;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/BlobStore.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobStore.java b/storm-core/src/jvm/backtype/storm/blobstore/BlobStore.java
new file mode 100644
index 0000000..a714b76
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/blobstore/BlobStore.java
@@ -0,0 +1,445 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import javax.security.auth.Subject;
+
+import backtype.storm.nimbus.NimbusInfo;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.daemon.Shutdownable;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.ReadableBlobMeta;
+import backtype.storm.generated.SettableBlobMeta;
+
+/**
+ * Provides a way to store blobs that can be downloaded.
+ * Blobs must be able to be uploaded and listed from Nimbus,
+ * and downloaded from the Supervisors. It is a key-value
+ * store: the key is a string and the value is the blob data.
+ *
+ * ACL checking must take place against the provided subject.
+ * If the blob store does not support security it must validate
+ * that all ACLs set are always WORLD, everything.
+ *
+ * Users can upload their blobs through the blob store command
+ * line. The command line also allows them to update and delete blobs.
+ *
+ * Modifying the replication factor only works for HdfsBlobStore;
+ * for LocalFsBlobStore the replication depends on the number of
+ * Nimbus hosts available.
+ */
+public abstract class BlobStore implements Shutdownable {
+    private static final Logger LOG = LoggerFactory.getLogger(BlobStore.class);
+    private static final Pattern KEY_PATTERN = Pattern.compile("^[\\w \\t\\.:_-]+$");
+    protected static final String BASE_BLOBS_DIR_NAME = "blobs";
+
+    /**
+     * Initializes the blob store.
+     * @param conf The storm configuration
+     * @param baseDir The directory path to store the blobs
+     * @param nimbusInfo Contains the nimbus host, port and leadership information.
+     */
+    public abstract void prepare(Map conf, String baseDir, NimbusInfo nimbusInfo);
+
+    /**
+     * Creates the blob.
+     * @param key Key for the blob.
+     * @param meta Metadata which contains the acls information
+     * @param who Is the subject creating the blob.
+     * @return AtomicOutputStream into which the blob data can be written;
+     *         the data is committed when the stream is closed.
+     * @throws AuthorizationException
+     * @throws KeyAlreadyExistsException
+     */
+    public abstract AtomicOutputStream createBlob(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException;
+
+    /**
+     * Updates the blob data.
+     * @param key Key for the blob.
+     * @param who Is the subject having the write privilege for the blob.
+     * @return AtomicOutputStream into which the new blob data can be
+     *         written; the data is committed when the stream is closed.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    public abstract AtomicOutputStream updateBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
+
+    /**
+     * Gets the current version of metadata for a blob
+     * to be viewed by the user or downloaded by the supervisor.
+     * @param key Key for the blob.
+     * @param who Is the subject having the read privilege for the blob.
+     * @return the blob's metadata.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    public abstract ReadableBlobMeta getBlobMeta(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
+
+    /**
+     * Sets the metadata with renewed acls for the blob.
+     * @param key Key for the blob.
+     * @param meta Metadata which contains the updated
+     * acls information.
+     * @param who Is the subject having the write privilege for the blob.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    public abstract void setBlobMeta(String key, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyNotFoundException;
+
+    /**
+     * Deletes the blob data and metadata.
+     * @param key Key for the blob.
+     * @param who Is the subject having write privilege for the blob.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    public abstract void deleteBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
+
+    /**
+     * Gets the InputStream to read the blob details
+     * @param key Key for the blob.
+     * @param who Is the subject having the read privilege for the blob.
+     * @return InputStreamWithMeta has the additional
+     * file length and version information.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     */
+    public abstract InputStreamWithMeta getBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException;
+
+    /**
+     * Lists the keys currently available in the blob store.
+     * @return an iterator over all keys in the blob store.
+     */
+    public abstract Iterator<String> listKeys();
+
+    /**
+     * Gets the replication factor of the blob.
+     * @param key Key for the blob.
+     * @param who Is the subject having the read privilege for the blob.
+     * @return the replication factor of the blob.
+     * @throws Exception
+     */
+    public abstract int getBlobReplication(String key, Subject who) throws Exception;
+
+    /**
+     * Modifies the replication factor of the blob.
+     * @param key Key for the blob.
+     * @param replication The replication factor the
+     * blob has to be set.
+     * @param who Is the subject having the update privilege for the blob
+     * @return the replication factor of the blob after the update.
+     * @throws AuthorizationException
+     * @throws KeyNotFoundException
+     * @throws IOException
+     */
+    public abstract int updateBlobReplication(String key, int replication, Subject who) throws AuthorizationException, KeyNotFoundException, IOException;
+
+    /**
+     * Applies the KeyFilter to every key in the store and collects the
+     * non-null results.
+     * @param filter KeyFilter
+     * @param <R> Type
+     * @return Set of filtered keys
+     */
+    public <R> Set<R> filterAndListKeys(KeyFilter<R> filter) {
+        Set<R> ret = new HashSet<R>();
+        Iterator<String> keys = listKeys();
+        while (keys.hasNext()) {
+            String key = keys.next();
+            R filtered = filter.filter(key);
+            if (filtered != null) {
+                ret.add(filtered);
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Validates key checking for potentially harmful patterns
+     * @param key Key for the blob.
+     * @throws IllegalArgumentException if the key is empty, "." or "..",
+     *         or does not match KEY_PATTERN.
+     */
+    public static final void validateKey(String key) {
+        if (StringUtils.isEmpty(key) || "..".equals(key) || ".".equals(key) || !KEY_PATTERN.matcher(key).matches()) {
+            LOG.error("'{}' does not appear to be valid {}", key, KEY_PATTERN);
+            throw new IllegalArgumentException(key+" does not appear to be a valid blob key");
+        }
+    }
+
+    /**
+     * Wrapper called to create the blob which contains
+     * the byte data. If writing or committing fails, the
+     * partially written blob is cancelled.
+     * @param key Key for the blob.
+     * @param data Byte data that needs to be uploaded.
+     * @param meta Metadata which contains the acls information
+     * @param who Is the subject creating the blob.
+     * @throws AuthorizationException
+     * @throws KeyAlreadyExistsException
+     * @throws IOException
+     */
+    public void createBlob(String key, byte [] data, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException, IOException {
+        AtomicOutputStream out = null;
+        try {
+            out = createBlob(key, meta, who);
+            out.write(data);
+            out.close();
+            out = null;
+        } finally {
+            if (out != null) {
+                out.cancel();
+            }
+        }
+    }
+
+    /**
+     * Wrapper called to create the blob from the contents of an
+     * input stream. The input stream is always closed. If reading,
+     * writing or committing fails, the partially written blob is
+     * cancelled and the exception is propagated to the caller.
+     * @param key Key for the blob.
+     * @param in InputStream from which the data is read to be
+     * written as a part of the blob.
+     * @param meta Metadata which contains the acls information
+     * @param who Is the subject creating the blob.
+     * @throws AuthorizationException
+     * @throws KeyAlreadyExistsException
+     * @throws IOException
+     */
+    public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subject who) throws AuthorizationException, KeyAlreadyExistsException, IOException {
+        AtomicOutputStream out = null;
+        try {
+            out = createBlob(key, meta, who);
+            byte[] buffer = new byte[2048];
+            int len;
+            while ((len = in.read(buffer)) > 0) {
+                out.write(buffer, 0, len);
+            }
+            out.close();
+            // Committed; nothing left to cancel.
+            out = null;
+        } finally {
+            try {
+                if (out != null) {
+                    // Failure before the commit: discard the partial blob. The
+                    // original exception still propagates (previously it was swallowed).
+                    out.cancel();
+                }
+            } finally {
+                in.close();
+            }
+        }
+    }
+
+    /**
+     * Reads the blob from the blob store
+     * and writes it into the output stream.
+     * The blob's input stream is closed and the output stream is flushed
+     * (but not closed) when done.
+     * @param key Key for the blob.
+     * @param out Output stream
+     * @param who Is the subject having read
+     * privilege for the blob.
+     * @throws IOException
+     * @throws KeyNotFoundException
+     * @throws AuthorizationException
+     */
+    public void readBlobTo(String key, OutputStream out, Subject who) throws IOException, KeyNotFoundException, AuthorizationException {
+        InputStreamWithMeta in = getBlob(key, who);
+        if (in == null) {
+            throw new IOException("Could not find " + key);
+        }
+        byte[] buffer = new byte[2048];
+        int len = 0;
+        try {
+            while ((len = in.read(buffer)) > 0) {
+                out.write(buffer, 0, len);
+            }
+        } finally {
+            in.close();
+            out.flush();
+        }
+    }
+
+    /**
+     * Wrapper around readBlobTo which returns the blob contents.
+     * @param key Key for the blob.
+     * @param who Is the subject having
+     * the read privilege for the blob.
+     * @return the blob contents as a byte array.
+     * @throws IOException
+     * @throws KeyNotFoundException
+     * @throws AuthorizationException
+     */
+    public byte[] readBlob(String key, Subject who) throws IOException, KeyNotFoundException, AuthorizationException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        readBlobTo(key, out, who);
+        return out.toByteArray();
+    }
+
+    /**
+     * Atomic output stream used for writing the metadata and data
+     * of a blob into a BlobStoreFile. close() commits the file;
+     * cancel() discards it.
+     */
+    protected class BlobStoreFileOutputStream extends AtomicOutputStream {
+        private BlobStoreFile part;
+        private OutputStream out;
+
+        public BlobStoreFileOutputStream(BlobStoreFile part) throws IOException {
+            this.part = part;
+            this.out = part.getOutputStream();
+        }
+
+        @Override
+        public void close() throws IOException {
+            try {
+                //close means commit
+                out.close();
+                part.commit();
+            } catch (IOException | RuntimeException e) {
+                cancel();
+                throw e;
+            }
+        }
+
+        @Override
+        public void cancel() throws IOException {
+            try {
+                out.close();
+            } finally {
+                part.cancel();
+            }
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            out.write(b);
+        }
+
+        @Override
+        public void write(byte []b) throws IOException {
+            out.write(b);
+        }
+
+        @Override
+        public void write(byte []b, int offset, int len) throws IOException {
+            out.write(b, offset, len);
+        }
+    }
+
+    /**
+     * Input stream used for reading both the metadata containing
+     * the acl information and the blob data from a BlobStoreFile.
+     */
+    protected class BlobStoreFileInputStream extends InputStreamWithMeta {
+        private BlobStoreFile part;
+        private InputStream in;
+
+        public BlobStoreFileInputStream(BlobStoreFile part) throws IOException {
+            this.part = part;
+            this.in = part.getInputStream();
+        }
+
+        @Override
+        public long getVersion() throws IOException {
+            return part.getModTime();
+        }
+
+        @Override
+        public int read() throws IOException {
+            return in.read();
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            return in.read(b, off, len);
+        }
+
+        @Override
+        public int read(byte[] b) throws IOException {
+            return in.read(b);
+        }
+
+        @Override
+        public int available() throws IOException {
+            return in.available();
+        }
+
+        @Override
+        public long getFileLength() throws IOException {
+            return part.getFileLength();
+        }
+
+        @Override
+        public void close() throws IOException {
+            // Without this override the wrapped stream (and its file handle)
+            // would never be released when callers close this stream.
+            in.close();
+        }
+    }
+
+    /**
+     * Iterator adapter that yields only those keys from the wrapped
+     * iterator that start with the given prefix, with the prefix removed.
+     */
+    public static class KeyTranslationIterator implements Iterator<String> {
+        private Iterator<String> it = null;
+        private String next = null;
+        private String prefix = null;
+
+        public KeyTranslationIterator(Iterator<String> it, String prefix) throws IOException {
+            this.it = it;
+            this.prefix = prefix;
+            primeNext();
+        }
+
+        // Advances to the next matching key, or sets next to null when exhausted.
+        private void primeNext() {
+            next = null;
+            while (it.hasNext()) {
+                String tmp = it.next();
+                if (tmp.startsWith(prefix)) {
+                    next = tmp.substring(prefix.length());
+                    return;
+                }
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return next != null;
+        }
+
+        @Override
+        public String next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            String current = next;
+            primeNext();
+            return current;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("Delete Not Supported");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreAclHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreAclHandler.java b/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreAclHandler.java
new file mode 100644
index 0000000..c0c4e5c
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreAclHandler.java
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.generated.AccessControl;
+import backtype.storm.generated.AccessControlType;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.SettableBlobMeta;
+import backtype.storm.security.auth.AuthUtils;
+import backtype.storm.security.auth.IPrincipalToLocal;
+import backtype.storm.security.auth.NimbusPrincipal;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Provides common handling of acls for Blobstores.
+ * Also contains some static utility functions related to Blobstores.
+ */
+public class BlobStoreAclHandler {
+ public static final Logger LOG = LoggerFactory.getLogger(BlobStoreAclHandler.class);
+ private final IPrincipalToLocal _ptol;
+
+ public static final int READ = 0x01;
+ public static final int WRITE = 0x02;
+ public static final int ADMIN = 0x04;
+ public static final List<AccessControl> WORLD_EVERYTHING =
+ Arrays.asList(new AccessControl(AccessControlType.OTHER, READ | WRITE | ADMIN));
+ public static final List<AccessControl> DEFAULT = new ArrayList<AccessControl>();
+ private Set<String> _supervisors;
+ private Set<String> _admins;
+
+    /**
+     * Builds a handler from the storm configuration: the principal-to-local
+     * plugin, plus the optional NIMBUS_SUPERVISOR_USERS and NIMBUS_ADMINS lists.
+     *
+     * @param conf storm configuration
+     */
+    public BlobStoreAclHandler(Map conf) {
+        _ptol = AuthUtils.GetPrincipalToLocalPlugin(conf);
+        Set<String> supervisorUsers = new HashSet<String>();
+        Set<String> adminUsers = new HashSet<String>();
+        if (conf.containsKey(Config.NIMBUS_SUPERVISOR_USERS)) {
+            supervisorUsers.addAll((List<String>) conf.get(Config.NIMBUS_SUPERVISOR_USERS));
+        }
+        if (conf.containsKey(Config.NIMBUS_ADMINS)) {
+            adminUsers.addAll((List<String>) conf.get(Config.NIMBUS_ADMINS));
+        }
+        _supervisors = supervisorUsers;
+        _admins = adminUsers;
+    }
+
+    /**
+     * Maps an ACL type token to its AccessControlType: "other"/"o" or
+     * "user"/"u", compared case-insensitively.
+     *
+     * @throws IllegalArgumentException for any other token
+     */
+    private static AccessControlType parseACLType(String type) {
+        boolean isOther = "other".equalsIgnoreCase(type) || "o".equalsIgnoreCase(type);
+        if (isOther) {
+            return AccessControlType.OTHER;
+        }
+        boolean isUser = "user".equalsIgnoreCase(type) || "u".equalsIgnoreCase(type);
+        if (isUser) {
+            return AccessControlType.USER;
+        }
+        throw new IllegalArgumentException(type+" is not a valid access control type");
+    }
+
+    /**
+     * Converts an access string such as "rwa", "r-a" or "-" into a bit mask
+     * of READ, WRITE and ADMIN. '-' characters are ignored.
+     *
+     * @param access the access string to parse
+     * @return the combined permission mask
+     * @throws IllegalArgumentException if the string contains any other
+     *         character (previously thrown with an empty message)
+     */
+    private static int parseAccess(String access) {
+        int ret = 0;
+        for (char c : access.toCharArray()) {
+            switch (c) {
+                case 'r':
+                    ret |= READ;
+                    break;
+                case 'w':
+                    ret |= WRITE;
+                    break;
+                case 'a':
+                    ret |= ADMIN;
+                    break;
+                case '-':
+                    // placeholder for an absent permission
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown access character '" + c
+                            + "' in access string \"" + access + "\"; expected only r, w, a or -");
+            }
+        }
+        return ret;
+    }
+
+ public static AccessControl parseAccessControl(String str) {
+ String[] parts = str.split(":");
+ String type = "other";
+ String name = "";
+ String access = "-";
+ if (parts.length > 3) {
+ throw new IllegalArgumentException("Don't know how to parse "+str+" into an ACL value");
+ } else if (parts.length == 1) {
+ type = "other";
+ name = "";
+ access = parts[0];
+ } else if (parts.length == 2) {
+ type = "user";
+ name = parts[0];
+ access = parts[1];
+ } else if (parts.length == 3) {
+ type = parts[0];
+ name = parts[1];
+ access = parts[2];
+ }
+ AccessControl ret = new AccessControl();
+ ret.set_type(parseACLType(type));
+ ret.set_name(name);
+ ret.set_access(parseAccess(access));
+ return ret;
+ }
+
+    /**
+     * Renders a permission mask as a three-character "rwa" string, using '-'
+     * for each permission that is not set.
+     */
+    private static String accessToString(int access) {
+        char[] flags = {'-', '-', '-'};
+        if ((access & READ) > 0) {
+            flags[0] = 'r';
+        }
+        if ((access & WRITE) > 0) {
+            flags[1] = 'w';
+        }
+        if ((access & ADMIN) > 0) {
+            flags[2] = 'a';
+        }
+        return new String(flags);
+    }
+
+    /**
+     * Formats an AccessControl as "o:name:rwa" or "u:name:rwa". The name is
+     * left empty when it is not set.
+     *
+     * @throws IllegalArgumentException for an unknown access control type
+     */
+    public static String accessControlToString(AccessControl ac) {
+        final String prefix;
+        switch (ac.get_type()) {
+            case OTHER:
+                prefix = "o";
+                break;
+            case USER:
+                prefix = "u";
+                break;
+            default:
+                throw new IllegalArgumentException("Don't know what a type of "+ac.get_type()+" means ");
+        }
+        String name = ac.is_set_name() ? ac.get_name() : "";
+        return prefix + ":" + name + ":" + accessToString(ac.get_access());
+    }
+
+ public static void validateSettableACLs(String key, List<AccessControl> acls) throws AuthorizationException {
+ Set<String> aclUsers = new HashSet<>();
+ List<String> duplicateUsers = new ArrayList<>();
+ for (AccessControl acl : acls) {
+ String aclUser = acl.get_name();
+ if (!StringUtils.isEmpty(aclUser) && !aclUsers.add(aclUser)) {
+ LOG.error("'{}' user can't appear more than once in the ACLs", aclUser);
+ duplicateUsers.add(aclUser);
+ }
+ }
+ if (duplicateUsers.size() > 0) {
+ String errorMessage = "user " + Arrays.toString(duplicateUsers.toArray())
+ + " can't appear more than once in the ACLs for key [" + key +"].";
+ throw new AuthorizationException(errorMessage);
+ }
+ }
+
+    /**
+     * Maps each principal of {@code who} to a local user name through the
+     * configured principal-to-local plugin. Returns an empty set for a null subject.
+     */
+    private Set<String> constructUserFromPrincipals(Subject who) {
+        Set<String> localUsers = new HashSet<String>();
+        if (who == null) {
+            return localUsers;
+        }
+        for (Principal principal : who.getPrincipals()) {
+            localUsers.add(_ptol.toLocal(principal));
+        }
+        return localUsers;
+    }
+
+    /** Returns true when any local user name derived from {@code who} is a configured admin. */
+    private boolean isAdmin(Subject who) {
+        for (String localUser : constructUserFromPrincipals(who)) {
+            if (_admins.contains(localUser)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /** Returns true only when the requested mask is exactly READ. */
+    private boolean isReadOperation(int operation) {
+        return operation == READ;
+    }
+
+    /**
+     * Returns true when the operation is a pure read and any local user name
+     * derived from {@code who} is a configured supervisor user.
+     */
+    private boolean isSupervisor(Subject who, int operation) {
+        Set<String> localUsers = constructUserFromPrincipals(who);
+        if (!isReadOperation(operation)) {
+            return false;
+        }
+        for (String localUser : localUsers) {
+            if (_supervisors.contains(localUser)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Returns true if the subject carries at least one {@link NimbusPrincipal};
+     * false for a null subject.
+     */
+    private boolean isNimbus(Subject who) {
+        if (who == null) {
+            return false;
+        }
+        for (Principal candidate : who.getPrincipals()) {
+            if (candidate instanceof NimbusPrincipal) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Returns true if the subject skips the per-key ACL evaluation in the permission checks:
+     * it is Nimbus, an admin, or a supervisor performing a read-only operation.
+     *
+     * @param who  the authenticated subject; may be null
+     * @param mask the requested permission mask
+     */
+    public boolean checkForValidUsers(Subject who, int mask) {
+        if (isNimbus(who)) {
+            return true;
+        }
+        if (isAdmin(who)) {
+            return true;
+        }
+        return isSupervisor(who, mask);
+    }
+
+ /**
+ * The user should be able to see the metadata if and only if they have any of READ, WRITE, or ADMIN
+ */
+ public void validateUserCanReadMeta(List<AccessControl> acl, Subject who, String key) throws AuthorizationException {
+ hasAnyPermissions(acl, (READ|WRITE|ADMIN), who, key);
+ }
+
+    /**
+     * Succeeds if the subject holds at least one of the permissions in {@code mask} on the key.
+     * Nimbus, admins and supervisors doing a read-only request pass without consulting the ACL.
+     *
+     * @param acl  ACL for the key.
+     * @param mask cumulative permission bits: READ = 1, WRITE = 2, ADMIN = 4
+     *             (e.g. 1 = READ, 5 = READ and ADMIN).
+     * @param who  the subject whose permissions are checked against the ACL and mask.
+     * @param key  key identifying the blob.
+     * @throws AuthorizationException if no ACL entry grants any bit of {@code mask}
+     */
+    public void hasAnyPermissions(List<AccessControl> acl, int mask, Subject who, String key) throws AuthorizationException {
+        Set<String> userNames = constructUserFromPrincipals(who);
+        LOG.debug("user {}", userNames);
+        if (checkForValidUsers(who, mask)) {
+            return;
+        }
+        for (AccessControl entry : acl) {
+            int granted = getAllowed(entry, userNames);
+            LOG.debug(" user: {} allowed: {} key: {}", userNames, granted, key);
+            if ((granted & mask) > 0) {
+                return; // one matching bit is enough
+            }
+        }
+        throw new AuthorizationException(
+                userNames + " does not have access to " + key);
+    }
+
+    /**
+     * Succeeds only if the ACL entries applying to the subject together grant every permission
+     * in {@code mask}. Nimbus, admins and supervisors doing a read-only request pass without
+     * consulting the ACL.
+     *
+     * @param acl  ACL for the key.
+     * @param mask cumulative permission bits: READ = 1, WRITE = 2, ADMIN = 4
+     *             (e.g. 1 = READ, 5 = READ and ADMIN).
+     * @param who  the subject whose permissions are checked against the ACL and mask.
+     * @param key  key identifying the blob.
+     * @throws AuthorizationException naming the missing permissions if any remain ungranted
+     */
+    public void hasPermissions(List<AccessControl> acl, int mask, Subject who, String key) throws AuthorizationException {
+        Set<String> userNames = constructUserFromPrincipals(who);
+        LOG.debug("user {}", userNames);
+        if (checkForValidUsers(who, mask)) {
+            return;
+        }
+        int missing = mask;
+        for (AccessControl entry : acl) {
+            int granted = getAllowed(entry, userNames);
+            missing &= ~granted; // clear every bit this entry grants
+            LOG.debug(" user: {} allowed: {} disallowed: {} key: {}", userNames, granted, missing, key);
+        }
+        if (missing != 0) {
+            throw new AuthorizationException(
+                    userNames + " does not have " + namedPerms(missing) + " access to " + key);
+        }
+    }
+
+    /**
+     * Replaces the ACL stored in {@code meta} with the result of {@code normalizeSettableACLs}
+     * for the given key, subject and operation mask. {@code meta} is modified in place.
+     */
+    public void normalizeSettableBlobMeta(String key, SettableBlobMeta meta, Subject who, int opMask) {
+        List<AccessControl> normalized = normalizeSettableACLs(key, meta.get_acl(), who, opMask);
+        meta.set_acl(normalized);
+    }
+
+    /**
+     * Formats the permission bits in {@code mask} for error messages, e.g. {@code "[READ ADMIN ]"}.
+     * Each present permission is followed by a single space.
+     */
+    private String namedPerms(int mask) {
+        String readPart = (mask & READ) > 0 ? "READ " : "";
+        String writePart = (mask & WRITE) > 0 ? "WRITE " : "";
+        String adminPart = (mask & ADMIN) > 0 ? "ADMIN " : "";
+        return "[" + readPart + writePart + adminPart + "]";
+    }
+
+    /**
+     * Returns the permission bits an ACL entry grants to the given user names.
+     * OTHER entries apply to everyone; USER entries apply only when their name is in
+     * {@code users}; anything else grants nothing.
+     */
+    private int getAllowed(AccessControl ac, Set<String> users) {
+        int granted;
+        switch (ac.get_type()) {
+            case OTHER:
+                granted = ac.get_access();
+                break;
+            case USER:
+                granted = users.contains(ac.get_name()) ? ac.get_access() : 0;
+                break;
+            default:
+                granted = 0;
+                break;
+        }
+        return granted;
+    }
+
+    /**
+     * Returns a new list holding the entries of {@code accessControls}, minus any world
+     * (OTHER) entries that grant no access. The input list itself is not modified; the
+     * returned list shares the same entry objects.
+     */
+    private List<AccessControl> removeBadACLs(List<AccessControl> accessControls) {
+        List<AccessControl> resultAcl = new ArrayList<AccessControl>(accessControls.size());
+        for (AccessControl control : accessControls) {
+            // Enum '==' is null-safe; get_type().equals(...) threw NPE for an unset type.
+            if (control.get_type() == AccessControlType.OTHER && control.get_access() == 0) {
+                // Avoid formatting the entry unless debug logging is actually on.
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Removing invalid blobstore world ACL {}",
+                            BlobStoreAclHandler.accessControlToString(control));
+                }
+                continue;
+            }
+            resultAcl.add(control);
+        }
+        return resultAcl;
+    }
+
+    /**
+     * Builds the ACL actually stored for a blob.
+     * <ul>
+     *   <li>Drops world entries that grant nothing (via {@code removeBadACLs}).</li>
+     *   <li>Ensures each local user of {@code who} holds at least {@code opMask}.</li>
+     *   <li>If no user name can be derived from {@code who}, appends WORLD_EVERYTHING, unless
+     *       the supplied list already has a world READ|WRITE|ADMIN entry.</li>
+     * </ul>
+     * Entries for existing users may be mutated in place by {@code fixACLsForUser}.
+     */
+    private final List<AccessControl> normalizeSettableACLs(String key, List<AccessControl> acls, Subject who,
+                                                            int opMask) {
+        List<AccessControl> cleanAcls = removeBadACLs(acls);
+        Set<String> userNames = getUserNamesFromSubject(who);
+        for (String user : userNames) {
+            fixACLsForUser(cleanAcls, user, opMask);
+        }
+        boolean noKnownUser = who == null || userNames.isEmpty();
+        if (!noKnownUser || worldEverything(acls)) {
+            return cleanAcls;
+        }
+        cleanAcls.addAll(BlobStoreAclHandler.WORLD_EVERYTHING);
+        LOG.debug("Access Control for key {} is normalized to world everything {}", key, cleanAcls);
+        if (!acls.isEmpty()) {
+            LOG.warn("Access control for blob with key {} is normalized to WORLD_EVERYTHING", key);
+        }
+        return cleanAcls;
+    }
+
+ private boolean worldEverything(List<AccessControl> acls) {
+ boolean isWorldEverything = false;
+ for (AccessControl acl : acls) {
+ if (acl.get_type() == AccessControlType.OTHER && acl.get_access() == (READ|WRITE|ADMIN)) {
+ isWorldEverything = true;
+ break;
+ }
+ }
+ return isWorldEverything;
+ }
+
+    /**
+     * Ensures {@code user} holds at least the permissions in {@code mask}.
+     * <ul>
+     *   <li>If a USER entry naming {@code user} exists, the first such entry gets {@code mask}
+     *       OR-ed into its access; the entry is mutated in place.</li>
+     *   <li>Otherwise a new USER entry with access exactly {@code mask} is appended to
+     *       {@code acls}.</li>
+     * </ul>
+     */
+    private void fixACLsForUser(List<AccessControl> acls, String user, int mask) {
+        for (AccessControl control : acls) {
+            String entryName = control.get_name();
+            // USER entries may have no name set; skip them instead of throwing NPE.
+            if (control.get_type() == AccessControlType.USER && entryName != null && entryName.equals(user)) {
+                int currentAccess = control.get_access();
+                if ((currentAccess & mask) != mask) {
+                    control.set_access(currentAccess | mask);
+                }
+                return;
+            }
+        }
+        AccessControl userACL = new AccessControl();
+        userACL.set_type(AccessControlType.USER);
+        userACL.set_name(user);
+        userACL.set_access(mask);
+        acls.add(userACL);
+    }
+
+    /**
+     * Returns the local user names for the subject's principals (empty when {@code who} is null).
+     * Delegates to {@link #constructUserFromPrincipals(Subject)} so the two lookups share one
+     * implementation and cannot drift apart.
+     */
+    private Set<String> getUserNamesFromSubject(Subject who) {
+        return constructUserFromPrincipals(who);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreFile.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreFile.java b/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreFile.java
new file mode 100644
index 0000000..22ccf97
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/blobstore/BlobStoreFile.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.generated.SettableBlobMeta;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.regex.Pattern;
+
+/**
+ * Provides a base abstraction for blob store implementations backed by file storage.
+ * This class only declares the operations and a few shared naming constants. Subclasses
+ * supply the storage-specific behaviour.
+ */
+public abstract class BlobStoreFile {
+    public static final Logger LOG = LoggerFactory.getLogger(BlobStoreFile.class);
+
+    /** Extension used in temporary file names. */
+    protected static final String TMP_EXT = ".tmp";
+    /** Matches names made of one or more digits followed by {@link #TMP_EXT}, e.g. {@code 42.tmp}. */
+    protected static final Pattern TMP_NAME_PATTERN = Pattern.compile("^\\d+\\" + TMP_EXT + "$");
+    /** Name constant {@code "data"}; presumably the file holding blob contents — confirm in subclasses. */
+    protected static final String BLOBSTORE_DATA_FILE = "data";
+
+    // Identity and state
+    public abstract String getKey();
+    public abstract boolean isTmp();
+    public abstract long getModTime() throws IOException;
+    public abstract long getFileLength() throws IOException;
+
+    // Metadata
+    public abstract void setMetadata(SettableBlobMeta meta);
+    public abstract SettableBlobMeta getMetadata();
+
+    // Content streams
+    public abstract InputStream getInputStream() throws IOException;
+    public abstract OutputStream getOutputStream() throws IOException;
+
+    // Lifecycle. Exact semantics of commit/cancel/delete are defined by subclasses.
+    public abstract void commit() throws IOException;
+    public abstract void cancel() throws IOException;
+    public abstract void delete() throws IOException;
+}