You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/03/20 22:01:21 UTC

[16/50] [abbrv] storm git commit: STORM-166 Nimbus HA design documentation and sequence diagram.

STORM-166 Nimbus HA design documentation and sequence diagram.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/07b69b7b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/07b69b7b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/07b69b7b

Branch: refs/heads/0.11.x-branch
Commit: 07b69b7b86828b4213873405323c0e9be03934f1
Parents: dd991e5
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Fri Dec 19 12:52:30 2014 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Fri Dec 19 12:52:30 2014 -0800

----------------------------------------------------------------------
 .../nimbus_ha_leader_election_and_failover.png  | Bin 0 -> 154316 bytes
 .../images/nimbus_ha_topology_submission.png    | Bin 0 -> 134180 bytes
 docs/documentation/nimbus-ha-design.md          | 201 +++++++++++++++++++
 3 files changed, 201 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/07b69b7b/docs/documentation/images/nimbus_ha_leader_election_and_failover.png
----------------------------------------------------------------------
diff --git a/docs/documentation/images/nimbus_ha_leader_election_and_failover.png b/docs/documentation/images/nimbus_ha_leader_election_and_failover.png
new file mode 100644
index 0000000..60cc1b7
Binary files /dev/null and b/docs/documentation/images/nimbus_ha_leader_election_and_failover.png differ

http://git-wip-us.apache.org/repos/asf/storm/blob/07b69b7b/docs/documentation/images/nimbus_ha_topology_submission.png
----------------------------------------------------------------------
diff --git a/docs/documentation/images/nimbus_ha_topology_submission.png b/docs/documentation/images/nimbus_ha_topology_submission.png
new file mode 100644
index 0000000..7707e5a
Binary files /dev/null and b/docs/documentation/images/nimbus_ha_topology_submission.png differ

