You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/09 05:20:03 UTC

svn commit: r1520956 - /kafka/site/08/design.html

Author: junrao
Date: Mon Sep  9 03:20:03 2013
New Revision: 1520956

URL: http://svn.apache.org/r1520956
Log:
minor changes to 0.8 replication design doc

Modified:
    kafka/site/08/design.html

Modified: kafka/site/08/design.html
URL: http://svn.apache.org/viewvc/kafka/site/08/design.html?rev=1520956&r1=1520955&r2=1520956&view=diff
==============================================================================
--- kafka/site/08/design.html (original)
+++ kafka/site/08/design.html Mon Sep  9 03:20:03 2013
@@ -37,7 +37,7 @@ This style of pagecache-centric design i
 
 <h4>Constant Time Suffices</h4>
 <p>
-The persistent data structure used in messaging systems are often a per-consumer queue with an associated BTree or other general-purpose random access data structureto maintain metadata about messages. BTrees are the most versatile data structure available, and make it possible to support a wide variety of transactional and non-transactional semantics in the messaging system. They do come with a fairly high cost, though: Btree operations are O(log N). Normally O(log N) is considered essentially equivalent to constant time, but this is not true for disk operations. Disk seeks come at 10 ms a pop, and each disk can do only one seek at a time so parallelism is limited. Hence even a handful of disk seeks leads to very high overhead. Since storage systems mix very fast cached operations with very slow physical disk operations, the observed performance of tree structures is often superlinear as data increases with fixed cache--i.e. doubling your data makes things much worse then twice as 
 slow.
+The persistent data structure used in messaging systems are often a per-consumer queue with an associated BTree or other general-purpose random access data structures to maintain metadata about messages. BTrees are the most versatile data structure available, and make it possible to support a wide variety of transactional and non-transactional semantics in the messaging system. They do come with a fairly high cost, though: Btree operations are O(log N). Normally O(log N) is considered essentially equivalent to constant time, but this is not true for disk operations. Disk seeks come at 10 ms a pop, and each disk can do only one seek at a time so parallelism is limited. Hence even a handful of disk seeks leads to very high overhead. Since storage systems mix very fast cached operations with very slow physical disk operations, the observed performance of tree structures is often superlinear as data increases with fixed cache--i.e. doubling your data makes things much worse then twice a
 s slow.
 <p>
 Intuitively a persistent queue could be built on simple reads and appends to files as is commonly the case with logging solutions. This structure has the advantage that all operations are O(1) and reads do not block writes or each other. This has obvious performance advantages since the performance is completely decoupled from the data size&mdash;one server can now take full advantage of a number of cheap, low-rotational speed 1+TB SATA drives. Though they have poor seek performance, these drives have acceptable performance for large reads and writes and come at 1/3 the price and 3x the capacity.
 <p>
@@ -160,7 +160,7 @@ Now let's describe the semantics from th
   <li>So what about exactly once semantics (i.e. the thing you actually want)? The limitation here is not actually a feature of the messaging system but rather the need to co-ordinate the consumers position with what is actually stored as output. The classic way of achieving this would be to introduce a two-phase commit between the storage for the consumer position and the storage of the consumers output. But this can be handled more simply and generally by simply letting the consumer store its offset in the same place as its output. This is better because many of the output systems a consumer might want to write to will not support a two-phase commit. As example of this our Hadoop ETL that populates data in HDFS stores its offsets in HDFS with the data it reads so that it is guaranteed that either data and offsets are both updated or neither is. We follow similar patterns for many other data systems which require these stronger semantics and for which the messages do not have a pri
 mary key to allow for deduplication.
 </ol>
 <p>
-So effectively Kafka guarantees at-least-once delivery by default and allows the user to implement at most once delivery by disabling retries on the producer and commiting its offset prior to processing a batch of messages. Exactly-once delivery requires co-operation with the destination storage system but Kafka gives the offset which makes implementing this straight-forward.
+So effectively Kafka guarantees at-least-once delivery by default and allows the user to implement at most once delivery by disabling retries on the producer and committing its offset prior to processing a batch of messages. Exactly-once delivery requires co-operation with the destination storage system but Kafka gives the offset which makes implementing this straight-forward.
 
 <h3><a id="replication">4.7 Replication</a></h3>
 <p>
