Posted to dev@storm.apache.org by "troy ding (JIRA)" <ji...@apache.org> on 2014/12/10 04:14:14 UTC

[jira] [Created] (STORM-588) Executor-Level Rebalance Mechanism

troy ding created STORM-588:
-------------------------------

             Summary: Executor-Level Rebalance Mechanism
                 Key: STORM-588
                 URL: https://issues.apache.org/jira/browse/STORM-588
             Project: Apache Storm
          Issue Type: Improvement
    Affects Versions: 0.10.0, 0.9.3-rc2
            Reporter: troy ding
            Assignee: troy ding


I. The motivation

The current rebalance mechanism is implemented at the worker level. When a rebalance operation is triggered (e.g. by adding or removing a worker), Storm kills every worker whose assignment has changed and relaunches it according to the new assignment. The advantage of this mechanism is the simplicity of its implementation, but it can incur _huge_ overhead: the restart latency is usually more than one second, which makes it almost impossible for the system to keep up under a high incoming data rate. In practice, no system administrator dares to call rebalance, especially when the system is already overloaded! To bring back the real benefits of the rebalance operation, we believe it is important to address the following problems:

*1. Resource wastage and additional initialization cost*: In most cases, a change to a worker’s assignment (if the worker is not killed outright) only affects a small fraction of the executors running on it: only some of them need to be migrated or created, while the rest could keep running on the same worker. The current implementation, however, forcefully restarts all the executors and re-runs unnecessary initialization (i.e. Bolt.prepare() and Spout.open()) for most of the running tasks. This not only wastes the computation of unaffected executors, but also amplifies the initialization cost in certain cases, e.g. when a bolt loads an index in prepare().

*2. Restarting workers causes avoidable in-memory data loss*: Currently, a supervisor uses the “kill -9” command to terminate its workers, so the tasks on a killed worker have no chance to save their in-memory data. The running state of the worker, including information it needs to resume its duty, is simply lost, potentially forcing unnecessary recomputation of that state.

*3. JVM restart cost: long startup time and loss of HotSpot optimizations*: Restarting a JVM involves a long initialization procedure and discards all the runtime optimizations accumulated for the application bytecode. The HotSpot JVM detects performance-critical sections of the code and dynamically compiles these hot spots into native machine code; CPU-bound tasks in particular benefit greatly from this. If we kill the worker outright, all of these advantages are lost.

II. Proposed solutions

1. At the supervisor side:
The current supervisor implementation periodically calls the “sync-processes” function, which kills a live worker in either of two cases: (1) the mapping between the worker and its topology has changed (e.g. the worker is re-assigned to another topology, or the topology it serves is killed); (2) the worker’s executor assignment has been updated (e.g. the parallelism of some bolts has increased or decreased).

To reuse the worker’s JVM instance as much as possible, we propose not to kill the workers in condition (2), but only those that no longer belong to the topology (condition (1)); see the sketch below.
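The following is a minimal sketch (in Java-like pseudocode, not the actual Clojure in “supervisor.clj”) of how the kill decision could be restricted to condition (1); the type and method names (WorkerState, AssignmentSnapshot, etc.) are hypothetical and only illustrate the split between the two conditions:

{code:java}
// Hypothetical sketch; the real logic lives in sync-processes in supervisor.clj.
final class SupervisorSyncSketch {

    /** Decide whether a running worker must be killed under the proposed scheme. */
    static boolean shouldKillWorker(WorkerState worker, AssignmentSnapshot assignment) {
        // Condition (1): the worker no longer belongs to its topology
        // (the topology was killed, or the slot was re-assigned to another topology).
        boolean topologyChanged =
                !worker.topologyId().equals(assignment.topologyIdFor(worker.port()));
        if (topologyChanged) {
            return true;   // still kill: the JVM cannot be reused across topologies
        }
        // Condition (2): only the executor set on this worker changed.
        // Under the proposal we do NOT kill here; the worker synchronizes its
        // own executors instead (see the worker-side sketch below).
        return false;
    }

    // Hypothetical supporting types, included only so the sketch is self-contained.
    record WorkerState(String topologyId, int port) {}
    interface AssignmentSnapshot { String topologyIdFor(int port); }
}
{code}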

2. At the worker side: 
Because the JVM instance is now reused, each worker needs to periodically synchronize its set of assigned executors. To achieve this, a new thread similar to the existing “refresh-connections” thread is launched to shut down executors that are no longer assigned to the worker and to start newly assigned ones, as sketched below. Note that, in practice, the “refresh-connections” thread already retrieves the assignment information from ZooKeeper, so this information can be shared with the new thread, reducing the load on ZooKeeper.
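A rough sketch of the proposed executor-sync step, again with hypothetical names (syncExecutors, startExecutor, shutdownExecutor) rather than Storm’s actual internals; in the real implementation this would run on a timer next to the existing “refresh-connections” logic in “worker.clj”:

{code:java}
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the worker-side executor synchronization step.
final class ExecutorSyncSketch {
    // Executors currently running inside this worker's JVM, keyed by executor id.
    private final Map<String, Runnable> runningExecutors = new ConcurrentHashMap<>();

    /** Called periodically with the executor ids assigned to this worker in ZooKeeper. */
    void syncExecutors(Set<String> assignedExecutorIds) {
        // 1. Shut down executors that are no longer assigned to this worker.
        for (String id : new HashSet<>(runningExecutors.keySet())) {
            if (!assignedExecutorIds.contains(id)) {
                shutdownExecutor(id);          // graceful stop, so tasks can save state
                runningExecutors.remove(id);
            }
        }
        // 2. Start executors that are newly assigned to this worker.
        for (String id : assignedExecutorIds) {
            runningExecutors.computeIfAbsent(id, this::startExecutor);
        }
        // Executors present in both sets keep running untouched, which is the whole
        // point of the proposal: no JVM restart, no repeated prepare()/open() calls.
    }

    private Runnable startExecutor(String id) { /* launch executor threads */ return () -> {}; }
    private void shutdownExecutor(String id)  { /* signal the executor to stop */ }
}
{code}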

Because the binding between running executors and workers can now change, tuples must also be re-routed. To achieve this, we need to rewrite the two functions “transfer-local-fn” and “transfer-fn” (note the rewrite is mandatory because these two functions are immutable in the current implementation); a sketch of the idea follows.
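An illustrative sketch of the idea behind the rewrite: instead of closing over a fixed routing table, the transfer path consults a reference that the executor-sync thread can swap atomically whenever the assignment changes. All names here are hypothetical and stand in for the actual Clojure code in “worker.clj”:

{code:java}
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: a swappable routing table instead of an immutable closure.
final class TupleRouterSketch {
    // Maps a destination task id to the local executor queue serving it;
    // absence means the task now lives on a remote worker.
    private final AtomicReference<Map<Integer, ExecutorQueue>> taskToLocalExecutor =
            new AtomicReference<>(Map.of());

    /** Called by the executor-sync thread whenever the assignment changes. */
    void refreshRouting(Map<Integer, ExecutorQueue> newTable) {
        taskToLocalExecutor.set(Map.copyOf(newTable));
    }

    /** Rough analogue of transfer-local-fn / transfer-fn under the proposal. */
    void transfer(int destTaskId, Object tuple, RemoteSender remote) {
        ExecutorQueue local = taskToLocalExecutor.get().get(destTaskId);
        if (local != null) {
            local.publish(tuple);           // task is on this worker: local handoff
        } else {
            remote.send(destTaskId, tuple); // task moved away: send over the network
        }
    }

    // Hypothetical collaborators, included only so the sketch is self-contained.
    interface ExecutorQueue { void publish(Object tuple); }
    interface RemoteSender  { void send(int taskId, Object tuple); }
}
{code}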

Another function that needs careful modification is “WorkerTopologyContext.getThisWorkerTasks()”, because (defn- mk-grouper … :local-or-shuffle) in “executor.clj” depends on it for the required context information. Moreover, if an end user calls “WorkerTopologyContext.getThisWorkerTasks()” in “prepare()” and stores the result, and the executor is not restarted after a rebalance, using the stored result may lead to inconsistency, as illustrated below.
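A contrived user bolt (written against the 0.9.x backtype.storm API, not Storm internals) illustrating the caching pitfall described above: the worker’s task list is captured once in prepare() and silently goes stale if the worker’s task set changes without the executor being restarted:

{code:java}
import java.util.List;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

// Illustrative user code showing the stale-cache problem; not part of the proposal itself.
public class CachingBolt extends BaseRichBolt {
    private List<Integer> cachedWorkerTasks;   // captured once, never refreshed
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // Under the current model this is safe: a rebalance restarts the worker,
        // so prepare() runs again. Under executor-level rebalance, the worker's
        // task set can change while this executor keeps running, so the cache goes stale.
        this.cachedWorkerTasks = context.getThisWorkerTasks();
    }

    @Override
    public void execute(Tuple input) {
        // Any decision based on cachedWorkerTasks may now disagree with the live assignment.
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) { }
}
{code}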

In summary, we propose this new executor-level rebalance mechanism, which tries to maximize resource usage and minimize rebalance cost. This is essential for the system as a whole, and especially important as a foundation for elasticity features in Storm.


