You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/06 01:42:36 UTC

kafka git commit: KAFKA-2745: Update JavaDoc for new / updated consumer APIs

Repository: kafka
Updated Branches:
  refs/heads/trunk cbdd8218c -> 76bcccd61


KAFKA-2745: Update JavaDoc for new / updated consumer APIs

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Jeff Holoman, Jason Gustafson, Gwen Shapira

Closes #425 from guozhangwang/K2745


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/76bcccd6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/76bcccd6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/76bcccd6

Branch: refs/heads/trunk
Commit: 76bcccd61f61d33a4a4d4db02043a2f0063d0d38
Parents: cbdd821
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Nov 5 16:42:27 2015 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Thu Nov 5 16:42:27 2015 -0800

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   | 130 ++++++++++++-------
 1 file changed, 81 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/76bcccd6/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index f3d2e15..d99607d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -65,10 +65,10 @@ import java.util.regex.Pattern;
  * A Kafka client that consumes records from a Kafka cluster.
  * <p>
  * It will transparently handle the failure of servers in the Kafka cluster, and transparently adapt as partitions of
- * data it subscribes to migrate within the cluster. This client also interacts with the server to allow groups of
+ * data it fetches migrate within the cluster. This client also interacts with the server to allow groups of
  * consumers to load balance consumption using consumer groups (as described below).
  * <p>
- * The consumer maintains TCP connections to the necessary brokers to fetch data for the topics it subscribes to.
+ * The consumer maintains TCP connections to the necessary brokers to fetch data.
  * Failure to close the consumer after use will leak these connections.
  * The consumer is not thread-safe. See <a href="#multithreaded">Multi-threaded Processing</a> for more details.
  *
