Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/06 01:42:36 UTC
kafka git commit: KAFKA-2745: Update JavaDoc for new / updated consumer APIs
Repository: kafka
Updated Branches:
refs/heads/trunk cbdd8218c -> 76bcccd61
KAFKA-2745: Update JavaDoc for new / updated consumer APIs
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Jeff Holoman, Jason Gustafson, Gwen Shapira
Closes #425 from guozhangwang/K2745
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/76bcccd6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/76bcccd6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/76bcccd6
Branch: refs/heads/trunk
Commit: 76bcccd61f61d33a4a4d4db02043a2f0063d0d38
Parents: cbdd821
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Nov 5 16:42:27 2015 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Thu Nov 5 16:42:27 2015 -0800
----------------------------------------------------------------------
.../kafka/clients/consumer/KafkaConsumer.java | 130 ++++++++++++-------
1 file changed, 81 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/76bcccd6/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index f3d2e15..d99607d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -65,10 +65,10 @@ import java.util.regex.Pattern;
* A Kafka client that consumes records from a Kafka cluster.
* <p>
* It will transparently handle the failure of servers in the Kafka cluster, and transparently adapt as partitions of
- * data it subscribes to migrate within the cluster. This client also interacts with the server to allow groups of
+ * data it fetches migrate within the cluster. This client also interacts with the server to allow groups of
* consumers to load balance consumption using consumer groups (as described below).
* <p>
- * The consumer maintains TCP connections to the necessary brokers to fetch data for the topics it subscribes to.
+ * The consumer maintains TCP connections to the necessary brokers to fetch data.
* Failure to close the consumer after use will leak these connections.
* The consumer is not thread-safe. See <a href="#multithreaded">Multi-threaded Processing</a> for more details.
*
@@ -84,19 +84,24 @@ import java.util.regex.Pattern;
* <p>
* The {@link #commitSync() committed position} is the last offset that has been saved securely. Should the
* process fail and restart, this is the offset that it will recover to. The consumer can either automatically commit
- * offsets periodically, or it can choose to control this committed position manually by calling
- * {@link #commitSync() commit}.
+ * offsets periodically, or it can choose to control this committed position manually by calling
+ * {@link #commitSync() commitSync}, which will block until the offsets have been successfully committed
+ * or a fatal error has occurred during the commit process, or {@link #commitAsync(OffsetCommitCallback) commitAsync},
+ * which is non-blocking and will trigger the {@link OffsetCommitCallback} when the commit either succeeds or fails fatally.
* <p>
* This distinction gives the consumer control over when a record is considered consumed. It is discussed in further
* detail below.
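A minimal sketch of the two manual-commit styles just described (assuming a `consumer` that is already subscribed and polling; `process` is a hypothetical application method, not part of the consumer API):

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

// Synchronous commit: blocks until the offsets are committed or a fatal error occurs.
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
    process(record); // hypothetical application logic
consumer.commitSync();

// Asynchronous commit: returns immediately; the callback fires once the commit
// either succeeds or fails fatally.
consumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        if (exception != null)
            System.err.println("Commit failed for " + offsets + ": " + exception);
    }
});
```

Both calls commit the offsets returned by the last poll; the choice is only between blocking for the result and being notified of it later.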
*
- * <h3>Consumer Groups</h3>
+ * <h3>Consumer Groups and Topic Subscriptions</h3>
*
* Kafka uses the concept of <i>consumer groups</i> to allow a pool of processes to divide up the work of consuming and
* processing records. These processes can either be running on the same machine or, as is more likely, they can be
* distributed over many machines to provide additional scalability and fault tolerance for processing.
* <p>
- * Each Kafka consumer must specify a consumer group that it belongs to. Kafka will deliver each message in the
+ * Each Kafka consumer can be configured with a consumer group that it belongs to, and can dynamically set the
+ * list of topics it wants to subscribe to through {@link #subscribe(List, ConsumerRebalanceListener)},
+ * or subscribe to all topics matching a certain pattern through {@link #subscribe(Pattern, ConsumerRebalanceListener)}.
+ * Kafka will deliver each message in the
* subscribed topics to one process in each consumer group. This is achieved by balancing the partitions in the topic
* over the consumer processes in each group. So if there is a topic with four partitions, and a consumer group with two
* processes, each process would consume from two partitions. This group membership is maintained dynamically: if a
@@ -116,18 +121,21 @@ import java.util.regex.Pattern;
* have multiple such groups. To get semantics similar to pub-sub in a traditional messaging system each process would
* have its own consumer group, so each process would subscribe to all the records published to the topic.
* <p>
- * In addition, when offsets are committed they are always committed for a given consumer group.
+ * In addition, when group reassignment happens automatically, consumers can be notified through {@link ConsumerRebalanceListener},
+ * which allows them to finish necessary application-level logic such as state cleanup, manual offset
+ * commits (note that offsets are always committed for a given consumer group), etc.
+ * See <a href="#rebalancecallback">Storing Offsets Outside Kafka</a> for more details.
* <p>
- * It is also possible for the consumer to manually specify the partitions it subscribes to, which disables this dynamic
- * partition balancing.
+ * It is also possible for the consumer to manually specify the partitions that are assigned to it through {@link #assign(List)},
+ * which disables this dynamic partition assignment.
*
* <h3>Usage Examples</h3>
* The consumer APIs offer flexibility to cover a variety of consumption use cases. Here are some examples to
* demonstrate how to use them.
*
- * <h4>Simple Processing</h4>
- * This example demonstrates the simplest usage of Kafka's consumer api.
- *
+ * <h4>Automatic Offset Committing</h4>
+ * This example demonstrates a simple usage of Kafka's consumer api that relies on automatic offset committing.
+ * <p>
* <pre>
* Properties props = new Properties();
* props.put("bootstrap.servers", "localhost:9092");
@@ -138,7 +146,7 @@ import java.util.regex.Pattern;
* props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
* props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
* KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
- * consumer.subscribe("foo", "bar");
+ * consumer.subscribe(Arrays.asList("foo", "bar"));
* while (true) {
* ConsumerRecords<String, String> records = consumer.poll(100);
* for (ConsumerRecord<String, String> record : records)
@@ -166,8 +174,11 @@ import java.util.regex.Pattern;
* The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we
* are saying that our record's key and value will just be simple strings.
*
- * <h4>Controlling When Messages Are Considered Consumed</h4>
+ * <h4>Manual Offset Control</h4>
*
+ * Instead of relying on the consumer to periodically commit consumed offsets, users can also control when messages
+ * should be considered as consumed and hence commit their offsets. This is useful when the consumption of the messages
+ * is coupled with some processing logic, and hence a message should not be considered as consumed until its processing is completed.
* In this example we will consume a batch of records and batch them up in memory, when we have sufficient records
* batched we will insert them into a database. If we allowed offsets to auto commit as in the previous example messages
* would be considered consumed after they were given out by the consumer, and it would be possible that our process
@@ -179,7 +190,7 @@ import java.util.regex.Pattern;
* would consume from last committed offset and would repeat the insert of the last batch of data. Used in this way
 * Kafka provides what is often called "at-least-once delivery" guarantees, as each message will likely be delivered one
* time but in failure cases could be duplicated.
- *
+ * <p>
* <pre>
* Properties props = new Properties();
* props.put("bootstrap.servers", "localhost:9092");
@@ -190,7 +201,7 @@ import java.util.regex.Pattern;
* props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
* props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
* KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
- * consumer.subscribe("foo", "bar");
+ * consumer.subscribe(Arrays.asList("foo", "bar"));
* int commitInterval = 200;
* List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
* while (true) {
@@ -242,7 +253,7 @@ import java.util.regex.Pattern;
* It isn't possible to mix both subscription to specific partitions (with no load balancing) and to topics (with load
* balancing) using the same consumer instance.
*
- * <h4>Managing Your Own Offsets</h4>
+ * <h4><a name="rebalancecallback">Storing Offsets Outside Kafka</a></h4>
*
* The consumer application need not use Kafka's built-in offset storage, it can store offsets in a store of its own
* choosing. The primary use case for this is allowing the application to store both the offset and the results of the
@@ -263,19 +274,22 @@ import java.util.regex.Pattern;
* This means that in this case the indexing process that comes back having lost recent updates just resumes indexing
* from what it has ensuring that no updates are lost.
* </ul>
- *
+ * <p>
* Each record comes with its own offset, so to manage your own offset you just need to do the following:
- * <ol>
+ *
+ * <ul>
* <li>Configure <code>enable.auto.commit=false</code>
* <li>Use the offset provided with each {@link ConsumerRecord} to save your position.
* <li>On restart restore the position of the consumer using {@link #seek(TopicPartition, long)}.
- * </ol>
+ * </ul>
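The three steps above might be wired together as follows (a sketch only; `readOffsetFromStore`, `saveOffsetInStore`, and `process` stand in for whatever atomic external storage and processing the application uses):

```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("enable.auto.commit", "false"); // step 1: disable automatic commits
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

TopicPartition partition = new TopicPartition("my-topic", 0); // illustrative name
consumer.assign(Arrays.asList(partition));
consumer.seek(partition, readOffsetFromStore(partition)); // step 3: restore position on restart

while (true) {
    for (ConsumerRecord<String, String> record : consumer.poll(100)) {
        process(record);                                   // hypothetical application logic
        // step 2: save the position (offset of the next record to read)
        saveOffsetInStore(partition, record.offset() + 1);
    }
}
```

Note that the stored position is the offset of the next record to consume, i.e. one greater than the offset of the last processed record.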
*
+ * <p>
* This type of usage is simplest when the partition assignment is also done manually (this would be likely in the
* search index use case described above). If the partition assignment is done automatically special care is
* needed to handle the case where partition assignments change. This can be done by providing a
- * {@link ConsumerRebalanceListener} instance in the call to {@link #subscribe(List, ConsumerRebalanceListener)}.
- * When partitions are taken from a consumer the consumer will want to commit its offset for those partitions by
+ * {@link ConsumerRebalanceListener} instance in the call to {@link #subscribe(List, ConsumerRebalanceListener)}
+ * or {@link #subscribe(Pattern, ConsumerRebalanceListener)}.
+ * For example, when partitions are taken from a consumer, the consumer will want to commit its offset for those partitions by
* implementing {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)}. When partitions are assigned to a
 * consumer, the consumer will want to look up the offset for those new partitions and correctly initialize the consumer
* to that position by implementing {@link ConsumerRebalanceListener#onPartitionsAssigned(Collection)}.
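Sketched with the two callbacks (illustrative only; `readOffsetFromStore` and `saveOffsetInStore` are hypothetical application helpers, not part of the consumer API):

```java
import java.util.Arrays;
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

consumer.subscribe(Arrays.asList("my-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Save the current position for each partition before it is reassigned.
        for (TopicPartition partition : partitions)
            saveOffsetInStore(partition, consumer.position(partition));
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Initialize each newly assigned partition to its externally stored position.
        for (TopicPartition partition : partitions)
            consumer.seek(partition, readOffsetFromStore(partition));
    }
});
```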
@@ -298,12 +312,30 @@ import java.util.regex.Pattern;
* Another use case is for a system that maintains local state as described in the previous section. In such a system
* the consumer will want to initialize its position on start-up to whatever is contained in the local store. Likewise
* if the local state is destroyed (say because the disk is lost) the state may be recreated on a new machine by
- * reconsuming all the data and recreating the state (assuming that Kafka is retaining sufficient history).
- *
+ * re-consuming all the data and recreating the state (assuming that Kafka is retaining sufficient history).
+ * <p>
* Kafka allows specifying the position using {@link #seek(TopicPartition, long)} to specify the new position. Special
* methods for seeking to the earliest and latest offset the server maintains are also available (
* {@link #seekToBeginning(TopicPartition...)} and {@link #seekToEnd(TopicPartition...)} respectively).
*
+ * <h4>Consumption Flow Control</h4>
+ *
+ * If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time,
+ * effectively giving these partitions the same priority for consumption. However, in some cases consumers may want to
+ * first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions
+ * when these partitions have little or no data left to consume.
+ *
+ * <p>
+ * One such case is stream processing, where the processor fetches from two topics and performs a join on the two streams.
+ * When one of the topics lags far behind the other, the processor would like to pause fetching from the topic that is ahead
+ * in order to let the lagging stream catch up. Another example is bootstrapping when a consumer starts up with
+ * a lot of historical data to catch up on; the application usually wants to get the latest data on some of the topics
+ * before considering fetching other topics.
+ *
+ * <p>
+ * Kafka supports dynamic control of consumption flow through {@link #pause(TopicPartition...)} and {@link #resume(TopicPartition...)},
+ * which pause consumption of the specified assigned partitions and resume consumption
+ * of the specified paused partitions, respectively, in future {@link #poll(long)} calls.
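For instance, the stream-join scenario above might pause the topic that is ahead along these lines (topic names, partitions, and the catch-up check are illustrative assumptions):

```java
import org.apache.kafka.common.TopicPartition;

TopicPartition ahead = new TopicPartition("clicks", 0);        // illustrative
TopicPartition lagging = new TopicPartition("impressions", 0); // illustrative

consumer.pause(ahead);             // subsequent poll() calls return no records for "clicks"
while (laggingStreamBehind())      // hypothetical application-level lag check
    processAll(consumer.poll(100));
consumer.resume(ahead);            // poll() resumes returning records from "clicks"
```

Pausing only suppresses fetching; the partitions remain assigned to the consumer, so no rebalance is triggered.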
*
* <h3><a name="multithreaded">Multi-threaded Processing</a></h3>
*
@@ -347,6 +379,7 @@ import java.util.regex.Pattern;
*
* Then in a separate thread, the consumer can be shutdown by setting the closed flag and waking up the consumer.
*
+ * <p>
* <pre>
* closed.set(true);
* consumer.wakeup();
@@ -593,7 +626,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
/**
- * The set of partitions currently assigned to this consumer. If subscription happened by directly assigning
+ * Get the set of partitions currently assigned to this consumer. If subscription happened by directly assigning
* partitions using {@link #assign(List)} then this will simply return the same partitions that
* were assigned. If topic subscription was used, then this will give the set of topic partitions currently assigned
* to the consumer (which may be none if the assignment hasn't happened yet, or the partitions are in the
@@ -624,9 +657,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
/**
- * Subscribe to the given list of topics and use the consumer's group management functionality to
- * assign partitions. Topic subscriptions are not incremental. This list will replace the current
- * assignment (if there is one). Note that it is not possible to combine topic subscription with group management
+ * Subscribe to the given list of topics to get dynamically
+ * assigned partitions. <b>Topic subscriptions are not incremental. This list will replace the current
+ * assignment (if there is one).</b> Note that it is not possible to combine topic subscription
* with manual partition assignment through {@link #assign(List)}.
*
* If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
@@ -669,9 +702,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
/**
- * Subscribe to the given list of topics and use the consumer's group management functionality to
- * assign partitions. Topic subscriptions are not incremental. This list will replace the current
- * assignment (if there is one). It is not possible to combine topic subscription with group management
+ * Subscribe to the given list of topics to get dynamically assigned partitions.
+ * <b>Topic subscriptions are not incremental. This list will replace the current
+ * assignment (if there is one).</b> It is not possible to combine topic subscription
* with manual partition assignment through {@link #assign(List)}.
*
* If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
@@ -691,8 +724,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
/**
- * Subscribes to topics matching specified pattern and uses the consumer's group
- * management functionality. The pattern matching will be done periodically against topics
+ * Subscribe to all topics matching the specified pattern to get dynamically assigned partitions. The pattern matching will be done periodically against topics
* existing at the time of check.
* <p>
* As part of group management, the consumer will keep track of the list of consumers that
@@ -720,7 +752,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
/**
- * Unsubscribe from topics currently subscribed to
+ * Unsubscribe from all topics currently subscribed to.
*/
public void unsubscribe() {
acquire();
@@ -735,7 +767,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
/**
- * Assign a list of partition to this consumer. This interface does not allow for incremental assignment
+ * Manually assign a list of partitions to this consumer. This interface does not allow for incremental assignment
* and will replace the previous assignment (if there is one).
* <p>
* Manual topic assignment through this method does not use the consumer's group management
@@ -761,14 +793,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
/**
- * Fetches data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have
+ * Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have
* subscribed to any topics or partitions before polling for data.
* <p>
- * The offset used for fetching the data is governed by whether or not {@link #seek(TopicPartition, long)} is used.
- * If {@link #seek(TopicPartition, long)} is used, it will use the specified offsets on startup and on every
- * rebalance, to consume data from that offset sequentially on every poll. If not, it will use the last checkpointed
- * offset using {@link #commitSync(Map) commit(offsets)} for the subscribed list of partitions.
- *
+ * On each poll, the consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last
+ * consumed offset can be set manually through {@link #seek(TopicPartition, long)} or automatically set to the last committed
+ * offset for the subscribed list of partitions.
+ *
* @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, returns
* immediately with any records available now. Must not be negative.
* @return map of topic to records since the last fetch for the subscribed list of topics and partitions
@@ -851,7 +882,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
/**
- * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
+ * Commit offsets returned on the last {@link #poll(long) poll()} for all the subscribed topics and partitions.
* <p>
* This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
* every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
@@ -873,7 +904,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
/**
- * Commits the specified offsets for the specified list of topics and partitions to Kafka.
+ * Commit the specified offsets for the specified list of topics and partitions.
* <p>
* This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
* rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
@@ -899,7 +930,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
/**
- * Convenient method. Same as {@link #commitAsync(OffsetCommitCallback) commitAsync(null)}
+ * Commit offsets returned on the last {@link #poll(long) poll()} for all the subscribed topics and partitions.
+ * Same as {@link #commitAsync(OffsetCommitCallback) commitAsync(null)}
*/
@Override
public void commitAsync() {
@@ -907,7 +939,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
/**
- * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
+ * Commit offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
* <p>
* This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
* every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
@@ -929,7 +961,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
/**
- * Commits the specified offsets for the specified list of topics and partitions to Kafka.
+ * Commit the specified offsets for the specified list of topics and partitions to Kafka.
* <p>
* This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
* rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
@@ -1005,7 +1037,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
/**
- * Returns the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
+ * Get the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
*
* @param partition The partition to get the position for
* @return The offset
@@ -1034,7 +1066,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
/**
- * Fetches the last committed offset for the given partition (whether the commit happened by this process or
+ * Get the last committed offset for the given partition (whether the commit happened by this process or
* another). This offset will be used as the position for the consumer in the event of a failure.
* <p>
* This call may block to do a remote call if the partition in question isn't assigned to this consumer or if the
@@ -1139,7 +1171,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
/**
- * Resume any partitions which have been paused with {@link #pause(TopicPartition...)}. New calls to
+ * Resume the specified partitions which have been paused with {@link #pause(TopicPartition...)}. New calls to
* {@link #poll(long)} will return records from these partitions if there are any to be fetched.
* If the partitions were not previously paused, this method is a no-op.
* @param partitions The partitions which should be resumed