http://git-wip-us.apache.org/repos/asf/storm/blob/07b69b7b/docs/documentation/nimbus-ha-design.md
----------------------------------------------------------------------
diff --git a/docs/documentation/nimbus-ha-design.md b/docs/documentation/nimbus-ha-design.md
new file mode 100644
index 0000000..51b15f8
--- /dev/null
+++ b/docs/documentation/nimbus-ha-design.md
@@ -0,0 +1,201 @@
+#Highly Available Nimbus design proposal
+##Problem Statement:
+Currently the storm master aka nimbus, is a process that runs on a single machine under supervision. In most cases the 
+nimbus failure is transient and it is restarted by the supervisor. However sometimes when disks fail and networks 
+partitions occur, nimbus goes down. Under these circumstances the topologies run normally but no new topologies can be 
+submitted, no existing topologies can be killed/deactivated/activated and if a supervisor node fails then the 
+reassignments are not performed resulting in performance degradation or topology failures. With this project we intend 
+to resolve this problem by running nimbus in a primary backup mode to guarantee that even if a nimbus server fails one 
+of the backups will take over.
+##Requirements:
+* Increase overall availability of nimbus.
+* Allow nimbus hosts to leave and join the cluster at will any time. A newly joined host should auto catch up and join 
+the list of potential leaders automatically. 
+* No topology resubmissions required in case of nimbus fail overs.
+* No active topology should ever be lost. 
+##Components:
+Following are different components to achieve the above goals.
+###Leader Election:
+The nimbus server will use the following interface:
+
+```java
+public interface ILeaderElector {
+    /**
+     * queue up for leadership lock. The call returns immediately and the caller                     
+     * must check isLeader() to perform any leadership action.
+     */
+    void addToLeaderLockQueue();
+
+    /**
+     * Removes the caller from the leader lock queue. If the caller is leader
+     * also releases the lock.
+     */
+    void removeFromLeaderLockQueue();
+
+    /**
+     *
+     * @return true if the caller currently has the leader lock.
+     */
+    boolean isLeader();
+
+    /**
+     *
+     * @return the current leader's address , throws exception if noone has has    lock.
+     */
+    InetSocketAddress getLeaderAddress();
+
+    /**
+     * 
+     * @return list of current nimbus addresses, includes leader.
+     */
+    List<InetSocketAddress> getAllNimbusAddresses();
+}
+```
+On startup nimbus will check if it has code for all active topologies available locally. Once it gets to this state it 
+will call addToLeaderLockQueue() function. When a nimbus is notified to become a leader it will check if it has all the
+code locally before assuming the leadership role. If any active topology code is missing, the node will not accept the 
+leadership role instead it will release the lock and wait till it has all the code before requeueing for leader lock. 
+
+The first implementation will be Zookeeper based. If the zookeeper connection is lost/resetted resulting in loss of lock
+or the spot in queue the implementation will take care of updating the state such that isLeader() will reflect the 
+current status.The leader like actions must finish in less than minimumOf(connectionTimeout, SessionTimeout) to ensure
+the lock was held by nimbus for the entire duration of the action (Not sure if we want to just state this expectation 
+and ensure that zk configurations are set high enough which will result in higher failover time or we actually want to 
+create some sort of rollback mechanism for all actions, the second option needs a lot of code). If a nimbus that is not 
+leader receives a request that only a leader can perform it will throw a RunTimeException.
+
+Following steps describes a nimbus failover scenario:
+* Let’s say we have 4 topologies running with 3 nimbus nodes and code-replication-factor = 2. We assume that the 
+invariant “The leader nimbus has code for all topologies locally” holds true at the beginning. nonleader-1 has code for 
+the first 2 topologies and nonLeader-2 has code for the other 2 topologies.
+* Leader nimbus dies, hard disk failure so no recovery possible.
+* nonLeader-1 gets a zookeeper notification to indicate it is now the new leader. before accepting the leadership it 
+checks if it has code available for all 4 topologies(these are topologies under /storm/storms/). It realizes it only has
+code for 2 topologies so it relinquishes the lock and looks under  /storm/code-distributor/topologyId to find out from 
+where can it download the code/metafile for the missing topologies. it finds entries for the leader nimbus and 
+nonleader-2. It will try downloading from both as part of its retry mechanism.
+* nonLeader-2’s code sync thread also realizes that it is missing code for 2 topologies and follows the same process 
+described in step-3 to download code for missing topologies. 
+* eventually at least one of the nimbuses will have all the code locally and will accept leadership.
+This sequence diagram describes how leader election and failover would work with multiple components.
+
+![Nimbus Fail Over](images/nimbus_ha_leader_election_and_failover.png)
+
+###Nimbus state store:
+
+Currently the nimbus stores 2 kind of data
+* Meta information like supervisor info, assignment info which is stored in zookeeper
+* Actual topology configs and jars that is stored on nimbus host’s local disk.
+
+To achieve fail over from primary to backup servers nimbus state/data needs to be replicated across all nimbus hosts or 
+needs to be stored in a distributed storage. Replicating the data correctly involves state management, consistency checks
+and it is hard to test for correctness.However many storm users do not want to take extra dependency on another replicated
+storage system like HDFS and still need high availability.Eventually, we want to move to the bittorrent protocol for code 
+distribution given the size of the jars and to achieve better scaling when the total number of supervisors is very high. 
+The current file system based model for code distribution works fine with systems that have file system like structure
+but it fails to support a non file system based approach like bit torrent. To support bit torrent we can go with the 
+following interface instead of the storage interface described above. The interface described below can still be used with
+HDFS,S3 and local file system, so this is a more extensible interface. 
+```java
+/**
+ * Interface responsible to distribute code in the cluster.
+ */
+public interface ICodeDistributor {
+    /**
+     * Prepare this code distributor.
+     * @param conf
+     */
+    void prepare(Map conf);
+
+    /**
+     * This API will perform the actual upload of the code to the distributed implementation.
+     * The API should return a Meta file which should have enough information for downloader 
+     * so it can download the code e.g. for bittorrent it will be a torrent file, in case of something         
+     * like HDFS or s3  it might have the actual directory or paths for files to be downloaded.
+     * @param dirPath local directory where all the code to be distributed exists.
+     * @param topologyId the topologyId for which the meta file needs to be created.
+     * @return metaFile
+     */
+    File upload(Path dirPath, String topologyId);
+
+    /**
+     * Given the topologyId and metafile, download the actual code and return the downloaded file's list.
+     * @param topologyid
+     * @param metafile 
+     * @param destDirPath the folder where all the files will be downloaded.
+     * @return
+     */
+    List<File> download(Path destDirPath, String topologyid, File metafile);
+
+    /**
+      * Given the topologyId, returns number of hosts where the code has been replicated.
+      */
+    int getReplicationCount(String topologyId);
+    
+   /**
+     * Performs the cleanup.
+     * @param topologyid
+     */
+    void cleanup(String topologyid);
+
+    /**
+     * Close this distributor.
+     * @param conf
+     */
+    void close(Map conf);
+}
+```
+To support replication we will allow the user to define a code replication factor which would reflect number of nimbus 
+hosts to which the code must be replicated before starting the topology. With replication comes the issue of consistency. 
+We will treat zookeeper’s list of active topologies as our authority for topologies for which the code must exist on a 
+nimbus host. Any nimbus host that does not have all the code for all the topologies which are marked as active in zookeeper 
+will relinquish it’s lock so some other nimbus host could become leader. A background thread on all nimbus host will 
+continuously try to sync code from other hosts where the code was successfully replicated so eventually at least one nimbus 
+will accept leadership as long as at least one seed hosts exists for each active topology. 
+		
+Following steps describe code replication amongst nimbus hosts for a topology:
+* When client uploads jar, nothing changes.
+* When client submits a topology, leader nimbus calls code distributor’s upload function which will create a metafile stored 
+locally on leader nimbus. Leader nimbus will write new entries under /storm/code-distributor/topologyId to notify all 
+nonleader nimbuses that they should download this new code.
+* We wait on the leader nimbus to ensure at least N non leader nimbus has the code replicated, with a user configurable timeout.
+* When a non leader nimbus receives the notification about new code, it downloads the meta file from leader nimbus and then
+downloads the real code by calling code distributor’s download function with metafile as input.
+* Once non leader finishes downloading code, it will write an entry under /storm/code-distributor/topologyId to indicate 
+it is one of the possible places to download the code/metafile in case the leader nimbus dies.
+* leader nimbus goes ahead and does all the usual things it does as part of submit topologies.
+
+The following sequence diagram describes the communication between different components involved in code distribution.
+
+![Nimbus HA Topology Submission](images/nimbus_ha_topology_submission.png)
+
+##Thrift and Rest API 
+
+This section only exists to track and document how we can reduce the added load on zookeeper for nimbus discovery if the 
+performance numbers indicated any degradation. The actual implementation will not be part of nimbus HA unless we have 
+performance tests to indicate degradation.  
+
+In order to avoid workers/supervisors/ui talking to zookeeper for getting master nimbus address we can add following new API:
+
+```java
+/**
+* Returns list of all nimbus hosts that are either currently in queue or has
+* the leadership lock.
+*/
+List<NimbusInfo> getNimbusHosts();
+
+/**
+* NimbusInfo
+*/
+Class NimbusInfo {
+	String host;
+	short port;
+	boolean isLeader;
+}
+```
+
+These apis will be used by StormSubmitter, Nimbus clients,supervisors and ui to discover the current leaders and participating 
+nimbus hosts. Any nimbus host will be able to respond to these requests. The nimbus hosts can read this information once 
+from zookeeper and cache it and keep updating the cache when the watchers are fired to indicate any changes,which should be 
+rare in general case. In addition we should update all the existing thrift and rest apis’s to throw redirect 
+exceptions when a non leader receives a request that only a leader should serve.