@@ -84,19 +84,24 @@ import java.util.regex.Pattern;
  * <p>
  * The {@link #commitSync() committed position} is the last offset that has been saved securely. Should the
  * process fail and restart, this is the offset that it will recover to. The consumer can either automatically commit
- * offsets periodically, or it can choose to control this committed position manually by calling
- * {@link #commitSync() commit}.
+ * offsets periodically; or it can choose to control this committed position manually by calling
+ * {@link #commitSync() commitSync}, which will block until the offsets have been successfully committed
+ * or fatal error has happened during the commit process, or {@link #commitAsync(OffsetCommitCallback) commitAsync} which is non-blocking
+ * and will trigger {@link OffsetCommitCallback} upon either successfully committed or fatally failed.
  * <p>
  * This distinction gives the consumer control over when a record is considered consumed. It is discussed in further
  * detail below.
  *
- * <h3>Consumer Groups</h3>
+ * <h3>Consumer Groups and Topic Subscriptions</h3>
  *
  * Kafka uses the concept of <i>consumer groups</i> to allow a pool of processes to divide up the work of consuming and
  * processing records. These processes can either be running on the same machine or, as is more likely, they can be
  * distributed over many machines to provide additional scalability and fault tolerance for processing.
  * <p>
- * Each Kafka consumer must specify a consumer group that it belongs to. Kafka will deliver each message in the
+ * Each Kafka consumer is able to configure a consumer group that it belongs to, and can dynamically set the
+ * list of topics it wants to subscribe to through {@link #subscribe(List, ConsumerRebalanceListener)},
+ * or subscribe to all topics matching certain pattern through {@link #subscribe(Pattern, ConsumerRebalanceListener)}.
+ * Kafka will deliver each message in the
  * subscribed topics to one process in each consumer group. This is achieved by balancing the partitions in the topic
  * over the consumer processes in each group. So if there is a topic with four partitions, and a consumer group with two
  * processes, each process would consume from two partitions. This group membership is maintained dynamically: if a
@@ -116,18 +121,21 @@ import java.util.regex.Pattern;
  * have multiple such groups. To get semantics similar to pub-sub in a traditional messaging system each process would
  * have its own consumer group, so each process would subscribe to all the records published to the topic.
  * <p>
- * In addition, when offsets are committed they are always committed for a given consumer group.
+ * In addition, when group reassignment happens automatically, consumers can be notified through {@link ConsumerRebalanceListener},
+ * which allows them to finish necessary application-level logic such as state cleanup, manual offset
+ * commits (note that offsets are always committed for a given consumer group), etc.
+ * See <a href="#rebalancecallback">Storing Offsets Outside Kafka</a> for more details
  * <p>
- * It is also possible for the consumer to manually specify the partitions it subscribes to, which disables this dynamic
- * partition balancing.
+ * It is also possible for the consumer to manually specify the partitions that are assigned to it through {@link #assign(List)},
+ * which disables this dynamic partition assignment.
  *
  * <h3>Usage Examples</h3>
  * The consumer APIs offer flexibility to cover a variety of consumption use cases. Here are some examples to
  * demonstrate how to use them.
  *
- * <h4>Simple Processing</h4>
- * This example demonstrates the simplest usage of Kafka's consumer api.
- *
+ * <h4>Automatic Offset Committing</h4>
+ * This example demonstrates a simple usage of Kafka's consumer api that relying on automatic offset committing.
+ * <p>
  * <pre>
  *     Properties props = new Properties();
  *     props.put(&quot;bootstrap.servers&quot;, &quot;localhost:9092&quot;);
@@ -138,7 +146,7 @@ import java.util.regex.Pattern;
  *     props.put(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
  *     props.put(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
  *     KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;String, String&gt;(props);
- *     consumer.subscribe(&quot;foo&quot;, &quot;bar&quot;);
+ *     consumer.subscribe(Arrays.asList(&quot;foo&quot;, &quot;bar&quot;));
  *     while (true) {
  *         ConsumerRecords&lt;String, String&gt; records = consumer.poll(100);
  *         for (ConsumerRecord&lt;String, String&gt; record : records)
@@ -166,8 +174,11 @@ import java.util.regex.Pattern;
  * The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we
  * are saying that our record's key and value will just be simple strings.
  *
- * <h4>Controlling When Messages Are Considered Consumed</h4>
+ * <h4>Manual Offset Control</h4>
  *
+ * Instead of relying on the consumer to periodically commit consumed offsets, users can also control when messages
+ * should be considered as consumed and hence commit their offsets. This is useful when the consumption of the messages
+ * are coupled with some processing logic and hence a message should not be considered as consumed until it is completed processing.
  * In this example we will consume a batch of records and batch them up in memory, when we have sufficient records
  * batched we will insert them into a database. If we allowed offsets to auto commit as in the previous example messages
  * would be considered consumed after they were given out by the consumer, and it would be possible that our process
@@ -179,7 +190,7 @@ import java.util.regex.Pattern;
  * would consume from last committed offset and would repeat the insert of the last batch of data. Used in this way
  * Kafka provides what is often called "at-least once delivery" guarantees, as each message will likely be delivered one
  * time but in failure cases could be duplicated.
- *
+ * <p>
  * <pre>
  *     Properties props = new Properties();
  *     props.put(&quot;bootstrap.servers&quot;, &quot;localhost:9092&quot;);
@@ -190,7 +201,7 @@ import java.util.regex.Pattern;
  *     props.put(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
  *     props.put(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
  *     KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;String, String&gt;(props);
- *     consumer.subscribe(&quot;foo&quot;, &quot;bar&quot;);
+ *     consumer.subscribe(Arrays.asList(&quot;foo&quot;, &quot;bar&quot;));
  *     int commitInterval = 200;
  *     List&lt;ConsumerRecord&lt;String, String&gt;&gt; buffer = new ArrayList&lt;ConsumerRecord&lt;String, String&gt;&gt;();
  *     while (true) {
@@ -242,7 +253,7 @@ import java.util.regex.Pattern;
  * It isn't possible to mix both subscription to specific partitions (with no load balancing) and to topics (with load
  * balancing) using the same consumer instance.
  *
- * <h4>Managing Your Own Offsets</h4>
+ * <h4><a name="rebalancecallback">Storing Offsets Outside Kafka</h4>
  *
  * The consumer application need not use Kafka's built-in offset storage, it can store offsets in a store of its own
  * choosing. The primary use case for this is allowing the application to store both the offset and the results of the
@@ -263,19 +274,22 @@ import java.util.regex.Pattern;
  * This means that in this case the indexing process that comes back having lost recent updates just resumes indexing
  * from what it has ensuring that no updates are lost.
  * </ul>
- *
+ * <p>
  * Each record comes with its own offset, so to manage your own offset you just need to do the following:
- * <ol>
+ *
+ * <ul>
  * <li>Configure <code>enable.auto.commit=false</code>
  * <li>Use the offset provided with each {@link ConsumerRecord} to save your position.
  * <li>On restart restore the position of the consumer using {@link #seek(TopicPartition, long)}.
- * </ol>
+ * </ul>
  *
+ * <p>
  * This type of usage is simplest when the partition assignment is also done manually (this would be likely in the
  * search index use case described above). If the partition assignment is done automatically special care is
  * needed to handle the case where partition assignments change. This can be done by providing a
- * {@link ConsumerRebalanceListener} instance in the call to {@link #subscribe(List, ConsumerRebalanceListener)}.
- * When partitions are taken from a consumer the consumer will want to commit its offset for those partitions by
+ * {@link ConsumerRebalanceListener} instance in the call to {@link #subscribe(List, ConsumerRebalanceListener)}
+ * and {@link #subscribe(Pattern, ConsumerRebalanceListener)}.
+ * For example, when partitions are taken from a consumer the consumer will want to commit its offset for those partitions by
  * implementing {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)}. When partitions are assigned to a
  * consumer, the consumer will want to look up the offset for those new partitions an correctly initialize the consumer
  * to that position by implementing {@link ConsumerRebalanceListener#onPartitionsAssigned(Collection)}.
@@ -298,12 +312,30 @@ import java.util.regex.Pattern;
  * Another use case is for a system that maintains local state as described in the previous section. In such a system
  * the consumer will want to initialize its position on start-up to whatever is contained in the local store. Likewise
  * if the local state is destroyed (say because the disk is lost) the state may be recreated on a new machine by
- * reconsuming all the data and recreating the state (assuming that Kafka is retaining sufficient history).
- *
+ * re-consuming all the data and recreating the state (assuming that Kafka is retaining sufficient history).
+ * <p>
  * Kafka allows specifying the position using {@link #seek(TopicPartition, long)} to specify the new position. Special
  * methods for seeking to the earliest and latest offset the server maintains are also available (
  * {@link #seekToBeginning(TopicPartition...)} and {@link #seekToEnd(TopicPartition...)} respectively).
  *
+ * <h4>Consumption Flow Control</h4>
+ *
+ * If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time,
+ * effectively giving these partitions the same priority for consumption. However in some cases consumers may want to
+ * first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions
+ * when these partitions have few or no data to consume.
+ *
+ * <p>
+ * One of such cases is stream processing, where processor fetches from two topics and performs the join on these two streams.
+ * When one of the topic is long lagging behind the other, the processor would like to pause fetching from the ahead topic
+ * in order to get the lagging stream to catch up. Another example is bootstraping upon consumer starting up where there are
+ * a lot of history data to catch up, the applciations usually wants to get the latest data on some of the topics before consider
+ * fetching other topics.
+ *
+ * <p>
+ * Kafka supports dynamic controlling of consumption flows by using {@link #pause(TopicPartition...)} and {@link #resume(TopicPartition...)}
+ * to pause the consumption on the specified assigned partitions and resume the consumption
+ * on the specified paused partitions respectively in the future {@link #poll(long)} calls.
  *
  * <h3><a name="multithreaded">Multi-threaded Processing</a></h3>
  *
@@ -347,6 +379,7 @@ import java.util.regex.Pattern;
  *
  * Then in a separate thread, the consumer can be shutdown by setting the closed flag and waking up the consumer.
  *
+ * <p>
  * <pre>
  *     closed.set(true);
  *     consumer.wakeup();
@@ -593,7 +626,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * The set of partitions currently assigned to this consumer. If subscription happened by directly assigning
+     * Get the set of partitions currently assigned to this consumer. If subscription happened by directly assigning
      * partitions using {@link #assign(List)} then this will simply return the same partitions that
      * were assigned. If topic subscription was used, then this will give the set of topic partitions currently assigned
      * to the consumer (which may be none if the assignment hasn't happened yet, or the partitions are in the
@@ -624,9 +657,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Subscribe to the given list of topics and use the consumer's group management functionality to
-     * assign partitions. Topic subscriptions are not incremental. This list will replace the current
-     * assignment (if there is one). Note that it is not possible to combine topic subscription with group management
+     * Subscribe to the given list of topics to get dynamically
+     * assigned partitions. <b>Topic subscriptions are not incremental. This list will replace the current
+     * assignment (if there is one).</b> Note that it is not possible to combine topic subscription with group management
      * with manual partition assignment through {@link #assign(List)}.
      *
      * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
@@ -669,9 +702,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Subscribe to the given list of topics and use the consumer's group management functionality to
-     * assign partitions. Topic subscriptions are not incremental. This list will replace the current
-     * assignment (if there is one). It is not possible to combine topic subscription with group management
+     * Subscribe to the given list of topics to get dynamically assigned partitions.
+     * <b>Topic subscriptions are not incremental. This list will replace the current
+     * assignment (if there is one).</b> It is not possible to combine topic subscription with group management
      * with manual partition assignment through {@link #assign(List)}.
      *
      * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
@@ -691,8 +724,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Subscribes to topics matching specified pattern and uses the consumer's group
-     * management functionality. The pattern matching will be done periodically against topics
+     * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. The pattern matching will be done periodically against topics
      * existing at the time of check.
      * <p>
      * As part of group management, the consumer will keep track of the list of consumers that
@@ -720,7 +752,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Unsubscribe from topics currently subscribed to
+     * Unsubscribe from all topics currently subscribed to.
      */
     public void unsubscribe() {
         acquire();
@@ -735,7 +767,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Assign a list of partition to this consumer. This interface does not allow for incremental assignment
+     * Manually assign a list of partition to this consumer. This interface does not allow for incremental assignment
      * and will replace the previous assignment (if there is one).
      * <p>
      * Manual topic assignment through this method does not use the consumer's group management
@@ -761,14 +793,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Fetches data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have
+     * Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have
      * subscribed to any topics or partitions before polling for data.
      * <p>
-     * The offset used for fetching the data is governed by whether or not {@link #seek(TopicPartition, long)} is used.
-     * If {@link #seek(TopicPartition, long)} is used, it will use the specified offsets on startup and on every
-     * rebalance, to consume data from that offset sequentially on every poll. If not, it will use the last checkpointed
-     * offset using {@link #commitSync(Map) commit(offsets)} for the subscribed list of partitions.
-     * 
+     * On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last
+     * consumed offset can be manually set through {@link #seek(TopicPartition, long)} or automatically set as the last committed
+     * offset for the subscribed list of partitions
+     *
      * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, returns
      *            immediately with any records available now. Must not be negative.
      * @return map of topic to records since the last fetch for the subscribed list of topics and partitions
@@ -851,7 +882,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
 
     /**
-     * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
+     * Commit offsets returned on the last {@link #poll(long) poll()} for all the subscribed list of topics and partitions.
      * <p>
      * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
      * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
@@ -873,7 +904,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Commits the specified offsets for the specified list of topics and partitions to Kafka.
+     * Commit the specified offsets for the specified list of topics and partitions.
      * <p>
      * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
      * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
@@ -899,7 +930,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Convenient method. Same as {@link #commitAsync(OffsetCommitCallback) commitAsync(null)}
+     * Commit offsets returned on the last {@link #poll(long) poll()} for all the subscribed list of topics and partition.
+     * Same as {@link #commitAsync(OffsetCommitCallback) commitAsync(null)}
      */
     @Override
     public void commitAsync() {
@@ -907,7 +939,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
+     * Commit offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
      * <p>
      * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
      * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
@@ -929,7 +961,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Commits the specified offsets for the specified list of topics and partitions to Kafka.
+     * Commit the specified offsets for the specified list of topics and partitions to Kafka.
      * <p>
      * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
      * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
@@ -1005,7 +1037,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Returns the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
+     * Get the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
      *
      * @param partition The partition to get the position for
      * @return The offset
@@ -1034,7 +1066,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Fetches the last committed offset for the given partition (whether the commit happened by this process or
+     * Get the last committed offset for the given partition (whether the commit happened by this process or
      * another). This offset will be used as the position for the consumer in the event of a failure.
      * <p>
      * This call may block to do a remote call if the partition in question isn't assigned to this consumer or if the
@@ -1139,7 +1171,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     }
 
     /**
-     * Resume any partitions which have been paused with {@link #pause(TopicPartition...)}. New calls to
+     * Resume specified partitions which have been paused with {@link #pause(TopicPartition...)}. New calls to
      * {@link #poll(long)} will return records from these partitions if there are any to be fetched.
      * If the partitions were not previously paused, this method is a no-op.
      * @param partitions The partitions which should be resumed