@@ -168,7 +168,7 @@ Kafka replicates the log for each topic'
 <p>
 Other messaging systems provide some replication-related features, but, in our (totally biased) opinion, this appears to be a tacked-on thing, not heavily used, and with large downsides: slaves are inactive, throughput is heavily impacted, it requires fiddly manual configuration, etc. Kafka is meant to be used with replication by default&mdash;in fact we implement un-replicated topics as replicated topics where the replication factor is one.
 <p>
-The unit of replication is the topic partition. Under non-failure conditions each partition Kafka has a single leader and zero or more followers. We call the total number of replicas including the leader the replication factor. All reads and writes go to the leader of the partition; each node is the leader for a share of partitions and a follower for others. The logs on the followers are identical to the leader's log&mdash;all have the same offsets and messages in the same order (though, of course, at any given time the leader may have a few as-yet unreplicated messages at the end of its log).
+The unit of replication is the topic partition. Under non-failure conditions, each partition Kafka has a single leader and zero or more followers. We call the total number of replicas including the leader the replication factor. All reads and writes go to the leader of the partition. Typically, there are many more partitions than brokers and the leaders are evenly distributed among brokers. The logs on the followers are identical to the leader's log&mdash;all have the same offsets and messages in the same order (though, of course, at any given time the leader may have a few as-yet unreplicated messages at the end of its log).
 <p>
 Followers consume messages from the leader just as a normal Kafka consumer would and apply them to their own log. Having the followers pull from the leader has the nice property of allowing the follower to naturally batch together log entries they are applying to their log.
 <p>
