Posted to commits@kafka.apache.org by jj...@apache.org on 2015/02/03 02:59:07 UTC

svn commit: r1656620 - in /kafka/site/082: configuration.html implementation.html

Author: jjkoshy
Date: Tue Feb  3 01:59:07 2015
New Revision: 1656620

URL: http://svn.apache.org/r1656620
Log:
KAFKA-1729; Add documentation for Kafka-based offset management; reviewed by Jun Rao and Gwen Shapira.

Modified:
    kafka/site/082/configuration.html
    kafka/site/082/implementation.html

Modified: kafka/site/082/configuration.html
URL: http://svn.apache.org/viewvc/kafka/site/082/configuration.html?rev=1656620&r1=1656619&r2=1656620&view=diff
==============================================================================
--- kafka/site/082/configuration.html (original)
+++ kafka/site/082/configuration.html Tue Feb  3 01:59:07 2015
@@ -384,6 +384,53 @@ ZooKeeper also allows you to add a "chro
       <td>false</td>
       <td>Enable delete topic.</td>
     </tr>
+    <tr>
+      <td>offsets.topic.num.partitions</td>
+      <td>50</td>
+      <td>The number of partitions for the offset commit topic. Since changing this after deployment is currently unsupported, we recommend using a higher setting for production (e.g., 100-200).</td>
+    </tr>
+    <tr>
+      <td>offsets.topic.retention.minutes</td>
+      <td>1440</td>
+      <td>Offsets that are older than this age will be marked for deletion. The actual purge will occur when the log cleaner compacts the offsets topic.</td>
+    </tr>
+    <tr>
+      <td>offsets.retention.check.interval.ms</td>
+      <td>600000</td>
+      <td>The frequency at which the offset manager checks for stale offsets.</td>
+    </tr>
+    <tr>
+      <td>offsets.topic.replication.factor</td>
+      <td>3</td>
+      <td>The replication factor for the offset commit topic. A higher setting (e.g., three or four) is recommended in order to ensure higher availability. If the offsets topic is created when there are fewer brokers than the replication factor, then it will be created with fewer replicas.</td>
+    </tr>
+    <tr>
+      <td>offsets.topic.segment.bytes</td>
+      <td>104857600</td>
+      <td>Segment size for the offsets topic. Since the topic is compacted, this should be kept relatively low in order to facilitate faster log compaction and loads.</td>
+    </tr>
+    <tr>
+      <td>offsets.load.buffer.size</td>
+      <td>5242880</td>
+      <td>An offset load occurs when a broker becomes the offset manager for a set of consumer groups (i.e., when it becomes a leader for an offsets topic partition). This setting is the batch size (in bytes) used for reading from the offsets topic segments while loading offsets into the offset manager's cache.</td>
+    </tr>
+<!--
+    <tr>
+      <td>offsets.topic.compression.codec</td>
+      <td>none</td>
+      <td>(Should not be used until KAFKA-1374 is implemented.) Compression codec for the offsets topic. Compression should be enabled in order to achieve "atomic" commits.</td>
+    </tr>
+-->
+    <tr>
+      <td>offsets.commit.required.acks</td>
+      <td>-1</td>
+      <td>The number of acknowledgements that are required before the offset commit can be accepted. This is similar to the producer's acknowledgement setting. In general, the default should not be overridden.</td>
+    </tr>
+    <tr>
+      <td>offsets.commit.timeout.ms</td>
+      <td>5000</td>
+      <td>The offset commit will be delayed until this timeout or the required number of replicas have received the offset commit. This is similar to the producer request timeout.</td>
+    </tr>
 </tbody></table>
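+
+<p>
+As an illustration (the values below are only examples; the table above explains the trade-offs), a production broker's <code>server.properties</code> might override the offset-management defaults as follows:
+</p>
+<pre>
+# Offset management (illustrative values)
+offsets.topic.num.partitions=100
+offsets.topic.replication.factor=3
+offsets.topic.segment.bytes=104857600
+offsets.topic.retention.minutes=1440
+offsets.retention.check.interval.ms=600000
+offsets.load.buffer.size=5242880
+offsets.commit.required.acks=-1
+offsets.commit.timeout.ms=5000
+</pre>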
 
 <p>More details about broker configuration can be found in the scala class <code>kafka.server.KafkaConfig</code>.</p>
@@ -637,6 +684,36 @@ The essential consumer configurations ar
       <td colspan="1">2000</td>
       <td>How far a ZK follower can be behind a ZK leader</td>
     </tr>
+    <tr>
+      <td>offsets.storage</td>
+      <td colspan="1">zookeeper</td>
+      <td>Select where offsets should be stored (zookeeper or kafka).</td>
+    </tr>
+    <tr>
+      <td>offsets.channel.backoff.ms</td>
+      <td colspan="1">1000</td>
+      <td>The backoff period when reconnecting the offsets channel or retrying failed offset fetch/commit requests.</td>
+    </tr>
+    <tr>
+      <td>offsets.channel.socket.timeout.ms</td>
+      <td colspan="1">10000</td>
+      <td>Socket timeout when reading responses for offset fetch/commit requests. This timeout is also used for ConsumerMetadata requests that are used to query for the offset manager.</td>
+    </tr>
+    <tr>
+      <td>offsets.commit.max.retries</td>
+      <td colspan="1">5</td>
+      <td>Retry the offset commit up to this many times on failure. This retry count applies only to offset commits during shutdown. It does not apply to commits originating from the auto-commit thread. Nor does it apply to attempts to query for the offset coordinator before committing offsets; i.e., if a consumer metadata request fails for any reason, it will be retried, and that retry does not count toward this limit.</td>
+    </tr>
+    <tr>
+      <td>dual.commit.enabled</td>
+      <td colspan="1">true</td>
+      <td>If you are using "kafka" as offsets.storage, you can dual commit offsets to ZooKeeper (in addition to Kafka). This is required during migration from zookeeper-based offset storage to kafka-based offset storage. With respect to any given consumer group, it is safe to turn this off after all instances within that group have been migrated to the new version that commits offsets to the broker (instead of directly to ZooKeeper).</td>
+    </tr>
+    <tr>
+      <td>partition.assignment.strategy</td>
+      <td colspan="1">range</td>
+      <td><p>Select between the "range" or "roundrobin" strategy for assigning partitions to consumer streams.<p>The round-robin partition assignor lays out all the available partitions and all the available consumer threads. It then proceeds to do a round-robin assignment from partition to consumer thread. If the subscriptions of all consumer instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts will be within a delta of exactly one across all consumer threads.) Round-robin assignment is permitted only if: (a) every topic has the same number of streams within a consumer instance and (b) the set of subscribed topics is identical for every consumer instance within the group.<p>Range partitioning works on a per-topic basis. For each topic, we lay out the available partitions in numeric order and the consumer threads in lexicographic order. We then divide the number of partitions by the total number of consumer streams (threads) to determine the number of partitions to assign to each consumer. If it does not evenly divide, then the first few consumers will have one extra partition.</td>
+    </tr>
 </tbody>
 </table>
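+
+<p>
+As a sketch (not an official example), a high-level consumer that stores its offsets in Kafka could be configured as follows; the ZooKeeper connect string, group id and client logic are placeholders:
+</p>
+<pre>
+import java.util.Properties;
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.javaapi.consumer.ConsumerConnector;
+
+public class KafkaOffsetStorageConsumer {
+  public static void main(String[] args) {
+    Properties props = new Properties();
+    props.put("zookeeper.connect", "localhost:2181");   // still required for group coordination
+    props.put("group.id", "my-consumer-group");
+    props.put("offsets.storage", "kafka");              // commit offsets to the offset manager broker
+    props.put("dual.commit.enabled", "false");          // set to true only while migrating from ZooKeeper
+    props.put("partition.assignment.strategy", "roundrobin");
+
+    ConsumerConnector connector =
+        Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
+    // ... create message streams and consume as usual; offsets are committed to Kafka ...
+    connector.shutdown();
+  }
+}
+</pre>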
 

Modified: kafka/site/082/implementation.html
URL: http://svn.apache.org/viewvc/kafka/site/082/implementation.html?rev=1656620&r1=1656619&r2=1656620&view=diff
==============================================================================
--- kafka/site/082/implementation.html (original)
+++ kafka/site/082/implementation.html Tue Feb  3 01:59:07 2015
@@ -225,6 +225,35 @@ Note that two kinds of corruption must b
 </p>
 
 <h3><a id="distributionimpl">5.6 Distribution</a></h3>
+<h4>Consumer Offset Tracking</h4>
+<p>
+The high-level consumer tracks the maximum offset it has consumed in each partition and periodically commits its offset vector so that it can resume from those offsets in the event of a restart. Kafka provides the option to store all the offsets for a given consumer group in a designated broker (for that group) called the <i>offset manager</i>. That is, any consumer instance in that consumer group should send its offset commits and fetches to that offset manager (broker). The high-level consumer handles this automatically. If you use the simple consumer, you will need to discover the offset manager and explicitly commit or fetch offsets. A consumer can look up its offset manager by issuing a ConsumerMetadataRequest to any Kafka broker and reading the ConsumerMetadataResponse, which will contain the offset manager. The consumer can then proceed to commit or fetch offsets from the offset manager broker. If the offset manager moves, the consumer will need to rediscover it. If you wish to manage your offsets manually, you can take a look at these <a href="https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka">code samples that explain how to issue OffsetCommitRequest and OffsetFetchRequest</a>.
+</p>
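+
+<p>
+As a rough sketch of the discovery step described above (adapted from the wiki samples linked above; the 0.8.2 classes involved are <code>kafka.network.BlockingChannel</code>, <code>kafka.api.ConsumerMetadataRequest/Response</code>, <code>kafka.cluster.Broker</code> and <code>kafka.common.ErrorMapping</code>, and the exact signatures should be verified against that page):
+</p>
+<pre>
+int correlationId = 0;
+BlockingChannel channel = new BlockingChannel("any-broker-host", 9092,
+    BlockingChannel.UseDefaultBufferSize(),
+    BlockingChannel.UseDefaultBufferSize(),
+    5000 /* read timeout in ms */);
+channel.connect();
+
+// Ask any broker which broker is the offset manager for this group.
+channel.send(new ConsumerMetadataRequest("my-consumer-group",
+    ConsumerMetadataRequest.CurrentVersion(), correlationId++, "my-client-id"));
+ConsumerMetadataResponse metadataResponse =
+    ConsumerMetadataResponse.readFrom(channel.receive().buffer());
+
+if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
+  Broker offsetManager = metadataResponse.coordinator();
+  // Reconnect if the offset manager is a different broker.
+  channel.disconnect();
+  channel = new BlockingChannel(offsetManager.host(), offsetManager.port(),
+      BlockingChannel.UseDefaultBufferSize(),
+      BlockingChannel.UseDefaultBufferSize(),
+      5000 /* read timeout in ms */);
+  channel.connect();
+} else {
+  // Back off and retry the ConsumerMetadataRequest.
+}
+</pre>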
+
+<p>
+When the offset manager receives an OffsetCommitRequest, it appends the request to a special <a href="#compaction">compacted</a> Kafka topic named <i>__consumer_offsets</i>. The offset manager sends a successful offset commit response to the consumer only after all the replicas of the offsets topic receive the offsets. In case the offsets fail to replicate within a configurable timeout, the offset commit will fail and the consumer may retry the commit after backing off. (This is done automatically by the high-level consumer.) The brokers periodically compact the offsets topic since it only needs to maintain the most recent offset commit per partition. The offset manager also caches the offsets in an in-memory table in order to serve offset fetches quickly.
+</p>
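+
+<p>
+Continuing the sketch above (same caveats; the request/response classes here are the <code>kafka.javaapi</code> ones, together with <code>kafka.common.TopicAndPartition</code> and <code>kafka.common.OffsetAndMetadata</code>, and the linked wiki samples remain the authoritative reference), a manual offset commit sent to the offset manager looks roughly like this:
+</p>
+<pre>
+// Commit offset 100 for partition 0 of "my-topic" on behalf of the group.
+TopicAndPartition partition = new TopicAndPartition("my-topic", 0);
+Map&lt;TopicAndPartition, OffsetAndMetadata&gt; offsets =
+    new LinkedHashMap&lt;TopicAndPartition, OffsetAndMetadata&gt;();
+offsets.put(partition,
+    new OffsetAndMetadata(100L, "optional metadata", System.currentTimeMillis()));
+
+OffsetCommitRequest commitRequest = new OffsetCommitRequest(
+    "my-consumer-group", offsets, correlationId++, "my-client-id",
+    (short) 1 /* version 1 and above store offsets in Kafka; version 0 writes them to ZooKeeper */);
+channel.send(commitRequest.underlying());
+OffsetCommitResponse commitResponse =
+    OffsetCommitResponse.readFrom(channel.receive().buffer());
+if (commitResponse.hasError()) {
+  // Back off and retry; rediscover the offset manager on a NotCoordinatorForConsumer error.
+}
+</pre>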
+
+<p>
+When the offset manager receives an offset fetch request, it simply returns the last committed offset vector from the offsets cache. In case the offset manager was just started or if it just became the offset manager for a new set of consumer groups (by becoming a leader for a partition of the offsets topic), it may need to load the offsets topic partition into the cache. In this case, the offset fetch will fail with an OffsetsLoadInProgress exception and the consumer may retry the OffsetFetchRequest after backing off. (This is done automatically by the high-level consumer.)
+</p>
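+
+<p>
+The corresponding offset fetch, including the retry on a load-in-progress error described above, looks roughly like this (same caveats as the previous sketches; the fetch uses <code>kafka.javaapi.OffsetFetchRequest/Response</code> and <code>kafka.common.OffsetMetadataAndError</code>):
+</p>
+<pre>
+List&lt;TopicAndPartition&gt; partitions = new ArrayList&lt;TopicAndPartition&gt;();
+partitions.add(new TopicAndPartition("my-topic", 0));
+
+OffsetFetchRequest fetchRequest = new OffsetFetchRequest(
+    "my-consumer-group", partitions,
+    (short) 1 /* version 1 and above fetch from Kafka; version 0 reads from ZooKeeper */,
+    correlationId++, "my-client-id");
+channel.send(fetchRequest.underlying());
+OffsetFetchResponse fetchResponse =
+    OffsetFetchResponse.readFrom(channel.receive().buffer());
+
+OffsetMetadataAndError result =
+    fetchResponse.offsets().get(new TopicAndPartition("my-topic", 0));
+if (result.error() == ErrorMapping.OffsetsLoadInProgressCode()) {
+  // The offset manager is still loading this offsets partition: back off and retry the fetch.
+} else if (result.error() == ErrorMapping.NotCoordinatorForConsumerCode()) {
+  // The offset manager has moved: rediscover it and retry.
+} else {
+  long committedOffset = result.offset();
+}
+</pre>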
+
+<h5><a id="offsetmigration">Migrating offsets from ZooKeeper to Kafka</a></h5>
+<p>
+Kafka consumers in earlier releases store their offsets by default in ZooKeeper. It is possible to migrate these consumers to commit offsets into Kafka by following these steps:
+<ol>
+   <li>Set <code>offsets.storage=kafka</code> and <code>dual.commit.enabled=true</code> in your consumer config (an example configuration sketch follows this list).
+   </li>
+   <li>Do a rolling bounce of your consumers and then verify that your consumers are healthy.
+   </li>
+   <li>Set <code>dual.commit.enabled=false</code> in your consumer config.
+   </li>
+   <li>Do a rolling bounce of your consumers and then verify that your consumers are healthy.
+   </li>
+</ol>
+A roll-back (i.e., migrating from Kafka back to ZooKeeper) can also be performed using the above steps if you set <code>offsets.storage=zookeeper</code>.
+</p>
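+
+<p>
+For example, the two phases above correspond to consumer configuration changes along these lines (a sketch showing only the relevant properties):
+</p>
+<pre>
+# Phase 1 (steps 1-2): commit to both Kafka and ZooKeeper during the migration
+offsets.storage=kafka
+dual.commit.enabled=true
+
+# Phase 2 (steps 3-4): once every instance in the group has been migrated
+offsets.storage=kafka
+dual.commit.enabled=false
+</pre>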
+
 <h4>ZooKeeper Directories</h4>
 <p>
 The following gives the ZooKeeper structures and algorithms used for co-ordination between consumers and brokers.
@@ -256,7 +285,7 @@ Each broker registers itself under the t
 
 <h4>Consumers and Consumer Groups</h4>
 <p>
-Consumers of topics also register themselves in ZooKeeper, in order to balance the consumption of data and track their offsets in each partition for each broker they consume from.
+Consumers of topics also register themselves in ZooKeeper, in order to coordinate with each other and balance the consumption of data. Consumers can also store their offsets in ZooKeeper by setting <code>offsets.storage=zookeeper</code>. However, this offset storage mechanism will be deprecated in a future release. Therefore, it is recommended to <a href="#offsetmigration">migrate offsets storage to Kafka</a>.
 </p>
 
 <p>
@@ -277,9 +306,9 @@ In addition to the group_id which is sha
 Each of the consumers in the group registers under its group and creates a znode with its consumer_id. The value of the znode contains a map of &lt;topic, #streams&gt;. This id is simply used to identify each of the consumers which is currently active within a group. This is an ephemeral node so it will disappear if the consumer process dies.
 </p>
 
-<h4>Consumer Offset Tracking</h4>
+<h4>Consumer Offsets</h4>
 <p>
-Consumers track the maximum offset they have consumed in each partition. This value is stored in a ZooKeeper directory
+Consumers track the maximum offset they have consumed in each partition. If <code>offsets.storage=zookeeper</code>, this value is stored in a ZooKeeper directory.
 </p>
 <pre>
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value (persistent node)