Posted to commits@cassandra.apache.org by Apache Wiki <wi...@apache.org> on 2011/01/02 01:42:00 UTC

[Cassandra Wiki] Update of "FileFormatDesignDoc" by StuHood

The "FileFormatDesignDoc" page has been changed by StuHood.
http://wiki.apache.org/cassandra/FileFormatDesignDoc?action=diff&rev1=5&rev2=6

--------------------------------------------------

+ = File Format Design Doc =
- ## page was copied from ArchitectureGossip
- = Gossiper =
  
- The gossiper is responsible for making sure every node in the system eventually knows important information about every other node's state, including those that are unreachable or not yet in the cluster when any given state change occurs.
+ This page describes a planned replacement file format for SSTables.
  
- == API ==
+ == Requirements ==
  
- Information to gossip is wrapped in an !ApplicationState object, which is essentially a key/value pair.  (See "Data Structures" below for more detail.)  The gossiper propagates these to other nodes, where interested classes subscribe to changes via the !IEndPointStateChangeSubscriber interface.  This provides onJoin, onAlive, and onDead methods indicating the obvious things, and onChange for !ApplicationState changes.  onChange is called once for each !ApplicationState.  There are two non-obvious properties to this:
-  1. If a node makes multiple changes to a given !ApplicationState key, other nodes are guaranteed to see the most recent one but not intermediate ones
-  1. There is no provision for deleting an !ApplicationState entirely
+ * Compression
+ * Space-efficient when uncompressed: remove redundancy
+ * Random access to the middle of wide rows
+ * Arbitrary nesting
  
- == Gossiper implementation ==
+ == Influences ==
  
- Gossip timer task runs every second. During each of these runs the node initiates gossip exchange according to following rules:
+ * Google Dremel - Arbitrarily nested, field-oriented serialization - http://sergey.melnix.com/pub/melnik_VLDB10.pdf
+ * Hive RCFile - Column-group-oriented storage - http://www.cse.ohio-state.edu/hpcs/WWW/HTML/publications/papers/TR-11-4.pdf
  
+ == Current Implementation ==
-  1. Gossip to random live endpoint (if any)
-  1. Gossip to random unreachable endpoint with certain probability depending on number of unreachable and live nodes
-  1. If the node gossiped to at (1) was not seed, or the number of live nodes is less than number of seeds, gossip to random seed with certain probability depending on number of unreachable, seed and live nodes.
  
- These rules were developed to ensure that if the network is up, all nodes will eventually know about all other nodes.  (Clearly, if each node only contacts one seed and then gossips only to random nodes it knows about, you can have partitions when there are multiple seeds -- each seed will only know about a subset of the nodes in the cluster.  Step 3 avoids this and more subtle problems.)
+ To avoid overloading existing terms, we will call a series of ordered rows in an SSTable a 'span'. Since an SSTable contains data for a single column family in sorted order, a span shares these properties. One way to visualize a simple span with depth 3 (that is, one containing super columns) is to arrange it as a table:
  
- This way a node initiates gossip exchange with one to three nodes every round (or zero if it is alone in the cluster)
+ || row key || name1  || name2 || value ||
+ || cheese  || brie || flavor || 3.4   ||
+ || cheese  || gouda || flavor  || 5.6   ||
+ || cheese  || gouda || origin  || france   ||
+ || cheese  || swiss || flavor || 2.6 ||
+ || fruit   || apple || flavor || 4.2 ||
+ || fruit   || pear  || flavor || 4.9 ||
+ || fruit   || pear  || origin || china ||
  
+ This representation of a span is highly redundant: every line repeats its parent names (the row key and name1), even when they are unchanged from the line above. If we remove that particular redundancy, we get a tree structure:
- == Data structures ==
- ==== HeartBeatState ====
- Consists of generation and version number. Generation stays the same when server is running and grows every time the node is started. Used for distinguishing state information before and after a node restart. Version number is shared with application states and guarantees ordering. Each node has one HeartBeatState associated with it.
  
- ==== ApplicationState ====
- Consists of state and version number and represents a state of single "component" or "element" within Cassandra. For instance application state for "load information" could be (5.2, 45), which means that node load is 5.2 at version 45. Similarly a node that is bootstrapping would have "bootstrapping" application state: (bxLpassF3XD8Kyks, 56) where first one is bootstrap token, and the second is version. Version number is shared by application states and HeartBeatState to guarantee ordering and can only grow.
+ || row key || name1  || name2 || value ||
+ || cheese  || brie || flavor || 3.4   ||
+ ||         || gouda || flavor  || 5.6   ||
+ ||         ||       || origin  || france   ||
+ ||         || swiss || flavor || 2.6 ||
+ || fruit   || apple || flavor || 4.2 ||
+ ||         || pear  || flavor || 4.9 ||
+ ||         ||       || origin || china ||
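
The step from the flat table to the tree can be sketched in a few lines of Python. This is only an illustration of the idea, not code from Cassandra: the `SPAN` list and the `blank_repeated_parents` helper are hypothetical names, and the data is copied from the tables above.

```python
# Flattened span from the first table: (row key, name1, name2, value).
SPAN = [
    ("cheese", "brie", "flavor", "3.4"),
    ("cheese", "gouda", "flavor", "5.6"),
    ("cheese", "gouda", "origin", "france"),
    ("cheese", "swiss", "flavor", "2.6"),
    ("fruit", "apple", "flavor", "4.2"),
    ("fruit", "pear", "flavor", "4.9"),
    ("fruit", "pear", "origin", "china"),
]


def blank_repeated_parents(rows):
    """Blank every name cell that repeats the cell directly above it.

    A cell is only blanked while all cells to its left are unchanged too,
    so a name is written again whenever any of its ancestors changes.
    """
    result = []
    previous = None
    for row in rows:
        *names, value = row
        cells = []
        changed = previous is None  # the first row is always written in full
        for depth, name in enumerate(names):
            if not changed and previous[depth] != name:
                changed = True
            cells.append(name if changed else "")
        result.append((*cells, value))
        previous = row
    return result


# Print the result in the same wiki table syntax used above.
for line in blank_repeated_parents(SPAN):
    print("|| " + " || ".join(line) + " ||")
```

Because the span is sorted, a single pass that compares each line with the one before it is enough to recover the tree.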
  
+ The current implementation of SSTables lays data out on disk in approximately this way: the data for each row is stored contiguously, and one must seek to the "root" of the tree for a row in order to read the row index and determine where the next level of the tree is stored. However, only the first level of the tree is indexed: in order to find a particular column at the level labeled "name2", you would need to deserialize all columns at that level.
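
The cost of having an index on the first level only can be shown with a toy in-memory model. This is a sketch under stated assumptions, not the on-disk layout: `ROW` stands in for a single row ("cheese"), a dict plays the role of the row index over name1, and `find_column` is a hypothetical helper.

```python
# Toy model of the row "cheese": the dict is the first-level index (name1),
# and each value is the unindexed list of (name2, value) pairs beneath it.
ROW = {
    "brie": [("flavor", "3.4")],
    "gouda": [("flavor", "5.6"), ("origin", "france")],
    "swiss": [("flavor", "2.6")],
}


def find_column(row, name1, name2):
    """Return (value, columns_scanned) for the column at name1/name2."""
    subcolumns = row[name1]  # indexed level: direct lookup
    scanned = 0
    for name, value in subcolumns:  # unindexed level: sequential scan
        scanned += 1
        if name == name2:
            return value, scanned
    return None, scanned


value, scanned = find_column(ROW, "gouda", "origin")
print(value, scanned)
```

In this model the number of columns scanned grows with the width of the second level, which is the cost the "random access to the middle of wide rows" requirement is meant to avoid.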
- ==== EndPointState ====
- Includes all ApplicationStates and HeartBeatState for certain endpoint (node). EndPointState can include only one of each type of ApplicationState, so if EndPointState already includes, say, load information, new load information will overwrite the old one. ApplicationState version number guarantees that old value will not overwrite new one.
  
+ Additionally, there is a second type of redundancy that is not tackled at the moment: the column names at level "name2" are frequently repeated, but since rows are stored independently, those values are not normalized. For narrow rows (like those shown), removing this redundancy will be our largest win.
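
One common way to normalize repeated names is a shared dictionary of names referenced by small integer ids. The sketch below shows that general technique on the example span. It is an assumption made for illustration, not the encoding this design specifies, and `normalize_names` is a hypothetical helper.

```python
# Flattened span from the tables above: (row key, name1, name2, value).
SPAN = [
    ("cheese", "brie", "flavor", "3.4"),
    ("cheese", "gouda", "flavor", "5.6"),
    ("cheese", "gouda", "origin", "france"),
    ("cheese", "swiss", "flavor", "2.6"),
    ("fruit", "apple", "flavor", "4.2"),
    ("fruit", "pear", "flavor", "4.9"),
    ("fruit", "pear", "origin", "china"),
]


def normalize_names(rows):
    """Replace each name2 with an integer id into a shared name dictionary."""
    dictionary = []  # id -> name, in order of first appearance
    ids = {}         # name -> id
    encoded = []
    for key, name1, name2, value in rows:
        if name2 not in ids:
            ids[name2] = len(dictionary)
            dictionary.append(name2)
        encoded.append((key, name1, ids[name2], value))
    return dictionary, encoded


dictionary, encoded = normalize_names(SPAN)
print(dictionary)                    # the distinct name2 values
print([row[2] for row in encoded])   # the ids that replace them
```

Here seven name2 cells collapse to two dictionary entries plus seven small ids, and the dictionary is shared across both rows, which is what storing rows independently prevents.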
- ==== endPointStateMap ====
- Internal structure in Gossiper that has EndPointState for all nodes (including itself) that it has heard about.
  
+ == High level ==
- == Gossip Exchange ==
- === GossipDigestSynMessage ===
- Node starting gossip exchange sends GossipDigestSynMessage, which includes a list of gossip digests. A single gossip digest consists of endpoint address, generation number and maximum version that has been seen for the endpoint. In this context, maximum version number is the biggest version number in EndPointState for this endpoint. An example to illustrate this better:
  
- Suppose that node 10.0.0.1 has following information in its endPointStateMap (remember that endPointStateMap includes also node itself):
+ Because we will be storing multiple columns per SSTable, our design will most closely resemble RCFile (rather than the column-per-file approach taken in Dremel). But because we allow for nesting via super columns, we need to borrow from Dremel's serialization to allow for efficient storage of parent and null information.
  
+ == Schema ==
- {{{
- EndPointState 10.0.0.1
-   HeartBeatState: generation 1259909635, version 325
-   ApplicationState "load-information": 5.2, generation 1259909635, version 45
-   ApplicationState "bootstrapping": bxLpassF3XD8Kyks, generation 1259909635, version 56
-   ApplicationState "normal": bxLpassF3XD8Kyks, generation 1259909635, version 87
- EndPointState 10.0.0.2
-   HeartBeatState: generation 1259911052, version 61
-   ApplicationState "load-information": 2.7, generation 1259911052, version 2
-   ApplicationState "bootstrapping": AujDMftpyUvebtnn, generation 1259911052, version 31
- EndPointState 10.0.0.3
-   HeartBeatState: generation 1259912238, version 5
-   ApplicationState "load-information": 12.0, generation 1259912238, version 3
- EndPointState 10.0.0.4
-   HeartBeatState: generation 1259912942, version 18
-   ApplicationState "load-information": 6.7, generation 1259912942, version 3
-   ApplicationState "normal": bj05IVc0lvRXw2xH, generation 1259912942, version 7
- }}}
- In this case max version number for these endpoints are 325, 61, 5 and 18 respectively. A gossip digest for endpoint 10.0.0.2 would be "10.0.0.2:1259911052:61" and essentially says "AFAIK endpoint 10.0.0.2 is running generation 1259911052 and maximum version is 61". When the node sends GossipDigestSynMessage, there will be exactly one gossip digest per known endpoint. That is, in this case GossipDigestSynMessage contents would be: "10.0.0.1:1259909635:325 10.0.0.2:1259911052:61 10.0.0.3:1259912238:5 10.0.0.4:1259912942:18". HeartBeatState version number is not necessarily always the biggest, but that is the most common situation by far.
  
+ To iterate quickly, the initial versions of the file format will be implemented using Avro.
- === Main code pointers: ===
- {{{
- Gossiper.GossipTimerTask.run: Main gossiper loop
- Gossiper.makeRandomGossipDigest: Constructs gossip digest list to be used in GossipDigestSynMessage
- Gossiper.makeGossipDigestSynMessage: Constructs GossipDigestSynMessage from a list of gossip digests
- }}}
  
- 
- == GossipDigestAckMessage ==
- A node receiving GossipDigestSynMessage will examine it and reply with GossipDigestAckMessage, which includes _two_ parts: gossip digest list and endpoint state list. From the gossip digest list arriving in GossipDigestSynMessage we will know for each endpoint whether the sending node has newer or older information than we do. An example to illustrate this:
- 
- Suppose that we're now in node 10.0.0.2 and our endPointState is as follows:
- 
- {{{
- EndPointState 10.0.0.1
-   HeartBeatState: generation 1259909635, version 324
-   ApplicationState "load-information": 5.2, generation 1259909635, version 45
-   ApplicationState "bootstrapping": bxLpassF3XD8Kyks, generation 1259909635, version 56
-   ApplicationState "normal": bxLpassF3XD8Kyks, generation 1259909635, version 87
- EndPointState 10.0.0.2
-   HeartBeatState: generation 1259911052, version 63
-   ApplicationState "load-information": 2.7, generation 1259911052, version 2
-   ApplicationState "bootstrapping": AujDMftpyUvebtnn, generation 1259911052, version 31
-   ApplicationState "normal": AujDMftpyUvebtnn, generation 1259911052, version 62
- EndPointState 10.0.0.3
-   HeartBeatState: generation 1259812143, version 2142
-   ApplicationState "load-information": 16.0, generation 1259812143, version 1803
-   ApplicationState "normal": W2U1XYUC3wMppcY7, generation 1259812143, version 6
- }}}
- Remember that the arriving gossip digest list is: "10.0.0.1:1259909635:325 10.0.0.2:1259911052:61 10.0.0.3:1259912238:5 10.0.0.4:1259912942:18". When the receiving end is handling this, following steps are done:
- 
- ==== Sort gossip digest list ====
- Sort gossip digest list according to the difference in max version number between sender's digest and our own information in descending order. That is, handle those digests first that differ mostly in version number. Number of endpoint information that fits in one gossip message is limited. This step is to guarantee that we favor sending information about nodes where information difference is biggest (sending node has very old information compared to us).
- 
- ==== Examine gossip digest list ====
- At this stage we go through the arriving gossip digest list and construct the two parts of GossipDigestAckMessage mentioned above (gossip digest list and endpoint state list). Let us go through the example digest one by one:
- 
- '''10.0.0.1:1259909635:325''' In our own endPointStateMap the generation is the same, so 10.0.0.1 has not rebooted since we have last heard of it. Version number in the digest is bigger than our max version number (325 > 324), so we have to ask the sender what has happened since version 324. For this purpose we include a gossip digest 10.0.0.1:1259909635:324, which says "I know about 10.0.0.1 only until generation 1259909635, version 324, please tell me anything that is newer than this".
- 
- '''10.0.0.2:1259911052:61''' When examining this, we notice that we know more than the sender about 10.0.0.2 (generations match, but our version is bigger 63 > 61). Sender's max version is 61, so we look for any states that are newer than this. As we can see from the endPointStateMap, there are two: Application state "normal" (version 62) and HeartBeatState (version 63). We send these ApplicationStates to the sender. Please note that in this case we are not sending digests, as digest only tells the maximum version number. In this case we already know that there is difference, so we will send full ApplicationStates.
- 
- '''10.0.0.3:1259912238:5''' In this case generations do not match. Our generation is smaller than the arriving, so 10.0.0.3 must have rebooted. We will ask all data from the sender for generation 1259912238 starting from smallest version number 0. That is, we insert gossip digest 10.0.0.3:1259912238:0 to the reply.
- 
- '''10.0.0.4:1259912942:18''' We do not know anything about this endpoint, so we proceed in the same manner as 10.0.0.3 and ask for all information. Insert digest 10.0.0.4:1259912942:0 to the reply.
- 
- At this point we have constructed GossipDigestAckMessage, which includes following information:
- 
- {{{
- 10.0.0.1:1259909635:324
- 10.0.0.3:1259912238:0
- 10.0.0.4:1259912942:0
- 10.0.0.2:[ApplicationState "normal": AujDMftpyUvebtnn, generation 1259911052, version 62], [HeartBeatState, generation 1259911052, version 63]
- }}}
- We now send this GossipAckMessage to the sender of GossipSynMessage
- 
- === Main Code Pointers: ===
- {{{
- GossipDigestSynVerbHandler.doVerb: Main function for handling GossipDigestSynMessage
- GossipDigestSynVerbHandler.doSort: Sorts gossip digest list
- Gossiper.examineGossiper: Examine gossip digest list
- Gossiper.makeGossipDigestAckMessage: Constructs GossipDigestAckMessage from a list of gossip digests
- }}}
- [[GossipDigestSynVerbHandler|]]
- 
- == GossipDigestAck2Message ==
- Rest of gossip process here....
-