@@ -177,11 +177,11 @@ As with most distributed systems automat
 	<li>A node must be able to maintain its session with Zookeeper (via Zookeeper's heartbeat mechanism)
 	<li>If it is a slave it must replicate the writes happening on the leader and not fall "too far" behind
 </ol>
-We refer to nodes satisfying these two conditions as being "in sync" to avoid the vagueness of "alive" or "failed". The leader keeps track of the set of "in sync" nodes. If a consumer dies or falls behind, the leader will remove it from the list of in sync replicas. The definition of how far behind is too far is controlled by the replica.lag.max.messages configuration.
+We refer to nodes satisfying these two conditions as being "in sync" to avoid the vagueness of "alive" or "failed". The leader keeps track of the set of "in sync" nodes. If a follower dies or falls behind, the leader will remove it from the list of in sync replicas. The definition of how far behind is too far is controlled by the replica.lag.max.messages configuration.
 <p>
 In distributed systems terminology we only attempt to handle a "fail/recover" model of failures where nodes suddenly cease working and then later recover (perhaps without knowing that they have died). Kafka does not handle so-called "Byzantine" failures in which nodes produce arbitrary or malicious responses (perhaps due to bugs or foul play).
 <p>
-A message is considered "committed" when all in sync replicas for that partition have applied it to their log. Only committed messages are ever given out to the consumer. This means that the consumer need not worry about potentially seeing a message that could be lost if the leader fails. Producers, on the other hand, have the option of either waiting for the message to be committed or not, depending on their preference for latency and durability. This preference is controlled by the request.required.acks setting the producer uses.
+A message is considered "committed" when all in sync replicas for that partition have applied it to their log. Only committed messages are ever given out to the consumer. This means that the consumer need not worry about potentially seeing a message that could be lost if the leader fails. Producers, on the other hand, have the option of either waiting for the message to be committed or not, depending on their preference for latency and durability. This preference is controlled by the request.required.acks setting that the producer uses.
 <p>
 The guarantee that Kafka offers is that a committed message will not be lost as long as a single in sync replica remains.
 <p>
@@ -191,42 +191,42 @@ Kafka will remain available in the prese
 
 At it's heart a Kafka partition is a replicated log. The replicated log is one of the most basic primitives in distributed data systems, and there are many approaches to implementation. A replicated log can be used by other systems as a primitive for implementing other distributed systems in the <a href="http://en.wikipedia.org/wiki/State_machine_replication">state-machine style</a>.
 <p>
-A replicated log models the process of coming into consensus on the order of a series of values (generally numbering the log entries 0, 1, 2, ...). There are many ways to implement this, but the simplest and fastest is with a leader who chooses the ordering of values provided to it. As long as the leader remains alive and all followers need only copy the values and ordering it chooses.
+A replicated log models the process of coming into consensus on the order of a series of values (generally numbering the log entries 0, 1, 2, ...). There are many ways to implement this, but the simplest and fastest is with a leader who chooses the ordering of values provided to it. As long as the leader remains alive and all followers need only copy the values and ordering the leader chooses.
 <p>
 Of course if leaders didn't fail we wouldn't need followers! When the leader does die we need to choose a new leader from among the followers. But followers themselves may fall behind or crash so we must ensure we choose an up-to-date follower. The fundamental guarantee a log replication algorithm must provide is that if we tell the client a message is committed, and the leader fails, the new leader we elect must also have that message. This yields a tradeoff: if the leader waits for more followers to acknowledge a message before declaring it committed then there will be more potentially electable leaders.
 <p>
-If you choose the number of acknowledgements required and the number of logs that must be compared to elect a leader such that there is guaranteed to be an overlap then this is called a Quorum.
+If you choose the number of acknowledgements required and the number of logs that must be compared to elect a leader such that there is guaranteed to be an overlap, then this is called a Quorum.
 <p>
-A common approach to this tradeoff is to use a majority vote for both the commit decision and the leader election. This is not what Kafka does, but let's explore it anyway to understand the tradeoffs. Let's say we want to tolerate <i>f</i> failures. If 2<i>f</i>+1 servers must replicate a message prior to a commit being declared by the leader, and if we elect a new leader by electing the follower with the most complete log from at least 2<i>f</i>+1 replicas, then, with no more than <i>f</i> failures there must be at least one server in common between those that committed the message and the servers that were available to take over as leader and hence the message will be preserved.  There are many remaining details that each algorithm must handle (such as precisely defined what makes a log more complete, ensuring log consistency during leader failure or changing the set of servers in the replica set) but we will ignore these for now.
+A common approach to this tradeoff is to use a majority vote for both the commit decision and the leader election. This is not what Kafka does, but let's explore it anyway to understand the tradeoffs. Let's say we have 2<i>f</i>+1 replicas. If <i>f</i>+1 replicas must receive a message prior to a commit being declared by the leader, and if we elect a new leader by electing the follower with the most complete log from at least <i>f</i>+1 replicas, then, with no more than <i>f</i> failures, the leader is guaranteed to have all committed messages. This is because among any <i>f</i>+1 replicas, there must be at least one replica that contains all committed messages. That replica's log will be the most complete and therefore will be selected as the new leader. There are many remaining details that each algorithm must handle (such as precisely defined what makes a log more complete, ensuring log consistency during leader failure or changing the set of servers in the replica set) but we wi
 ll ignore these for now.
 <p>
 This majority vote approach has a very nice property: the latency is dependent on only the fastest servers. That is, if the replication factor is three, the latency is determined by the faster slave not the slower one.
 <p>
 There are a rich variety of algorithms in this family including Zookeeper's <a href="http://www.stanford.edu/class/cs347/reading/zab.pdf">Zab</a>, <a href="https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf">Raft</a>, and <a href="http://pmg.csail.mit.edu/papers/vr-revisited.pdf">Viewstamped Replication</a>. The most similar academic publication we are aware of to Kafka's actual implementation is <a href="http://research.microsoft.com/apps/pubs/default.aspx?id=66814">PacificA</a> from Microsoft.
 <p>
-The downside of majority vote is that it doesn't take many failures to leave you with no electable leaders. To tolerate one failure requires three copies of the data, and to tolerate two failures requires five copies of the data. In our experience having only enough redundancy to tolerate a single failure is not enough for a practical system, but doing every write five times, with 5x the disk space requirements and 1/5th the throughput, is not very practical for large volume data problems. This is likely why quorum algorithms more commonly appear for shared cluster configuration such as Zookeeper but are less common for primary data storage. For example in HDFS the namenode's high-availability feature is built on a <a href="http://blog.cloudera.com/blog/2012/10/quorum-based-journaling-in-cdh4-1">majority-vote-based journal</a> but this more expensive approach is not used for the data itself.
+The downside of majority vote is that it doesn't take many failures to leave you with no electable leaders. To tolerate one failure requires three copies of the data, and to tolerate two failures requires five copies of the data. In our experience having only enough redundancy to tolerate a single failure is not enough for a practical system, but doing every write five times, with 5x the disk space requirements and 1/5th the throughput, is not very practical for large volume data problems. This is likely why quorum algorithms more commonly appear for shared cluster configuration such as Zookeeper but are less common for primary data storage. For example in HDFS the namenode's high-availability feature is built on a <a href="http://blog.cloudera.com/blog/2012/10/quorum-based-journaling-in-cdh4-1">majority-vote-based journal</a>, but this more expensive approach is not used for the data itself.
 <p>
-Kafka takes a slightly different approach to choosing its quorum set. Instead of majority vote Kafka dynamically maintains a set of in-sync replicas that are caught-up to the leader. Only members of this set are eligible for election as leader. A write to a Kafka partition is not considered committed until <i>all</i> in-sync replicas have received the write. This ISR set is persisted to zookeeper whenever it changes. Because of this, any replica in the in-sync set is eligible to be elected leader. This is an important factor for Kafka's usage model where there are many partitions and ensuring leadership balance is important. With this ISR model and <i>N</i> replicas a Kafka topic can tolerate <i>N</i>-1 failures without losing committed messages.
+Kafka takes a slightly different approach to choosing its quorum set. Instead of majority vote, Kafka dynamically maintains a set of in-sync replicas (ISR) that are caught-up to the leader. Only members of this set are eligible for election as leader. A write to a Kafka partition is not considered committed until <i>all</i> in-sync replicas have received the write. This ISR set is persisted to zookeeper whenever it changes. Because of this, any replica in the ISR is eligible to be elected leader. This is an important factor for Kafka's usage model where there are many partitions and ensuring leadership balance is important. With this ISR model and <i>f+1</i> replicas, a Kafka topic can tolerate <i>f</i> failures without losing committed messages.
 <p>
-In practice for most use cases we hope to handle we think this tradeoff is a reasonable one. In practice to tolerate <i>f</i> failures both the majority vote and ISR approach will wait for the same number of replicas to acknowledge before committing a message (e.g. to survive one failure a majority quorum needs three replicas and one acknowledgement and the ISR approach requires two replicas and one acknowledgement). The ability to commit without the slowest servers is an advantage of the majority vote approach but we think it is ameliorated by allowing the client to choose whether they block on the message commit or not, and the additional throughput and disk space due to the lower required replication factor is worth it.
+For most use cases we hope to handle, we think this tradeoff is a reasonable one. In practice, to tolerate <i>f</i> failures, both the majority vote and the ISR approach will wait for the same number of replicas to acknowledge before committing a message (e.g. to survive one failure a majority quorum needs three replicas and one acknowledgement and the ISR approach requires two replicas and one acknowledgement). The ability to commit without the slowest servers is an advantage of the majority vote approach. However, we think it is ameliorated by allowing the client to choose whether they block on the message commit or not, and the additional throughput and disk space due to the lower required replication factor is worth it.
 <p>
-Another important design distinction is that Kafka does not require that crashed nodes recover with all their data intact. It is not uncommon for replication algorithms in this space to depend on the existence of "stable storage" that cannot be lost in any failure-recovery scenario without potential consistency violations. There are two primary problems with this assumption. First, disk errors are the most common problem we observe in real operation of persistent data systems and they often do not leave data intact. Secondly, even if this were not a problem, we do not want to require the use of fsync on every write for our consistency guarantees as this can reduce performance by two to three orders of magnitude. Our protocol for allowing a replica to rejoin the ISR ensures that before rejoining it must fully re-sync again even if it lost unflushed data in its crash.
+Another important design distinction is that Kafka does not require that crashed nodes recover with all their data intact. It is not uncommon for replication algorithms in this space to depend on the existence of "stable storage" that cannot be lost in any failure-recovery scenario without potential consistency violations. There are two primary problems with this assumption. First, disk errors are the most common problem we observe in real operation of persistent data systems and they often do not leave data intact. Secondly, even if this were not a problem, we do not want to require the use of fsync on every write for our consistency guarantees as this can reduce performance by two to three orders of magnitude. Our protocol for allowing a replica to rejoin the ISR ensures that before rejoining, it must fully re-sync again even if it lost unflushed data in its crash.
 
 <h4>Unclean leader election: What if they all die?</h4>
 
 Note that Kafka's guarantee with respect to data loss is predicated on at least on replica remaining in sync. If all the nodes replicating a partition die, this guarantee no longer holds.
 <p>
-However a practical system needs to do something reasonable when all the brokers die. If you are unlucky enough to have this happen it is important to consider what will happen. There are two behaviors that could be implemented:
+However a practical system needs to do something reasonable when all the replicas die. If you are unlucky enough to have this occur, it is important to consider what will happen. There are two behaviors that could be implemented:
 <ol>
-	<li>Wait for a broker in the last in-sync set to come back to life and choose this broker as the leader (hopefully it still has all its data).
-	<li>Choose the first broker to come back to life as the leader.
+	<li>Wait for a replica in the ISR to come back to life and choose this replica as the leader (hopefully it still has all its data).
+	<li>Choose the first replica (not necessarily in the ISR) that comes back to life as the leader.
 </ol>
 <p>
-This is a simple tradeoff between availability and consistency. If we wait for a broker in the ISR then we will remain unavailable as long as this broker is down. If this broker was destroyed or its data was lost then we are permanently down. If, on the other hand, a non-in-sync broker comes back to life and we allow it to become leader then it's log becomes the source of truth even though it is not guaranteed to have every committed message. In our current release we choose the second strategy and favor choosing a potentially inconsistent broker when all other machines are dead. In the future we would like to make this configurable to better support uses where downtime is preferable to inconsistency.
+This is a simple tradeoff between availability and consistency. If we wait for replicas in the ISR, then we will remain unavailable as long as those replicas are down. If such replicas were destroyed or their data was lost, then we are permanently down. If, on the other hand, a non-in-sync replica comes back to life and we allow it to become leader, then its log becomes the source of truth even though it is not guaranteed to have every committed message. In our current release we choose the second strategy and favor choosing a potentially inconsistent replica when all replicas in the ISR are dead. In the future, we would like to make this configurable to better support use cases where downtime is preferable to inconsistency.
 <p>
-This dilemma is not specific to Kafka, it exists in any quorum-based scheme. For example in a majority voting scheme if a majority of servers suffer a permanent failure then you must either choose to lose 100% of your data or violate consistency by taking what remains on an existing server as your new source of truth.
+This dilemma is not specific to Kafka. It exists in any quorum-based scheme. For example in a majority voting scheme, if a majority of servers suffer a permanent failure, then you must either choose to lose 100% of your data or violate consistency by taking what remains on an existing server as your new source of truth.
 
 <h4>Replica Management</h4>
 
-The above discussion on replicated logs really covers only a single log, i.e. one topic partition. However a Kafka cluster will manage hundreds or thousands of these. We attempt to balance partitions within a cluster in a round-robin fashion to avoid clustering all partitions for high-volume topics on a small number of nodes. Likewise we try to balance leadership so that each node is the leader for a proportional share of its partitions.
+The above discussion on replicated logs really covers only a single log, i.e. one topic partition. However a Kafka cluster will manage hundreds or thousands of these partitions. We attempt to balance partitions within a cluster in a round-robin fashion to avoid clustering all partitions for high-volume topics on a small number of nodes. Likewise we try to balance leadership so that each node is the leader for a proportional share of its partitions.
 <p>
-It is also important to optimize the leadership election process as that is the critical window of unavailability. A naive implementation of leader election would end up running an election per partition for all partitions a node hosted when that node failed. Instead we elect a single "controller" that is responsible for leadership assignment decisions. This controller serves an analogous role to the role of leaders themselves&mdash;we avoid making a sequence of leadership decisions by choosing a designated process to make all these decisions and then handling faults in this process. The result is that we are able to batch together many of the required leadership change notifications which makes the election process far cheaper and faster for a large number of partitions.
+It is also important to optimize the leadership election process as that is the critical window of unavailability. A naive implementation of leader election would end up running an election per partition for all partitions a node hosted when that node failed. Instead, we elect one of the brokers as the "controller". This controller detects failures at the broker level and is responsible for changing the leader of all affected partitions in a failed broker. The result is that we are able to batch together many of the required leadership change notifications which makes the election process far cheaper and faster for a large number of partitions. If the controller fails, one of the surviving brokers will become the new controller.