You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by "Rick Kellogg (JIRA)" <ji...@apache.org> on 2015/10/09 02:53:27 UTC

[jira] [Updated] (STORM-155) Storm rebalancing code causes multiple topologies assigned to a single port

     [ https://issues.apache.org/jira/browse/STORM-155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rick Kellogg updated STORM-155:
-------------------------------
    Component/s: storm-core

> Storm rebalancing code causes multiple topologies assigned to a single port
> ---------------------------------------------------------------------------
>
>                 Key: STORM-155
>                 URL: https://issues.apache.org/jira/browse/STORM-155
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-core
>            Reporter: James Xu
>            Assignee: Masatake Iwasaki
>             Fix For: 0.9.3
>
>
> https://github.com/nathanmarz/storm/issues/551
> We're seeing an issue when rebalancing topologies on the clusters causes workers being assigned to multiple topologies which causes supervisors to fail. This can be easily reproduced locally by starting a single supervisor with 4 workers and nimbus, running several topologies and rebalancing all of them to use 1 worker.
> I tracked the issue to the mk-assignments function in nimbus.clj. In this function, the "existing-assignments" binding is assigned a list of all topologies with assignments except the one being rebalanced. The comment implies this is done to treat all the workers of the topology being rebalanced as unused, and that's what actually happens. However, the lack of the topology being rebalanced in the "existing-assignments" list causes this topology being ignore completely by scheduler and other code, so as the result all workers assigned to that topology will be taken over by other topologies, but
> no changes to the topology being rebalanced will be made, effective making all it's workers
> assigned to 2 topologies.
> I think that was not the intended behavior. I made a small change to treat all the workers of the topology being rebalanced dead instead, causing them to be reassigned fairly between all the topologies. This seems to work well and reliably for us.
> Let me know what do you think.
> --- a/src/clj/backtype/storm/daemon/nimbus.clj
> +++ b/src/clj/backtype/storm/daemon/nimbus.clj
> @@ -437,8 +437,11 @@
>    (into {} (for [[tid assignment] existing-assignments
>                   :let [topology-details (.getById topologies tid)
>                         all-executors (topology->executors tid)
> +                       ;; for the topology which wants rebalance (specified by the scratch-topology-id)
> +                       ;; we consider all its execturors to be dead so they will be treated
> +                       ;; as free slots in the scheduler code.
>                         alive-executors (if (and scratch-topology-id (= scratch-topology-id tid))
> -                                         all-executors
> +                                         (set nil)
>                                           (set (alive-executors nimbus topology-details all-executors assignment)))]]
>               {tid alive-executors})))
> @@ -638,11 +641,7 @@
>          ;; read all the assignments
>          assigned-topology-ids (.assignments storm-cluster-state nil)
>          existing-assignments (into {} (for [tid assigned-topology-ids]
> -                                        ;; for the topology which wants rebalance (specified by the scratch-topology-id)
> -                                        ;; we exclude its assignment, meaning that all the slots occupied by its assignment
> -                                        ;; will be treated as free slot in the scheduler code.
> -                                        (when (or (nil? scratch-topology-id) (not= tid scratch-topology-id))
> -                                          {tid (.assignment-info storm-cluster-state tid nil)})))
> +                                        {tid (.assignment-info storm-cluster-state tid nil)}))
>          ;; make the new assignments for topologies
>          topology->executor->node+port (compute-new-topology->executor->node+port
>                                         nimbus
> ----------
> xumingming: The following code needs to be synchonized(mk-assignments):
> (defn do-rebalance [nimbus storm-id status]
>   (.update-storm! (:storm-cluster-state nimbus)
>                   storm-id
>                   (assoc-non-nil
>                     {:component->executors (:executor-overrides status)}
>                     :num-workers
>                     (:num-workers status)))
>   (mk-assignments nimbus :scratch-topology-id storm-id))
> otherwise it will cause race condition here: https://github.com/nathanmarz/storm/blob/master/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java#L264
> then one port will be assigned to multiple topologies.
> ----------
> stass: That's not the issue here. I initially thought it was a race condition as well, but my problem with rebalancing was in the sequential part of the algorithm (as described in analysis). Needless to say, it fixed the issue for us and I have not seen any multiple assignments in months when running with the patch I submitted.
> ----------
> d2r: We are also hitting this issue and would welcome a fix.
> ----------
> nathanmarz: The problem with the proposed patch is that it doesn't treat the slots of the topology that's rebalancing as free. It will certainly consider its executors as dead, but it won't make use of its slots during re-scheduling.
> ----------
> stass: Hi, Nathan, thanks for comment.
> If my reading of the code is right, the workers of the topology being rebalanced will be treated as free since they will be marked as dead (that's the purpose of the top hunk of the diff), and the scheduling code only looks at free workers when assigning workers to topologies. That is also what I observed in practice with this patch running.
> Am I missing something?
> ----------
> vinceyang: @nathanmarz , We are also hitting this issue , I read the nimbus code,it seems when do rebalance , in mk-assignment function,the rebalancing topolgy's assignment info has out of date , but supervisor not kown this information. when other topolgy's assignment has the same port with the out of date assignment the problem occur. if we remove the out of date assignment in ZK this problem will not occur. if my Idea is OK,I will work on it to fix this issue.
> ----------
> revans2: We have hit this issue too, and so I have been looking into it. It seems that it can happen in two different situations.
> First a topology is not assigned anything after it previously had slots assigned to it.
> This happens most commonly when re-balancing because the scheduler is not aware the rebalanced topology had anything assigned to it previously, but I have been able to reproduce this with other hacked up schedulers.
> When this happens the supervisor in question will crash continuously until one of the topologies is killed.
> The fix seems to be that we should include assigned-topology-ids in topology->executor->node+port when missing but with the topology pointing to nil.
> Second the supervisor uses partially written scheduling data from ZK.
> (.set-assignment! storm-cluster-state topology-id assignment) is atomic for a single topology, but not for multiple topologies. This means that the supervisor can read data from ZK that has had some topologies updated, but not all of them.
> When this happens the supervisor will crash and then come back up and recover because the rest of the scheduling data was written to ZK.
> The fix for this seems to be that we need to "lock" zookeeper with a watch file during the update. The supervisors would not read the data until nimbus is done updating. I don't think this is as critical to fix because the supervisor recovers fairly quickly.
> Does my analysis seem correct? I don't understand all of the code perfectly, so I want to be sure I am not missing something.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)