You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2015/01/30 03:39:35 UTC
[5/7] kafka git commit: KAFKA-1760: New consumer.
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 76efc21..300c551 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -9,380 +9,447 @@
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
-*/
+ */
package org.apache.kafka.clients.consumer;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.ConnectionState;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.consumer.internals.Heartbeat;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.internals.Metadata;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.LogEntry;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.requests.ConsumerMetadataRequest;
+import org.apache.kafka.common.requests.ConsumerMetadataResponse;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.HeartbeatRequest;
+import org.apache.kafka.common.requests.HeartbeatResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.ListOffsetRequest;
+import org.apache.kafka.common.requests.ListOffsetResponse;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetCommitResponse;
+import org.apache.kafka.common.requests.OffsetFetchRequest;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.utils.ClientUtils;
import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.InetSocketAddress;
-import java.util.*;
-
/**
* A Kafka client that consumes records from a Kafka cluster.
- * <P>
- * The consumer is <i>thread safe</i> and should generally be shared among all threads for best performance.
* <p>
- * The consumer is single threaded and multiplexes I/O over TCP connections to each of the brokers it
- * needs to communicate with. Failure to close the consumer after use will leak these resources.
+ * It will transparently handle the failure of servers in the Kafka cluster, and transparently adapt as partitions of
+ * data it subscribes to migrate within the cluster. This client also interacts with the server to allow groups of
+ * consumers to load balance consumption using consumer groups (as described below).
+ * <p>
+ * The consumer maintains TCP connections to the necessary brokers to fetch data for the topics it subscribes to.
+ * Failure to close the consumer after use will leak these connections.
+ * <p>
+ * The consumer is thread safe but generally will be used only from within a single thread. The consumer client has no
+ * threads of it's own, all work is done in the caller's thread when calls are made on the various methods exposed.
+ *
+ * <h3>Offsets and Consumer Position</h3>
+ * Kafka maintains a numerical offset for each record in a partition. This offset acts as a kind of unique identifier of
+ * a record within that partition, and also denotes the position of the consumer in the partition. That is, a consumer
+ * which has position 5 has consumed records with offsets 0 through 4 and will next receive record with offset 5. There
+ * are actually two notions of position relevant to the user of the consumer.
+ * <p>
+ * The {@link #position(TopicPartition) position} of the consumer gives the offset of the next record that will be given
+ * out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances
+ * every time the consumer receives data calls {@link #poll(long)} and receives messages.
+ * <p>
+ * The {@link #commit(CommitType) committed position} is the last offset that has been saved securely. Should the
+ * process fail and restart, this is the offset that it will recover to. The consumer can either automatically commit
+ * offsets periodically, or it can choose to control this committed position manually by calling
+ * {@link #commit(CommitType) commit}.
+ * <p>
+ * This distinction gives the consumer control over when a record is considered consumed. It is discussed in further
+ * detail below.
+ *
+ * <h3>Consumer Groups</h3>
+ *
+ * Kafka uses the concept of <i>consumer groups</i> to allow a pool of processes to divide up the work of consuming and
+ * processing records. These processes can either be running on the same machine or, as is more likely, they can be
+ * distributed over many machines to provide additional scalability and fault tolerance for processing.
+ * <p>
+ * Each Kafka consumer must specify a consumer group that it belongs to. Kafka will deliver each message in the
+ * subscribed topics to one process in each consumer group. This is achieved by balancing the partitions in the topic
+ * over the consumer processes in each group. So if there is a topic with four partitions, and a consumer group with two
+ * processes, each process would consume from two partitions. This group membership is maintained dynamically: if a
+ * process fails the partitions assigned to it will be reassigned to other processes in the same group, and if a new
+ * process joins the group, partitions will be moved from existing consumers to this new process.
+ * <p>
+ * So if two processes subscribe to a topic both specifying different groups they will each get all the records in that
+ * topic; if they both specify the same group they will each get about half the records.
+ * <p>
+ * Conceptually you can think of a consumer group as being a single logical subscriber that happens to be made up of
+ * multiple processes. As a multi-subscriber system, Kafka naturally supports having any number of consumer groups for a
+ * given topic without duplicating data (additional consumers are actually quite cheap).
+ * <p>
+ * This is a slight generalization of the functionality that is common in messaging systems. To get semantics similar to
+ * a queue in a traditional messaging system all processes would be part of a single consumer group and hence record
+ * delivery would be balanced over the group like with a queue. Unlike a traditional messaging system, though, you can
+ * have multiple such groups. To get semantics similar to pub-sub in a traditional messaging system each process would
+ * have it's own consumer group, so each process would subscribe to all the records published to the topic.
+ * <p>
+ * In addition, when offsets are committed they are always committed for a given consumer group.
+ * <p>
+ * It is also possible for the consumer to manually specify the partitions it subscribes to, which disables this dynamic
+ * partition balancing.
+ *
* <h3>Usage Examples</h3>
- * The consumer APIs offer flexibility to cover a variety of consumption use cases. Following are some examples to demonstrate the correct use of
- * the available APIs. Each of the examples assumes the presence of a user implemented process() method that processes a given batch of messages
- * and returns the offset of the latest processed message per partition. Note that process() is not part of the consumer API and is only used as
- * a convenience method to demonstrate the different use cases of the consumer APIs. Here is a sample implementation of such a process() method.
+ * The consumer APIs offer flexibility to cover a variety of consumption use cases. Here are some examples to
+ * demonstrate how to use them.
+ *
+ * <h4>Simple Processing</h4>
+ * This example demonstrates the simplest usage of Kafka's consumer api.
+ *
* <pre>
- * {@code
- * private Map<TopicPartition, Long> process(Map<String, ConsumerRecord<byte[], byte[]> records) {
- * Map<TopicPartition, Long> processedOffsets = new HashMap<TopicPartition, Long>();
- * for(Entry<String, ConsumerRecords<byte[], byte[]>> recordMetadata : records.entrySet()) {
- * List<ConsumerRecord<byte[], byte[]>> recordsPerTopic = recordMetadata.getValue().records();
- * for(int i = 0;i < recordsPerTopic.size();i++) {
- * ConsumerRecord<byte[], byte[]> record = recordsPerTopic.get(i);
- * // process record
- * try {
- * processedOffsets.put(record.topicAndpartition(), record.offset());
- * } catch (Exception e) {
- * e.printStackTrace();
- * }
- * }
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * props.put("group.id", "test");
+ * props.put("enable.auto.commit", "true");
+ * props.put("auto.commit.interval.ms", "1000");
+ * props.put("session.timeout.ms", "30000");
+ * props.put("key.serializer", "org.apache.kafka.common.serializers.StringSerializer");
+ * props.put("value.serializer", "org.apache.kafka.common.serializers.StringSerializer");
+ * KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
+ * consumer.subscribe("foo", "bar");
+ * while (true) {
+ * ConsumerRecords<String, String> records = consumer.poll(100);
+ * for (ConsumerRecord<String, String> record : records)
+ * System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
* }
- * return processedOffsets;
- * }
- * }
* </pre>
+ *
+ * Setting <code>enable.auto.commit</code> means that offsets are committed automatically with a frequency controlled by
+ * the config <code>auto.commit.interval.ms</code>.
* <p>
- * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality for automatic consumer load
- * balancing and failover. This example assumes that the offsets are stored in Kafka and are automatically committed periodically,
- * as controlled by the auto.commit.interval.ms config
- * <pre>
- * {@code
- * Properties props = new Properties();
- * props.put("metadata.broker.list", "localhost:9092");
- * props.put("group.id", "test");
- * props.put("session.timeout.ms", "1000");
- * props.put("enable.auto.commit", "true");
- * props.put("auto.commit.interval.ms", "10000");
- * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
- * consumer.subscribe("foo", "bar");
- * boolean isRunning = true;
- * while(isRunning) {
- * Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
- * process(records);
- * }
- * consumer.close();
- * }
- * </pre>
- * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality for automatic consumer load
- * balancing and failover. This example assumes that the offsets are stored in Kafka and are manually committed using
- * the commit(boolean) API. This example also demonstrates rewinding the consumer's offsets if processing of the consumed
- * messages fails. Note that this method of rewinding offsets using {@link #seek(Map) seek(offsets)} is only useful for rewinding the offsets
- * of the current consumer instance. As such, this will not trigger a rebalance or affect the fetch offsets for the other consumer instances.
+ * The connection to the cluster is bootstrapped by specifying a list of one or more brokers to contact using the
+ * configuration <code>metadata.broker.list</code>. This list is just used to discover the rest of the brokers in the
+ * cluster and need not be an exhaustive list of servers in the cluster (though you may want to specify more than one in
+ * case there are servers down when the client is connecting).
+ * <p>
+ * In this example the client is subscribing to the topics <i>foo</i> and <i>bar</i> as part of a group of consumers
+ * called <i>test</i> as described above.
+ * <p>
+ * The broker will automatically detect failed processes in the <i>test</i> group by using a heartbeat mechanism. The
+ * consumer will automatically ping the cluster periodically, which let's the cluster know that it is alive. As long as
+ * the consumer is able to do this it is considered alive and retains the right to consume from the partitions assigned
+ * to it. If it stops heartbeating for a period of time longer than <code>session.timeout.ms</code> then it will be
+ * considered dead and it's partitions will be assigned to another process.
+ * <p>
+ * The serializers settings specify how to turn the objects the user provides into bytes. By specifying the string
+ * serializers we are saying that our record's key and value will just be simple strings.
+ *
+ * <h4>Controlling When Messages Are Considered Consumed</h4>
+ *
+ * In this example we will consume a batch of records and batch them up in memory, when we have sufficient records
+ * batched we will insert them into a database. If we allowed offsets to auto commit as in the previous example messages
+ * would be considered consumed after they were given out by the consumer, and it would be possible that our process
+ * could fail after we have read messages into our in-memory buffer but before they had been inserted into the database.
+ * To avoid this we will manually commit the offsets only once the corresponding messages have been inserted into the
+ * database. This gives us exact control of when a message is considered consumed. This raises the opposite possibility:
+ * the process could fail in the interval after the insert into the database but before the commit (even though this
+ * would likely just be a few milliseconds, it is a possibility). In this case the process that took over consumption
+ * would consume from last committed offset and would repeat the insert of the last batch of data. Used in this way
+ * Kafka provides what is often called "at-least once delivery" guarantees, as each message will likely be delivered one
+ * time but in failure cases could be duplicated.
+ *
* <pre>
- * {@code
- * Properties props = new Properties();
- * props.put("metadata.broker.list", "localhost:9092");
- * props.put("group.id", "test");
- * props.put("session.timeout.ms", "1000");
- * props.put("enable.auto.commit", "false");
- * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
- * consumer.subscribe("foo", "bar");
- * int commitInterval = 100;
- * int numRecords = 0;
- * boolean isRunning = true;
- * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
- * while(isRunning) {
- * Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
- * try {
- * Map<TopicPartition, Long> lastConsumedOffsets = process(records);
- * consumedOffsets.putAll(lastConsumedOffsets);
- * numRecords += records.size();
- * // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
- * if(numRecords % commitInterval == 0)
- * consumer.commit(false);
- * } catch(Exception e) {
- * try {
- * // rewind consumer's offsets for failed partitions
- * // assume failedPartitions() returns the list of partitions for which the processing of the last batch of messages failed
- * List<TopicPartition> failedPartitions = failedPartitions();
- * Map<TopicPartition, Long> offsetsToRewindTo = new HashMap<TopicPartition, Long>();
- * for(TopicPartition failedPartition : failedPartitions) {
- * // rewind to the last consumed offset for the failed partition. Since process() failed for this partition, the consumed offset
- * // should still be pointing to the last successfully processed offset and hence is the right offset to rewind consumption to.
- * offsetsToRewindTo.put(failedPartition, consumedOffsets.get(failedPartition));
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * props.put("group.id", "test");
+ * props.put("enable.auto.commit", "false");
+ * props.put("auto.commit.interval.ms", "1000");
+ * props.put("session.timeout.ms", "30000");
+ * props.put("key.serializer", "org.apache.kafka.common.serializers.StringSerializer");
+ * props.put("value.serializer", "org.apache.kafka.common.serializers.StringSerializer");
+ * KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
+ * consumer.subscribe("foo", "bar");
+ * int commitInterval = 200;
+ * List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
+ * while (true) {
+ * ConsumerRecords<String, String> records = consumer.poll(100);
+ * for (ConsumerRecord<String, String> record : records) {
+ * buffer.add(record);
+ * if (buffer.size() >= commitInterval) {
+ * insertIntoDb(buffer);
+ * consumer.commit(CommitType.SYNC);
+ * buffer.clear();
* }
- * // seek to new offsets only for partitions that failed the last process()
- * consumer.seek(offsetsToRewindTo);
- * } catch(Exception e) { break; } // rewind failed
+ * }
* }
- * }
- * consumer.close();
- * }
* </pre>
+ *
+ * <h4>Subscribing To Specific Partitions</h4>
+ *
+ * In the previous examples we subscribed to the topics we were interested in and let Kafka give our particular process
+ * a fair share of the partitions for those topics. This provides a simple load balancing mechanism so multiple
+ * instances of our program can divided up the work of processing records.
* <p>
- * This example demonstrates how to rewind the offsets of the entire consumer group. It is assumed that the user has chosen to use Kafka's
- * group management functionality for automatic consumer load balancing and failover. This example also assumes that the offsets are stored in
- * Kafka. If group management is used, the right place to systematically rewind offsets for <i>every</i> consumer instance is inside the
- * ConsumerRebalanceCallback. The onPartitionsAssigned callback is invoked after the consumer is assigned a new set of partitions on rebalance
- * <i>and</i> before the consumption restarts post rebalance. This is the right place to supply the newly rewound offsets to the consumer. It
- * is recommended that if you foresee the requirement to ever reset the consumer's offsets in the presence of group management, that you
- * always configure the consumer to use the ConsumerRebalanceCallback with a flag that protects whether or not the offset rewind logic is used.
- * This method of rewinding offsets is useful if you notice an issue with your message processing after successful consumption and offset commit.
- * And you would like to rewind the offsets for the entire consumer group as part of rolling out a fix to your processing logic. In this case,
- * you would configure each of your consumer instances with the offset rewind configuration flag turned on and bounce each consumer instance
- * in a rolling restart fashion. Each restart will trigger a rebalance and eventually all consumer instances would have rewound the offsets for
- * the partitions they own, effectively rewinding the offsets for the entire consumer group.
- * <pre>
- * {@code
- * Properties props = new Properties();
- * props.put("metadata.broker.list", "localhost:9092");
- * props.put("group.id", "test");
- * props.put("session.timeout.ms", "1000");
- * props.put("enable.auto.commit", "false");
- * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
- * props,
- * new ConsumerRebalanceCallback() {
- * boolean rewindOffsets = true; // should be retrieved from external application config
- * public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
- * Map<TopicPartition, Long> latestCommittedOffsets = consumer.committed(partitions);
- * if(rewindOffsets)
- * Map<TopicPartition, Long> newOffsets = rewindOffsets(latestCommittedOffsets, 100);
- * consumer.seek(newOffsets);
- * }
- * public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
- * consumer.commit(true);
- * }
- * // this API rewinds every partition back by numberOfMessagesToRewindBackTo messages
- * private Map<TopicPartition, Long> rewindOffsets(Map<TopicPartition, Long> currentOffsets,
- * long numberOfMessagesToRewindBackTo) {
- * Map<TopicPartition, Long> newOffsets = new HashMap<TopicPartition, Long>();
- * for(Map.Entry<TopicPartition, Long> offset : currentOffsets.entrySet())
- * newOffsets.put(offset.getKey(), offset.getValue() - numberOfMessagesToRewindBackTo);
- * return newOffsets;
- * }
- * });
- * consumer.subscribe("foo", "bar");
- * int commitInterval = 100;
- * int numRecords = 0;
- * boolean isRunning = true;
- * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
- * while(isRunning) {
- * Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
- * Map<TopicPartition, Long> lastConsumedOffsets = process(records);
- * consumedOffsets.putAll(lastConsumedOffsets);
- * numRecords += records.size();
- * // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
- * if(numRecords % commitInterval == 0)
- * consumer.commit(consumedOffsets, true);
- * }
- * consumer.commit(true);
- * consumer.close();
- * }
- * </pre>
- * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality along with custom offset storage.
- * In this example, the assumption made is that the user chooses to store the consumer offsets outside Kafka. This requires the user to
- * plugin logic for retrieving the offsets from a custom store and provide the offsets to the consumer in the ConsumerRebalanceCallback
- * callback. The onPartitionsAssigned callback is invoked after the consumer is assigned a new set of partitions on rebalance <i>and</i>
- * before the consumption restarts post rebalance. This is the right place to supply offsets from a custom store to the consumer.
+ * In this mode the consumer will just get the partitions it subscribes to and if the consumer instance fails no attempt
+ * will be made to rebalance partitions to other instances.
* <p>
- * Similarly, the user would also be required to plugin logic for storing the consumer's offsets to a custom store. The onPartitionsRevoked
- * callback is invoked right after the consumer has stopped fetching data and before the partition ownership changes. This is the right place
- * to commit the offsets for the current set of partitions owned by the consumer.
- * <pre>
- * {@code
- * Properties props = new Properties();
- * props.put("metadata.broker.list", "localhost:9092");
- * props.put("group.id", "test");
- * props.put("session.timeout.ms", "1000");
- * props.put("enable.auto.commit", "false"); // since enable.auto.commit only applies to Kafka based offset storage
- * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
- * props,
- * new ConsumerRebalanceCallback() {
- * public void onPartitionsAssigned(Consumer<?,?> consumer, Collection<TopicPartition> partitions) {
- * Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore(partitions);
- * consumer.seek(lastCommittedOffsets);
- * }
- * public void onPartitionsRevoked(Consumer<?,?> consumer, Collection<TopicPartition> partitions) {
- * Map<TopicPartition, Long> offsets = getLastConsumedOffsets(partitions);
- * commitOffsetsToCustomStore(offsets);
- * }
- * // following APIs should be implemented by the user for custom offset management
- * private Map<TopicPartition, Long> getLastCommittedOffsetsFromCustomStore(Collection<TopicPartition> partitions) {
- * return null;
- * }
- * private Map<TopicPartition, Long> getLastConsumedOffsets(Collection<TopicPartition> partitions) { return null; }
- * private void commitOffsetsToCustomStore(Map<TopicPartition, Long> offsets) {}
- * });
- * consumer.subscribe("foo", "bar");
- * int commitInterval = 100;
- * int numRecords = 0;
- * boolean isRunning = true;
- * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
- * while(isRunning) {
- * Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
- * Map<TopicPartition, Long> lastConsumedOffsets = process(records);
- * consumedOffsets.putAll(lastConsumedOffsets);
- * numRecords += records.size();
- * // commit offsets for all partitions of topics foo, bar synchronously, owned by this consumer instance
- * if(numRecords % commitInterval == 0)
- * commitOffsetsToCustomStore(consumedOffsets);
- * }
- * consumer.commit(true);
- * consumer.close();
- * }
- * </pre>
- * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume upto the latest
- * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes
- * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions.
- * This example assumes that the user chooses to use Kafka based offset storage. The user still has to specify a group.id to use Kafka
- * based offset management. However, session.timeout.ms is not required since the Kafka consumer only does automatic failover when group
- * management is used.
- * <pre>
- * {@code
- * Properties props = new Properties();
- * props.put("metadata.broker.list", "localhost:9092");
- * props.put("group.id", "test");
- * props.put("enable.auto.commit", "true");
- * props.put("auto.commit.interval.ms", "10000");
- * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
- * // subscribe to some partitions of topic foo
- * TopicPartition partition0 = new TopicPartition("foo", 0);
- * TopicPartition partition1 = new TopicPartition("foo", 1);
- * TopicPartition[] partitions = new TopicPartition[2];
- * partitions[0] = partition0;
- * partitions[1] = partition1;
- * consumer.subscribe(partitions);
- * // find the last committed offsets for partitions 0,1 of topic foo
- * Map<TopicPartition, Long> lastCommittedOffsets = consumer.committed(Arrays.asList(partitions));
- * // seek to the last committed offsets to avoid duplicates
- * consumer.seek(lastCommittedOffsets);
- * // find the offsets of the latest available messages to know where to stop consumption
- * Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-2, Arrays.asList(partitions));
- * boolean isRunning = true;
- * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
- * while(isRunning) {
- * Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
- * Map<TopicPartition, Long> lastConsumedOffsets = process(records);
- * consumedOffsets.putAll(lastConsumedOffsets);
- * for(TopicPartition partition : partitions) {
- * if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition))
- * isRunning = false;
- * else
- * isRunning = true;
- * }
- * }
- * consumer.commit(true);
- * consumer.close();
- * }
- * </pre>
- * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume upto the latest
- * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes
- * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions.
- * This example assumes that the user chooses to use custom offset storage.
+ * There are several cases where this makes sense:
+ * <ul>
+ * <li>The first case is if the process is maintaining some kind of local state associated with that partition (like a
+ * local on-disk key-value store) and hence it should only get records for the partition it is maintaining on disk.
+ * <li>Another case is if the process itself is highly available and will be restarted if it fails (perhaps using a
+ * cluster management framework like YARN, Mesos, or AWS facilities, or as part of a stream processing framework). In
+ * this case there is no need for Kafka to detect the failure and reassign the partition, rather the consuming process
+ * will be restarted on another machine.
+ * </ul>
+ * <p>
+ * This mode is easy to specify, rather than subscribing to the topic, the consumer just subscribes to particular
+ * partitions:
+ *
* <pre>
- * {@code
- * Properties props = new Properties();
- * props.put("metadata.broker.list", "localhost:9092");
- * KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
- * // subscribe to some partitions of topic foo
- * TopicPartition partition0 = new TopicPartition("foo", 0);
- * TopicPartition partition1 = new TopicPartition("foo", 1);
- * TopicPartition[] partitions = new TopicPartition[2];
- * partitions[0] = partition0;
- * partitions[1] = partition1;
- * consumer.subscribe(partitions);
- * Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore();
- * // seek to the last committed offsets to avoid duplicates
- * consumer.seek(lastCommittedOffsets);
- * // find the offsets of the latest available messages to know where to stop consumption
- * Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-2, Arrays.asList(partitions));
- * boolean isRunning = true;
- * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
- * while(isRunning) {
- * Map<String, ConsumerRecords<byte[], byte[]>> records = consumer.poll(100);
- * Map<TopicPartition, Long> lastConsumedOffsets = process(records);
- * consumedOffsets.putAll(lastConsumedOffsets);
- * // commit offsets for partitions 0,1 for topic foo to custom store
- * commitOffsetsToCustomStore(consumedOffsets);
- * for(TopicPartition partition : partitions) {
- * if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition))
- * isRunning = false;
- * else
- * isRunning = true;
- * }
- * }
- * commitOffsetsToCustomStore(consumedOffsets);
- * consumer.close();
- * }
+ * String topic = "foo";
+ * TopicPartition partition0 = new TopicPartition(topic, 0);
+ * TopicPartition partition1 = new TopicPartition(topic, 1);
+ * consumer.subscribe(partition0);
+ * consumer.subscribe(partition1);
* </pre>
+ *
+ * The group that the consumer specifies is still used for committing offsets, but now the set of partitions will only
+ * be changed if the consumer specifies new partitions, and no attempt at failure detection will be made.
+ * <p>
+ * It isn't possible to mix both subscription to specific partitions (with no load balancing) and to topics (with load
+ * balancing) using the same consumer instance.
+ *
+ * <h4>Managing Your Own Offsets</h4>
+ *
+ * The consumer application need not use Kafka's built-in offset storage, it can store offsets in a store of it's own
+ * choosing. The primary use case for this is allowing the application to store both the offset and the results of the
+ * consumption in the same system in a way that both the results and offsets are stored atomically. This is not always
+ * possible, but when it is it will make the consumption fully atomic and give "exactly once" semantics that are
+ * stronger than the default "at-least once" semantics you get with Kafka's offset commit functionality.
+ * <p>
+ * Here are a couple of examples of this type of usage:
+ * <ul>
+ * <li>If the results of the consumption are being stored in a relational database, storing the offset in the database
+ * as well can allow committing both the results and offset in a single transaction. Thus either the transaction will
+ * succeed and the offset will be updated based on what was consumed or the result will not be stored and the offset
+ * won't be updated.
+ * <li>If the results are being stored in a local store it may be possible to store the offset there as well. For
+ * example a search index could be built by subscribing to a particular partition and storing both the offset and the
+ * indexed data together. If this is done in a way that is atomic, it is often possible to have it be the case that even
+ * if a crash occurs that causes unsync'd data to be lost, whatever is left has the corresponding offset stored as well.
+ * This means that in this case the indexing process that comes back having lost recent updates just resumes indexing
+ * from what it has ensuring that no updates are lost.
+ * </ul>
+ *
+ * Each record comes with it's own offset, so to manage your own offset you just need to do the following:
+ * <ol>
+ * <li>Configure <code>enable.auto.commit=false</code>
+ * <li>Use the offset provided with each {@link ConsumerRecord} to save your position.
+ * <li>On restart restore the position of the consumer using {@link #seek(TopicPartition, long)}.
+ * </ol>
+ *
+ * This type of usage is simplest when the partition assignment is also done manually (this would be likely in the
+ * search index use case described above). If the partition assignment is done automatically special care will also be
+ * needed to handle the case where partition assignments change. This can be handled using a special callback specified
+ * using <code>rebalance.callback.class</code>, which specifies an implementation of the interface
+ * {@link ConsumerRebalanceCallback}. When partitions are taken from a consumer the consumer will want to commit its
+ * offset for those partitions by implementing
+ * {@link ConsumerRebalanceCallback#onPartitionsRevoked(Consumer, Collection)}. When partitions are assigned to a
+ * consumer, the consumer will want to look up the offset for those new partitions an correctly initialize the consumer
+ * to that position by implementing {@link ConsumerRebalanceCallback#onPartitionsAssigned(Consumer, Collection)}.
+ * <p>
+ * Another common use for {@link ConsumerRebalanceCallback} is to flush any caches the application maintains for
+ * partitions that are moved elsewhere.
+ *
+ * <h4>Controlling The Consumer's Position</h4>
+ *
+ * In most use cases the consumer will simply consume records from beginning to end, periodically committing it's
+ * position (either automatically or manually). However Kafka allows the consumer to manually control it's position,
+ * moving forward or backwards in a partition at will. This means a consumer can re-consume older records, or skip to
+ * the most recent records without actually consuming the intermediate records.
+ * <p>
+ * There are several instances where manually controlling the consumer's position can be useful.
+ * <p>
+ * One case is for time-sensitive record processing it may make sense for a consumer that falls far enough behind to not
+ * attempt to catch up processing all records, but rather just skip to the most recent records.
+ * <p>
+ * Another use case is for a system that maintains local state as described in the previous section. In such a system
+ * the consumer will want to initialize it's position on start-up to whatever is contained in the local store. Likewise
+ * if the local state is destroyed (say because the disk is lost) the state may be recreated on a new machine by
+ * reconsuming all the data and recreating the state (assuming that Kafka is retaining sufficient history).
+ *
+ * Kafka allows specifying the position using {@link #seek(TopicPartition, long)} to specify the new position. Special
+ * methods for seeking to the earliest and latest offset the server maintains are also available (
+ * {@link #seekToBeginning(TopicPartition...)} and {@link #seekToEnd(TopicPartition...)} respectively).
+ *
+ * <h3>Multithreaded Processing</h3>
+ *
+ * The Kafka consumer is threadsafe but coarsely synchronized. All network I/O happens in the thread of the application
+ * making the call. We have intentionally avoided implementing a particular threading model for processing.
+ * <p>
+ * This leaves several options for implementing multi-threaded processing of records.
+ *
+ * <h4>1. One Consumer Per Thread</h4>
+ *
+ * A simple option is to give each thread it's own consumer instance. Here are the pros and cons of this approach:
+ * <ul>
+ * <li><b>PRO</b>: It is the easiest to implement
+ * <li><b>PRO</b>: It is often the fastest as no inter-thread co-ordination is needed
+ * <li><b>PRO</b>: It makes in-order processing on a per-partition basis very easy to implement (each thread just
+ * processes messages in the order it receives them).
+ * <li><b>CON</b>: More consumers means more TCP connections to the cluster (one per thread). In general Kafka handles
+ * connections very efficiently so this is generally a small cost.
+ * <li><b>CON</b>: Multiple consumers means more requests being sent to the server and slightly less batching of data
+ * which can cause some drop in I/O throughput.
+ * <li><b>CON</b>: The number of total threads across all processes will be limited by the total number of partitions.
+ * </ul>
+ *
+ * <h4>2. Decouple Consumption and Processing</h4>
+ *
+ * Another alternative is to have one or more consumer threads that do all data consumption and hands off
+ * {@link ConsumerRecords} instances to a blocking queue consumed by a pool of processor threads that actually handle
+ * the record processing.
+ *
+ * This option likewise has pros and cons:
+ * <ul>
+ * <li><b>PRO</b>: This option allows independently scaling the number of consumers and processors. This makes it
+ * possible to have a single consumer that feeds many processor threads, avoiding any limitation on partitions.
+ * <li><b>CON</b>: Guaranteeing order across the processors requires particular care as the threads will execute
+ * independently an earlier chunk of data may actually be processed after a later chunk of data just due to the luck of
+ * thread execution timing. For processing that has no ordering requirements this is not a problem.
+ * <li><b>CON</b>: Manually committing the position becomes harder as it requires that all threads co-ordinate to ensure
+ * that processing is complete for that partition.
+ * </ul>
+ *
+ * There are many possible variations on this approach. For example each processor thread can have it's own queue, and
+ * the consumer threads can hash into these queues using the TopicPartition to ensure in-order consumption and simplify
+ * commit.
+ *
*/
-public class KafkaConsumer<K,V> implements Consumer<K,V> {
+public class KafkaConsumer<K, V> implements Consumer<K, V> {
private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
+ private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
+ private static final long LATEST_OFFSET_TIMESTAMP = -1L;
+ private static final AtomicInteger consumerAutoId = new AtomicInteger(1);
- private final long metadataFetchTimeoutMs;
- private final long totalMemorySize;
- private final Metrics metrics;
- private final Set<String> subscribedTopics;
- private final Set<TopicPartition> subscribedPartitions;
+ private final Time time;
+ private final ConsumerMetrics metrics;
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
+ private final SubscriptionState subscriptions;
+ private final Metadata metadata;
+ private final Heartbeat heartbeat;
+ private final NetworkClient client;
+ private final int maxWaitMs;
+ private final int minBytes;
+ private final int fetchSize;
+ private final boolean autoCommit;
+ private final long autoCommitIntervalMs;
+ private final String group;
+ private final long sessionTimeoutMs;
+ private final long retryBackoffMs;
+ private final String partitionAssignmentStrategy;
+ private final AutoOffsetResetStrategy offsetResetStrategy;
+ private final ConsumerRebalanceCallback rebalanceCallback;
+ private final List<PartitionRecords<K, V>> records;
+ private final boolean checkCrcs;
+ private long lastCommitAttemptMs;
+ private String consumerId;
+ private Node consumerCoordinator;
+ private boolean closed = false;
+ private int generation;
/**
* A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
- * are documented <a href="http://kafka.apache.org/documentation.html#consumerconfigs">here</a>. Values can be
- * either strings or Objects of the appropriate type (for example a numeric configuration would accept either the
+ * are documented <a href="http://kafka.apache.org/documentation.html#consumerconfigs" >here</a>. Values can be
+ * either strings or objects of the appropriate type (for example a numeric configuration would accept either the
* string "42" or the integer 42).
* <p>
* Valid configuration strings are documented at {@link ConsumerConfig}
- * @param configs The consumer configs
+ *
+ * @param configs The consumer configs
*/
public KafkaConsumer(Map<String, Object> configs) {
- this(configs, null);
+ this(configs, null, null, null);
}
/**
- * A consumer is instantiated by providing a set of key-value pairs as configuration and a {@link ConsumerRebalanceCallback}
- * implementation
+ * A consumer is instantiated by providing a set of key-value pairs as configuration, a
+ * {@link ConsumerRebalanceCallback} implementation, a key and a value {@link Deserializer}.
* <p>
* Valid configuration strings are documented at {@link ConsumerConfig}
- * @param configs The consumer configs
- * @param callback A callback interface that the user can implement to manage customized offsets on the start and end of
- * every rebalance operation.
+ *
+ * @param configs The consumer configs
+ * @param callback A callback interface that the user can implement to manage customized offsets on the start and
+ * end of every rebalance operation.
+ * @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method
+ * won't be called in the consumer when the deserializer is passed in directly.
+ * @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
+ * won't be called in the consumer when the deserializer is passed in directly.
*/
- public KafkaConsumer(Map<String, Object> configs, ConsumerRebalanceCallback callback) {
- this(configs, callback, null, null);
- }
-
- /**
- * A consumer is instantiated by providing a set of key-value pairs as configuration, a {@link ConsumerRebalanceCallback}
- * implementation, a key and a value {@link Deserializer}.
- * <p>
- * Valid configuration strings are documented at {@link ConsumerConfig}
- * @param configs The consumer configs
- * @param callback A callback interface that the user can implement to manage customized offsets on the start and end of
- * every rebalance operation.
- * @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method won't
- * be called in the consumer when the deserializer is passed in directly.
- * @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
- * won't be called in the consumer when the deserializer is passed in directly.
- */
- public KafkaConsumer(Map<String, Object> configs, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
+ public KafkaConsumer(Map<String, Object> configs,
+ ConsumerRebalanceCallback callback,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> valueDeserializer) {
this(new ConsumerConfig(addDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
- callback, keyDeserializer, valueDeserializer);
+ callback,
+ keyDeserializer,
+ valueDeserializer);
}
private static Map<String, Object> addDeserializerToConfig(Map<String, Object> configs,
- Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer) {
+ Deserializer<?> keyDeserializer,
+ Deserializer<?> valueDeserializer) {
Map<String, Object> newConfigs = new HashMap<String, Object>();
newConfigs.putAll(configs);
if (keyDeserializer != null)
@@ -393,24 +460,13 @@ public class KafkaConsumer<K,V> implements Consumer<K,V> {
}
/**
- * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration.
- * Valid configuration strings are documented at {@link ConsumerConfig}
+ * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration. Valid
+ * configuration strings are documented at {@link ConsumerConfig} A consumer is instantiated by providing a
+ * {@link java.util.Properties} object as configuration. Valid configuration strings are documented at
+ * {@link ConsumerConfig}
*/
public KafkaConsumer(Properties properties) {
- this(properties, null);
- }
-
- /**
- * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration and a
- * {@link ConsumerRebalanceCallback} implementation.
- * <p>
- * Valid configuration strings are documented at {@link ConsumerConfig}
- * @param properties The consumer configuration properties
- * @param callback A callback interface that the user can implement to manage customized offsets on the start and end of
- * every rebalance operation.
- */
- public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback) {
- this(properties, callback, null, null);
+ this(properties, null, null, null);
}
/**
@@ -418,21 +474,28 @@ public class KafkaConsumer<K,V> implements Consumer<K,V> {
* {@link ConsumerRebalanceCallback} implementation, a key and a value {@link Deserializer}.
* <p>
* Valid configuration strings are documented at {@link ConsumerConfig}
+ *
* @param properties The consumer configuration properties
- * @param callback A callback interface that the user can implement to manage customized offsets on the start and end of
- * every rebalance operation.
- * @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method won't
- * be called in the consumer when the deserializer is passed in directly.
- * @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
- * won't be called in the consumer when the deserializer is passed in directly.
- */
- public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
+ * @param callback A callback interface that the user can implement to manage customized offsets on the start and
+ * end of every rebalance operation.
+ * @param keyDeserializer The deserializer for key that implements {@link Deserializer}. The configure() method
+ * won't be called in the consumer when the deserializer is passed in directly.
+ * @param valueDeserializer The deserializer for value that implements {@link Deserializer}. The configure() method
+ * won't be called in the consumer when the deserializer is passed in directly.
+ */
+ public KafkaConsumer(Properties properties,
+ ConsumerRebalanceCallback callback,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> valueDeserializer) {
this(new ConsumerConfig(addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)),
- callback, keyDeserializer, valueDeserializer);
+ callback,
+ keyDeserializer,
+ valueDeserializer);
}
private static Properties addDeserializerToConfig(Properties properties,
- Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer) {
+ Deserializer<?> keyDeserializer,
+ Deserializer<?> valueDeserializer) {
Properties newProperties = new Properties();
newProperties.putAll(properties);
if (keyDeserializer != null)
@@ -442,17 +505,12 @@ public class KafkaConsumer<K,V> implements Consumer<K,V> {
return newProperties;
}
- private KafkaConsumer(ConsumerConfig config, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
- log.trace("Starting the Kafka consumer");
- subscribedTopics = new HashSet<String>();
- subscribedPartitions = new HashSet<TopicPartition>();
- this.metrics = new Metrics(new MetricConfig(),
- Collections.singletonList((MetricsReporter) new JmxReporter("kafka.consumer.")),
- new SystemTime());
- this.metadataFetchTimeoutMs = config.getLong(ConsumerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
- this.totalMemorySize = config.getLong(ConsumerConfig.TOTAL_BUFFER_MEMORY_CONFIG);
- List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
-
+ @SuppressWarnings("unchecked")
+ private KafkaConsumer(ConsumerConfig config,
+ ConsumerRebalanceCallback callback,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> valueDeserializer) {
+ log.debug("Starting the Kafka consumer");
if (keyDeserializer == null)
this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
@@ -463,181 +521,1072 @@ public class KafkaConsumer<K,V> implements Consumer<K,V> {
Deserializer.class);
else
this.valueDeserializer = valueDeserializer;
+ if (callback == null)
+ this.rebalanceCallback = config.getConfiguredInstance(ConsumerConfig.CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG,
+ ConsumerRebalanceCallback.class);
+ else
+ this.rebalanceCallback = callback;
+ this.time = new SystemTime();
+ this.maxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
+ this.minBytes = config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG);
+ this.fetchSize = config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG);
+ this.group = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
+ this.records = new LinkedList<PartitionRecords<K, V>>();
+ this.sessionTimeoutMs = config.getLong(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
+ this.heartbeat = new Heartbeat(this.sessionTimeoutMs, time.milliseconds());
+ this.partitionAssignmentStrategy = config.getString(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG);
+ this.offsetResetStrategy = AutoOffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
+ .toUpperCase());
+ this.checkCrcs = config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG);
+ this.autoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+ this.autoCommitIntervalMs = config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
+ this.lastCommitAttemptMs = time.milliseconds();
+
+ MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
+ .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
+ TimeUnit.MILLISECONDS);
+ String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
+ String jmxPrefix = "kafka.consumer";
+ if(clientId .length() <= 0)
+ clientId = "consumer-" + consumerAutoId.getAndIncrement();
+ List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
+ MetricsReporter.class);
+ reporters.add(new JmxReporter(jmxPrefix));
+ Metrics metrics = new Metrics(metricConfig, reporters, time);
+ this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+ this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG));
+ List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+ this.metadata.update(Cluster.bootstrap(addresses), 0);
+
+ String metricsGroup = "consumer";
+ Map<String, String> metricsTags = new LinkedHashMap<String, String>();
+ metricsTags.put("client-id", clientId);
+ long reconnectBackoffMs = config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG);
+ int sendBuffer = config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG);
+ int receiveBuffer = config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG);
+ this.client = new NetworkClient(new Selector(metrics, time, metricsGroup, metricsTags),
+ this.metadata,
+ clientId,
+ 100,
+ reconnectBackoffMs,
+ sendBuffer,
+ receiveBuffer);
+ this.subscriptions = new SubscriptionState();
+ this.metrics = new ConsumerMetrics(metrics, metricsGroup, metricsTags);
config.logUnused();
- log.debug("Kafka consumer started");
+
+ this.consumerCoordinator = null;
+ this.consumerId = "";
+ this.generation = -1;
+ log.debug("Kafka consumer created");
+ }
+
+ /**
+ * The set of partitions currently assigned to this consumer. If subscription happened by directly subscribing to
+ * partitions using {@link #subscribe(TopicPartition...)} then this will simply return the list of partitions that
+ * were subscribed to. If subscription was done by specifying only the topic using {@link #subscribe(String...)}
+ * then this will give the set of topics currently assigned to the consumer (which may be none if the assignment
+ * hasn't happened yet, or the partitions are in the process of getting reassigned).
+ */
+ public synchronized Set<TopicPartition> subscriptions() {
+ return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
}
/**
* Incrementally subscribes to the given list of topics and uses the consumer's group management functionality
* <p>
- * As part of group management, the consumer will keep track of the list of consumers that belong to a particular group and
- * will trigger a rebalance operation if one of the following events trigger -
+ * As part of group management, the consumer will keep track of the list of consumers that belong to a particular
+ * group and will trigger a rebalance operation if one of the following events trigger -
* <ul>
- * <li> Number of partitions change for any of the subscribed list of topics
- * <li> Topic is created or deleted
- * <li> An existing member of the consumer group dies
- * <li> A new member is added to an existing consumer group via the join API
- * </ul>
+ * <li>Number of partitions change for any of the subscribed list of topics
+ * <li>Topic is created or deleted
+ * <li>An existing member of the consumer group dies
+ * <li>A new member is added to an existing consumer group via the join API
+ * </ul>
+ *
* @param topics A variable list of topics that the consumer wants to subscribe to
*/
@Override
- public void subscribe(String... topics) {
- if(subscribedPartitions.size() > 0)
- throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive");
- for(String topic:topics)
- subscribedTopics.add(topic);
- // TODO: trigger a rebalance operation
+ public synchronized void subscribe(String... topics) {
+ ensureNotClosed();
+ log.debug("Subscribed to topic(s): ", Utils.join(topics, ", "));
+ for (String topic : topics)
+ this.subscriptions.subscribe(topic);
+ metadata.addTopics(topics);
}
/**
- * Incrementally subscribes to a specific topic partition and does not use the consumer's group management functionality. As such,
- * there will be no rebalance operation triggered when group membership or cluster and topic metadata change.
+ * Incrementally subscribes to a specific topic partition and does not use the consumer's group management
+ * functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic
+ * metadata change.
* <p>
+ *
* @param partitions Partitions to incrementally subscribe to
*/
@Override
- public void subscribe(TopicPartition... partitions) {
- if(subscribedTopics.size() > 0)
- throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive");
- for(TopicPartition partition:partitions)
- subscribedPartitions.add(partition);
+ public synchronized void subscribe(TopicPartition... partitions) {
+ ensureNotClosed();
+ log.debug("Subscribed to partitions(s): ", Utils.join(partitions, ", "));
+ for (TopicPartition tp : partitions) {
+ this.subscriptions.subscribe(tp);
+ metadata.addTopics(tp.topic());
+ }
}
/**
- * Unsubscribe from the specific topics. This will trigger a rebalance operation and messages for this topic will not be returned
- * from the next {@link #poll(long) poll()} onwards
+ * Unsubscribe from the specific topics. This will trigger a rebalance operation and records for this topic will not
+ * be returned from the next {@link #poll(long) poll()} onwards
+ *
* @param topics Topics to unsubscribe from
*/
- public void unsubscribe(String... topics) {
+ public synchronized void unsubscribe(String... topics) {
+ ensureNotClosed();
+ log.debug("Unsubscribed from topic(s): ", Utils.join(topics, ", "));
// throw an exception if the topic was never subscribed to
- for(String topic:topics) {
- if(!subscribedTopics.contains(topic))
- throw new IllegalStateException("Topic " + topic + " was never subscribed to. subscribe(" + topic + ") should be called prior" +
- " to unsubscribe(" + topic + ")");
- subscribedTopics.remove(topic);
- }
- // TODO trigger a rebalance operation
+ for (String topic : topics)
+ this.subscriptions.unsubscribe(topic);
}
/**
- * Unsubscribe from the specific topic partitions. Messages for these partitions will not be returned from the next
+ * Unsubscribe from the specific topic partitions. records for these partitions will not be returned from the next
* {@link #poll(long) poll()} onwards
+ *
* @param partitions Partitions to unsubscribe from
*/
- public void unsubscribe(TopicPartition... partitions) {
+ public synchronized void unsubscribe(TopicPartition... partitions) {
+ ensureNotClosed();
+ log.debug("Unsubscribed from partitions(s): ", Utils.join(partitions, ", "));
// throw an exception if the partition was never subscribed to
- for(TopicPartition partition:partitions) {
- if(!subscribedPartitions.contains(partition))
- throw new IllegalStateException("Partition " + partition + " was never subscribed to. subscribe(new TopicPartition(" +
- partition.topic() + "," + partition.partition() + ") should be called prior" +
- " to unsubscribe(new TopicPartition(" + partition.topic() + "," + partition.partition() + ")");
- subscribedPartitions.remove(partition);
- }
- // trigger a rebalance operation
+ for (TopicPartition partition : partitions)
+ this.subscriptions.unsubscribe(partition);
}
-
+
/**
- * Fetches data for the topics or partitions specified using one of the subscribe APIs. It is an error to not have subscribed to
- * any topics or partitions before polling for data.
- * <p>
- * The offset used for fetching the data is governed by whether or not {@link #seek(Map) seek(offsets)}
- * is used. If {@link #seek(Map) seek(offsets)} is used, it will use the specified offsets on startup and
- * on every rebalance, to consume data from that offset sequentially on every poll. If not, it will use the last checkpointed offset
- * using {@link #commit(Map, boolean) commit(offsets, sync)}
- * for the subscribed list of partitions.
- * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, waits indefinitely. Must not be negative
+ * Fetches data for the topics or partitions specified using one of the subscribe APIs. It is an error to not have
+ * subscribed to any topics or partitions before polling for data.
+ * <p>
+ * The offset used for fetching the data is governed by whether or not {@link #seek(TopicPartition, long)} is used.
+ * If {@link #seek(TopicPartition, long)} is used, it will use the specified offsets on startup and on every
+ * rebalance, to consume data from that offset sequentially on every poll. If not, it will use the last checkpointed
+ * offset using {@link #commit(Map, CommitType) commit(offsets, sync)} for the subscribed list of partitions.
+ *
+ * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, waits
+ * indefinitely. Must not be negative
* @return map of topic to records since the last fetch for the subscribed list of topics and partitions
+ *
+ * @throws NoOffsetForPartitionException If there is no stored offset for a subscribed partition and no automatic
+ * offset reset policy has been configured.
*/
@Override
- public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
- // TODO Auto-generated method stub
- return null;
+ public synchronized ConsumerRecords<K, V> poll(long timeout) {
+ ensureNotClosed();
+ long now = time.milliseconds();
+
+ if (subscriptions.partitionsAutoAssigned()) {
+ // get partition assignment if needed
+ if (subscriptions.needsPartitionAssignment()) {
+ joinGroup(now);
+ } else if (!heartbeat.isAlive(now)) {
+ log.error("Failed heartbeat check.");
+ coordinatorDead();
+ } else if (heartbeat.shouldHeartbeat(now)) {
+ initiateHeartbeat(now);
+ }
+ }
+
+ // fetch positions if we have partitions we're subscribed to that we
+ // don't know the offset for
+ if (!subscriptions.hasAllFetchPositions())
+ fetchMissingPositionsOrResetThem(this.subscriptions.missingFetchPositions(), now);
+
+ // maybe autocommit position
+ if (shouldAutoCommit(now))
+ commit(CommitType.ASYNC);
+
+ /*
+ * initiate any needed fetches, then block for the timeout the user specified
+ */
+ Cluster cluster = this.metadata.fetch();
+ reinstateFetches(cluster, now);
+ client.poll(timeout, now);
+
+ /*
+ * initiate a fetch request for any nodes that we just got a response from without blocking
+ */
+ reinstateFetches(cluster, now);
+ client.poll(0, now);
+
+ return new ConsumerRecords<K, V>(consumeBufferedRecords());
}
/**
* Commits the specified offsets for the specified list of topics and partitions to Kafka.
* <p>
- * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance
- * and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used.
- * @param offsets The list of offsets per partition that should be committed to Kafka.
- * @param sync If true, commit will block until the consumer receives an acknowledgment
- * @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. Returns null
- * if the sync flag is set to false.
+ * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
+ * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
+ * should not be used.
+ * <p>
+ * A non-blocking commit will attempt to commit offsets asychronously. No error will be thrown if the commit fails.
+ * A blocking commit will wait for a response acknowledging the commit. In the event of an error it will retry until
+ * the commit succeeds.
+ *
+ * @param offsets The list of offsets per partition that should be committed to Kafka.
+ * @param commitType Control whether the commit is blocking
*/
@Override
- public OffsetMetadata commit(Map<TopicPartition, Long> offsets, boolean sync) {
- throw new UnsupportedOperationException();
+ public synchronized void commit(final Map<TopicPartition, Long> offsets, CommitType commitType) {
+ ensureNotClosed();
+ log.debug("Committing offsets ({}): {} ", commitType.toString().toLowerCase(), offsets);
+ long now = time.milliseconds();
+ this.lastCommitAttemptMs = now;
+ if (!offsets.isEmpty()) {
+ Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>(offsets.size());
+ for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
+ offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(entry.getValue(), now, ""));
+ OffsetCommitRequest req = new OffsetCommitRequest(this.group, this.generation, this.consumerId, offsetData);
+
+ RequestCompletionHandler handler = new RequestCompletionHandler() {
+ public void onComplete(ClientResponse resp) {
+ if (resp.wasDisconnected()) {
+ handleDisconnect(resp, time.milliseconds());
+ } else {
+ OffsetCommitResponse response = new OffsetCommitResponse(resp.responseBody());
+ for (Map.Entry<TopicPartition, Short> entry : response.responseData().entrySet()) {
+ TopicPartition tp = entry.getKey();
+ short errorCode = entry.getValue();
+ long offset = offsets.get(tp);
+ if (errorCode == Errors.NONE.code()) {
+ log.debug("Committed offset {} for partition {}", offset, tp);
+ subscriptions.committed(tp, offset);
+ } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
+ || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
+ coordinatorDead();
+ } else {
+ log.error("Error committing partition {} at offset {}: {}",
+ tp,
+ offset,
+ Errors.forCode(errorCode).exception().getMessage());
+ }
+ }
+ }
+ metrics.commitLatency.record(resp.requestLatencyMs());
+ }
+ };
+
+ if (commitType == CommitType.ASYNC) {
+ this.initiateCoordinatorRequest(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now);
+ return;
+ } else {
+ boolean done;
+ do {
+ ClientResponse response = blockingCoordinatorRequest(ApiKeys.OFFSET_COMMIT,
+ req.toStruct(),
+ handler,
+ now);
+
+ // check for errors
+ done = true;
+ OffsetCommitResponse commitResponse = new OffsetCommitResponse(response.responseBody());
+ for (short errorCode : commitResponse.responseData().values()) {
+ if (errorCode != Errors.NONE.code())
+ done = false;
+ }
+ if (!done) {
+ log.debug("Error in offset commit, backing off for {} ms before retrying again.",
+ this.retryBackoffMs);
+ Utils.sleep(this.retryBackoffMs);
+ }
+ } while (!done);
+ }
+ }
}
/**
- * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and
- * partitions.
+ * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
* <p>
- * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance
- * and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used.
- * @param sync If true, commit will block until the consumer receives an acknowledgment
- * @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. Returns null
- * if the sync flag is set to false.
+ * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
+ * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
+ * should not be used.
+ *
+ * @param commitType Whether or not the commit should block until it is acknowledged.
*/
@Override
- public OffsetMetadata commit(boolean sync) {
- throw new UnsupportedOperationException();
+ public synchronized void commit(CommitType commitType) {
+ ensureNotClosed();
+ commit(this.subscriptions.allConsumed(), commitType);
}
/**
- * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API is invoked
- * for the same partition more than once, the latest offset will be used on the next poll(). Note that you may lose data if this API is
- * arbitrarily used in the middle of consumption, to reset the fetch offsets
+ * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API
+ * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
+ * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
*/
@Override
- public void seek(Map<TopicPartition, Long> offsets) {
+ public synchronized void seek(TopicPartition partition, long offset) {
+ ensureNotClosed();
+ log.debug("Seeking to offset {} for partition {}", offset, partition);
+ this.subscriptions.seek(partition, offset);
}
/**
- * Returns the fetch position of the <i>next message</i> for the specified topic partition to be used on the next {@link #poll(long) poll()}
- * @param partitions Partitions for which the fetch position will be returned
- * @return The position from which data will be fetched for the specified partition on the next {@link #poll(long) poll()}
+ * Seek to the first offset for each of the given partitions
*/
- public Map<TopicPartition, Long> position(Collection<TopicPartition> partitions) {
- return null;
+ public synchronized void seekToBeginning(TopicPartition... partitions) {
+ ensureNotClosed();
+ Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions()
+ : Arrays.asList(partitions);
+ for (TopicPartition tp : parts) {
+ // TODO: list offset call could be optimized by grouping by node
+ seek(tp, listOffset(tp, EARLIEST_OFFSET_TIMESTAMP));
+ }
}
/**
- * Fetches the last committed offsets of partitions that the consumer currently consumes. This API is only relevant if Kafka based offset
- * storage is used. This API can be used in conjunction with {@link #seek(Map) seek(offsets)} to rewind consumption of data.
- * @param partitions The list of partitions to return the last committed offset for
- * @return The list of offsets committed on the last {@link #commit(boolean) commit(sync)}
+ * Seek to the last offset for each of the given partitions
*/
- @Override
- public Map<TopicPartition, Long> committed(Collection<TopicPartition> partitions) {
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException();
+ public synchronized void seekToEnd(TopicPartition... partitions) {
+ ensureNotClosed();
+ Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions()
+ : Arrays.asList(partitions);
+ for (TopicPartition tp : parts) {
+ // TODO: list offset call could be optimized by grouping by node
+ seek(tp, listOffset(tp, LATEST_OFFSET_TIMESTAMP));
+ }
}
/**
- * Fetches offsets before a certain timestamp. Note that the offsets returned are approximately computed and do not correspond to the exact
- * message at the given timestamp. As such, if the consumer is rewound to offsets returned by this API, there may be duplicate messages
- * returned by the consumer.
- * @param partitions The list of partitions for which the offsets are returned
- * @param timestamp The unix timestamp. Value -1 indicates earliest available timestamp. Value -2 indicates latest available timestamp.
- * @return The offsets per partition before the specified timestamp.
- */
- public Map<TopicPartition, Long> offsetsBeforeTime(long timestamp, Collection<TopicPartition> partitions) {
- return null;
+ * Returns the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
+ *
+ * @param partition The partition to get the position for
+ * @return The offset
+ * @throws NoOffsetForPartitionException If a position hasn't been set for a given partition, and no reset policy is
+ * available.
+ */
+ public synchronized long position(TopicPartition partition) {
+ ensureNotClosed();
+ if (!this.subscriptions.assignedPartitions().contains(partition))
+ throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
+ Long offset = this.subscriptions.consumed(partition);
+ if (offset == null) {
+ fetchMissingPositionsOrResetThem(Collections.singleton(partition), time.milliseconds());
+ return this.subscriptions.consumed(partition);
+ } else {
+ return offset;
+ }
}
+ /**
+ * Fetches the last committed offset for the given partition (whether the commit happened by this process or
+ * another). This offset will be used as the position for the consumer in the event of a failure.
+ * <p>
+ * This call may block to do a remote call if the partition in question isn't assigned to this consumer or if the
+ * consumer hasn't yet initialized it's cache of committed offsets.
+ *
+ * @param partition The partition to check
+ * @return The last committed offset or null if no offset has been committed
+ * @throws NoOffsetForPartitionException If no offset has ever been committed by any process for the given
+ * partition.
+ */
+ @Override
+ public synchronized long committed(TopicPartition partition) {
+ ensureNotClosed();
+ Set<TopicPartition> partitionsToFetch;
+ if (subscriptions.assignedPartitions().contains(partition)) {
+ Long committed = this.subscriptions.committed(partition);
+ if (committed != null)
+ return committed;
+ partitionsToFetch = subscriptions.assignedPartitions();
+ } else {
+ partitionsToFetch = Collections.singleton(partition);
+ }
+ this.refreshCommittedOffsets(time.milliseconds(), partitionsToFetch);
+ Long committed = this.subscriptions.committed(partition);
+ if (committed == null)
+ throw new NoOffsetForPartitionException("No offset has been committed for partition " + partition);
+ return committed;
+ }
+
+ /**
+ * Get the metrics kept by the consumer
+ */
@Override
public Map<MetricName, ? extends Metric> metrics() {
- return Collections.unmodifiableMap(this.metrics.metrics());
+ return Collections.unmodifiableMap(this.metrics.metrics.metrics());
}
+ /**
+ * Get metadata about the partitions for a given topic. This method will issue a remote call to the server if it
+ * does not already have any metadata about the given topic.
+ *
+ * @param topic The topic to get partition metadata for
+ * @return The list of partitions
+ */
@Override
- public void close() {
+ public List<PartitionInfo> partitionsFor(String topic) {
+ Cluster cluster = this.metadata.fetch();
+ List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
+ if (parts == null) {
+ metadata.add(topic);
+ awaitMetadataUpdate();
+ parts = metadata.fetch().partitionsForTopic(topic);
+ }
+ return parts;
+ }
+
+ @Override
+ public synchronized void close() {
log.trace("Closing the Kafka consumer.");
- subscribedTopics.clear();
- subscribedPartitions.clear();
- this.metrics.close();
+ this.closed = true;
+ this.metrics.metrics.close();
+ this.client.close();
log.debug("The Kafka consumer has closed.");
}
+
+ private boolean shouldAutoCommit(long now) {
+ return this.autoCommit && this.lastCommitAttemptMs <= now - this.autoCommitIntervalMs;
+ }
+
+ /*
+ * Request a metadata update and wait until it has occurred
+ */
+ private void awaitMetadataUpdate() {
+ int version = this.metadata.requestUpdate();
+ do {
+ long now = time.milliseconds();
+ this.client.poll(this.retryBackoffMs, now);
+ } while (this.metadata.version() == version);
+ }
+
+ /*
+ * Send a join group request to the controller
+ */
+ private void joinGroup(long now) {
+ log.debug("Joining group {}", group);
+
+ // execute the user's callback
+ try {
+ // TODO: Hmmm, is passing the full Consumer like this actually safe?
+ // Need to think about reentrancy...
+ this.rebalanceCallback.onPartitionsRevoked(this, this.subscriptions.assignedPartitions());
+ } catch (Exception e) {
+ log.error("User provided callback " + this.rebalanceCallback.getClass().getName()
+ + " failed on partition revocation: ", e);
+ }
+
+ // join the group
+ JoinGroupRequest jgr = new JoinGroupRequest(group,
+ (int) this.sessionTimeoutMs,
+ new ArrayList<String>(this.subscriptions.subscribedTopics()),
+ this.consumerId,
+ this.partitionAssignmentStrategy);
+ ClientResponse resp = this.blockingCoordinatorRequest(ApiKeys.JOIN_GROUP, jgr.toStruct(), null, now);
+ // process the response
+ JoinGroupResponse response = new JoinGroupResponse(resp.responseBody());
+ log.debug("Joined group: {}", response);
+ Errors.forCode(response.errorCode()).maybeThrow();
+ this.consumerId = response.consumerId();
+ this.subscriptions.changePartitionAssignment(response.assignedPartitions());
+ this.heartbeat.receivedResponse(now);
+
+ // execute the callback
+ try {
+ // TODO: Hmmm, is passing the full Consumer like this actually safe?
+ this.rebalanceCallback.onPartitionsAssigned(this, this.subscriptions.assignedPartitions());
+ } catch (Exception e) {
+ log.error("User provided callback " + this.rebalanceCallback.getClass().getName()
+ + " failed on partition assignment: ", e);
+ }
+
+ // record re-assignment time
+ this.metrics.partitionReassignments.record(time.milliseconds() - now);
+ }
+
+ /*
+ * Empty the record buffer and update the consumed position.
+ */
+ private Map<TopicPartition, List<ConsumerRecord<K, V>>> consumeBufferedRecords() {
+ if (this.subscriptions.needsPartitionAssignment()) {
+ return Collections.emptyMap();
+ } else {
+ Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
+ for (PartitionRecords<K, V> part : this.records) {
+ Long consumed = subscriptions.consumed(part.partition);
+ if (this.subscriptions.assignedPartitions().contains(part.partition)
+ && (consumed == null || part.fetchOffset == consumed)) {
+ List<ConsumerRecord<K, V>> partRecs = drained.get(part.partition);
+ if (partRecs == null) {
+ partRecs = part.records;
+ drained.put(part.partition, partRecs);
+ } else {
+ partRecs.addAll(part.records);
+
<TRUNCATED>