Posted to commits@kafka.apache.org by ne...@apache.org on 2014/05/21 01:52:19 UTC
[1/2] KAFKA-1328 New consumer APIs;
reviewed by Jun Rao and Guozhang Wang
Repository: kafka
Updated Branches:
refs/heads/trunk bf7fb6321 -> c24740c7b
http://git-wip-us.apache.org/repos/asf/kafka/blob/c24740c7/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java
new file mode 100644
index 0000000..0548fb4
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+*/
+package org.apache.kafka.clients.consumer;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+/**
+ * TODO: Clean this up after the consumer implementation is complete. Until then, it is useful to keep some sample test code that exercises the new APIs
+ *
+ */
+public class ConsumerExampleTest {
+ /**
+ * This example demonstrates how to use the consumer to leverage Kafka's group management functionality for automatic consumer load
+ * balancing and failure detection. This example assumes that the offsets are stored in Kafka and are automatically committed periodically,
+ * as controlled by the auto.commit.interval.ms config
+ */
+// @Test
+// public void testConsumerGroupManagementWithAutoOffsetCommits() {
+// Properties props = new Properties();
+// props.put("metadata.broker.list", "localhost:9092");
+// props.put("group.id", "test");
+// props.put("session.timeout.ms", "1000");
+// props.put("auto.commit.enable", "true");
+// props.put("auto.commit.interval.ms", "10000");
+// KafkaConsumer consumer = new KafkaConsumer(props);
+// // subscribe to some topics
+// consumer.subscribe("foo", "bar");
+// boolean isRunning = true;
+// while(isRunning) {
+// Map<String, ConsumerRecords> records = consumer.poll(100);
+// process(records);
+// }
+// consumer.close();
+// }
+
+ /**
+ * This example demonstrates how to use the consumer to leverage Kafka's group management functionality for automatic consumer load
+ * balancing and failure detection. This example assumes that the offsets are stored in Kafka and are manually committed using
+ * the commit(sync) API. This example also demonstrates rewinding the consumer's offsets if processing of the consumed
+ * messages fails.
+ */
+// @Test
+// public void testConsumerGroupManagementWithManualOffsetCommit() {
+// Properties props = new Properties();
+// props.put("metadata.broker.list", "localhost:9092");
+// props.put("group.id", "test");
+// props.put("session.timeout.ms", "1000");
+// props.put("auto.commit.enable", "false");
+// KafkaConsumer consumer = new KafkaConsumer(props);
+// // subscribe to some topics
+// consumer.subscribe("foo", "bar");
+// int commitInterval = 100;
+// int numRecords = 0;
+// boolean isRunning = true;
+// Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+// while(isRunning) {
+// Map<String, ConsumerRecords> records = consumer.poll(100);
+// try {
+// Map<TopicPartition, Long> lastConsumedOffsets = process(records);
+// consumedOffsets.putAll(lastConsumedOffsets);
+// numRecords += records.size();
+// // synchronously commit offsets for all partitions of topics foo and bar owned by this consumer instance
+// if(numRecords % commitInterval == 0)
+// consumer.commit(true);
+// } catch(Exception e) {
+// // rewind consumer's offsets for failed partitions
+// List<TopicPartition> failedPartitions = getFailedPartitions();
+// Map<TopicPartition, Long> offsetsToRewindTo = new HashMap<TopicPartition, Long>();
+// for(TopicPartition failedPartition : failedPartitions) {
+// // rewind to the last consumed offset for the failed partition. Since process() failed for this partition, the consumed offset
+// // should still be pointing to the last successfully processed offset and hence is the right offset to rewind consumption to.
+// offsetsToRewindTo.put(failedPartition, consumedOffsets.get(failedPartition));
+// }
+// // seek to new offsets only for partitions that failed the last process()
+// consumer.seek(offsetsToRewindTo);
+// }
+// }
+// consumer.close();
+// }
+
+ private List<TopicPartition> getFailedPartitions() { return null; }
+
+ /**
+ * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality along with custom offset storage.
+ * In this example, the assumption made is that the user chooses to store the consumer offsets outside Kafka. This requires the user to
+ * plug in logic for retrieving the offsets from a custom store and to provide the offsets to the consumer in the ConsumerRebalanceCallback
+ * callback. The onPartitionsAssigned callback is invoked after the consumer is assigned a new set of partitions on rebalance <i>and</i>
+ * before the consumption restarts post rebalance. This is the right place to supply offsets from a custom store to the consumer.
+ */
+// @Test
+// public void testConsumerRebalanceWithCustomOffsetStore() {
+// Properties props = new Properties();
+// props.put("metadata.broker.list", "localhost:9092");
+// props.put("group.id", "test");
+// props.put("session.timeout.ms", "1000");
+// props.put("auto.commit.enable", "true");
+// props.put("auto.commit.interval.ms", "10000");
+// KafkaConsumer consumer = new KafkaConsumer(props,
+// new ConsumerRebalanceCallback() {
+// public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) {
+// Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore(partitions);
+// consumer.seek(lastCommittedOffsets);
+// }
+// public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) {
+// Map<TopicPartition, Long> offsets = getLastConsumedOffsets(partitions); // implemented by the user
+// commitOffsetsToCustomStore(offsets); // implemented by the user
+// }
+// private Map<TopicPartition, Long> getLastCommittedOffsetsFromCustomStore(Collection<TopicPartition> partitions) {
+// return null;
+// }
+// private Map<TopicPartition, Long> getLastConsumedOffsets(Collection<TopicPartition> partitions) { return null; }
+// private void commitOffsetsToCustomStore(Map<TopicPartition, Long> offsets) {}
+// });
+// // subscribe to topics
+// consumer.subscribe("foo", "bar");
+// int commitInterval = 100;
+// int numRecords = 0;
+// boolean isRunning = true;
+// while(isRunning) {
+// Map<String, ConsumerRecords> records = consumer.poll(100);
+// Map<TopicPartition, Long> consumedOffsets = process(records);
+// numRecords += records.size();
+// // commit offsets for all partitions of topics foo and bar owned by this consumer instance to the custom store
+// if(numRecords % commitInterval == 0)
+// commitOffsetsToCustomStore(consumedOffsets);
+// }
+// consumer.close();
+// }
+
+ /**
+ * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality along with Kafka based offset storage.
+ * In this example, the assumption made is that the user chooses to use Kafka based offset management.
+ */
+// @Test
+// public void testConsumerRewindWithGroupManagementAndKafkaOffsetStorage() {
+// Properties props = new Properties();
+// props.put("metadata.broker.list", "localhost:9092");
+// props.put("group.id", "test");
+// props.put("session.timeout.ms", "1000");
+// props.put("auto.commit.enable", "false");
+// KafkaConsumer consumer = new KafkaConsumer(props,
+// new ConsumerRebalanceCallback() {
+// boolean rewindOffsets = true;
+// public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) {
+// if(rewindOffsets) {
+// Map<TopicPartition, Long> latestCommittedOffsets = consumer.committed(partitions);
+// Map<TopicPartition, Long> newOffsets = rewindOffsets(latestCommittedOffsets, 100);
+// consumer.seek(newOffsets);
+// }
+// }
+// public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) {
+// consumer.commit(true);
+// }
+// // this API rewinds every partition back by numberOfMessagesToRewindBackTo messages
+// private Map<TopicPartition, Long> rewindOffsets(Map<TopicPartition, Long> currentOffsets,
+// long numberOfMessagesToRewindBackTo) {
+// Map<TopicPartition, Long> newOffsets = new HashMap<TopicPartition, Long>();
+// for(Map.Entry<TopicPartition, Long> offset : currentOffsets.entrySet()) {
+// newOffsets.put(offset.getKey(), offset.getValue() - numberOfMessagesToRewindBackTo);
+// }
+// return newOffsets;
+// }
+// });
+// // subscribe to topics
+// consumer.subscribe("foo", "bar");
+// int commitInterval = 100;
+// int numRecords = 0;
+// boolean isRunning = true;
+// while(isRunning) {
+// Map<String, ConsumerRecords> records = consumer.poll(100);
+// Map<TopicPartition, Long> consumedOffsets = process(records);
+// numRecords += records.size();
+// // synchronously commit offsets to Kafka for all partitions of topics foo and bar owned by this consumer instance
+// if(numRecords % commitInterval == 0)
+// consumer.commit(consumedOffsets, true);
+// }
+// consumer.close();
+// }
+
+ /**
+ * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume up to the latest
+ * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes
+ * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions.
+ * This example assumes that the user chooses to use Kafka based offset storage. The user still has to specify a group.id to use Kafka
+ * based offset management. However, session.timeout.ms is not required since the Kafka consumer only performs failure detection when
+ * group management is used.
+ */
+// @Test
+// public void testConsumerWithKafkaBasedOffsetManagement() {
+// Properties props = new Properties();
+// props.put("metadata.broker.list", "localhost:9092");
+// props.put("group.id", "test");
+// props.put("auto.commit.enable", "true");
+// props.put("auto.commit.interval.ms", "10000");
+// KafkaConsumer consumer = new KafkaConsumer(props);
+// // subscribe to some partitions of topic foo
+// TopicPartition partition0 = new TopicPartition("foo", 0);
+// TopicPartition partition1 = new TopicPartition("foo", 1);
+// TopicPartition[] partitions = new TopicPartition[2];
+// partitions[0] = partition0;
+// partitions[1] = partition1;
+// consumer.subscribe(partitions);
+// // find the last committed offsets for partitions 0,1 of topic foo
+// Map<TopicPartition, Long> lastCommittedOffsets = consumer.committed(null);
+// // seek to the last committed offsets to avoid duplicates
+// consumer.seek(lastCommittedOffsets);
+// // find the offsets of the latest available messages to know where to stop consumption
+// Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-1, null);
+// boolean isRunning = true;
+// while(isRunning) {
+// Map<String, ConsumerRecords> records = consumer.poll(100);
+// Map<TopicPartition, Long> consumedOffsets = process(records);
+// for(TopicPartition partition : partitions) {
+// if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition))
+// isRunning = false;
+// else
+// isRunning = true;
+// }
+// }
+// consumer.close();
+// }
+
+ /**
+ * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume up to the latest
+ * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user foregoes
+ * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions.
+ * This example assumes that the user chooses to use custom offset storage.
+ */
+ @Test
+ public void testConsumerWithCustomOffsetManagement() {
+// Properties props = new Properties();
+// props.put("metadata.broker.list", "localhost:9092");
+// KafkaConsumer consumer = new KafkaConsumer(props);
+// // subscribe to some partitions of topic foo
+// TopicPartition partition0 = new TopicPartition("foo", 0);
+// TopicPartition partition1 = new TopicPartition("foo", 1);
+// TopicPartition[] partitions = new TopicPartition[2];
+// partitions[0] = partition0;
+// partitions[1] = partition1;
+// consumer.subscribe(partitions);
+// Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore();
+// // seek to the last committed offsets to avoid duplicates
+// consumer.seek(lastCommittedOffsets);
+// // find the offsets of the latest available messages to know where to stop consumption
+// Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-1, null);
+// boolean isRunning = true;
+// while(isRunning) {
+// Map<String, ConsumerRecords> records = consumer.poll(100);
+// Map<TopicPartition, Long> consumedOffsets = process(records);
+// // commit offsets for partitions 0,1 for topic foo to custom store
+// commitOffsetsToCustomStore(consumedOffsets);
+// for(TopicPartition partition : partitions) {
+// if(consumedOffsets.get(partition) >= latestAvailableOffsets.get(partition))
+// isRunning = false;
+// else
+// isRunning = true;
+// }
+// }
+// consumer.close();
+ }
+
+ private Map<TopicPartition, Long> getLastCommittedOffsetsFromCustomStore() { return null; }
+ private void commitOffsetsToCustomStore(Map<TopicPartition, Long> consumedOffsets) {}
+ private Map<TopicPartition, Long> process(Map<String, ConsumerRecords> records) {
+ Map<TopicPartition, Long> processedOffsets = new HashMap<TopicPartition, Long>();
+ for(Entry<String, ConsumerRecords> recordMetadata : records.entrySet()) {
+ List<ConsumerRecord> recordsPerTopic = recordMetadata.getValue().records();
+ for(int i = 0;i < recordsPerTopic.size();i++) {
+ ConsumerRecord record = recordsPerTopic.get(i);
+ // process record
+ try {
+ processedOffsets.put(record.topicAndPartition(), record.offset());
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ return processedOffsets;
+ }
+}
[2/2] git commit: KAFKA-1328 New consumer APIs;
reviewed by Jun Rao and Guozhang Wang
Posted by ne...@apache.org.
KAFKA-1328 New consumer APIs; reviewed by Jun Rao and Guozhang Wang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c24740c7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c24740c7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c24740c7
Branch: refs/heads/trunk
Commit: c24740c7b0f6a6e7c66659da786a346650b76766
Parents: bf7fb63
Author: Neha Narkhede <ne...@gmail.com>
Authored: Tue May 20 16:49:31 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue May 20 16:49:31 2014 -0700
----------------------------------------------------------------------
.../apache/kafka/clients/consumer/Consumer.java | 125 ++++
.../kafka/clients/consumer/ConsumerConfig.java | 187 ++++++
.../consumer/ConsumerRebalanceCallback.java | 50 ++
.../kafka/clients/consumer/ConsumerRecord.java | 127 ++++
.../kafka/clients/consumer/ConsumerRecords.java | 61 ++
.../kafka/clients/consumer/KafkaConsumer.java | 575 +++++++++++++++++++
.../kafka/clients/consumer/MockConsumer.java | 192 +++++++
.../kafka/clients/consumer/OffsetMetadata.java | 59 ++
.../kafka/clients/producer/KafkaProducer.java | 26 +-
.../apache/kafka/common/utils/ClientUtils.java | 44 ++
.../clients/consumer/ConsumerExampleTest.java | 298 ++++++++++
11 files changed, 1720 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c24740c7/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
new file mode 100644
index 0000000..227f564
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+*/
+package org.apache.kafka.clients.consumer;
+
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * @see KafkaConsumer
+ * @see MockConsumer
+ */
+public interface Consumer extends Closeable {
+
+ /**
+ * Incrementally subscribe to the given list of topics. This API is mutually exclusive with
+ * {@link #subscribe(TopicPartition...) subscribe(partitions)}
+ * @param topics A variable list of topics that the consumer subscribes to
+ */
+ public void subscribe(String...topics);
+
+ /**
+ * Incrementally subscribe to the given list of partitions. This API is mutually exclusive with
+ * {@link #subscribe(String...) subscribe(topics)}
+ * @param partitions Partitions to subscribe to
+ */
+ public void subscribe(TopicPartition... partitions);
+
+ /**
+ * Unsubscribe from the specified topics. Messages for these topics will not be returned from the next {@link #poll(long) poll()}
+ * onwards. This should be used in conjunction with {@link #subscribe(String...) subscribe(topics)}. It is an error to
+ * unsubscribe from a topic that was never subscribed to using {@link #subscribe(String...) subscribe(topics)}
+ * @param topics Topics to unsubscribe from
+ */
+ public void unsubscribe(String... topics);
+
+ /**
+ * Unsubscribe from the specified topic partitions. Messages for these partitions will not be returned from the next
+ * {@link #poll(long) poll()} onwards. This should be used in conjunction with
+ * {@link #subscribe(TopicPartition...) subscribe(topic, partitions)}. It is an error to
+ * unsubscribe from a partition that was never subscribed to using {@link #subscribe(TopicPartition...) subscribe(partitions)}
+ * @param partitions Partitions to unsubscribe from
+ */
+ public void unsubscribe(TopicPartition... partitions);
+
+ /**
+ * Fetches data for the subscribed list of topics and partitions
+ * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, waits indefinitely. Must not be negative
+ * @return Map of topic to records for the subscribed topics and partitions as soon as data is available for a topic partition. Availability
+ * of data is controlled by {@link ConsumerConfig#FETCH_MIN_BYTES_CONFIG} and {@link ConsumerConfig#FETCH_MAX_WAIT_MS_CONFIG}.
+ * If no data is available for timeout ms, returns an empty map
+ */
+ public Map<String, ConsumerRecords> poll(long timeout);
+
+ /**
+ * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and partitions.
+ * @param sync If true, the commit should block until the consumer receives an acknowledgment
+ * @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. Returns null
+ * if the sync flag is set to false
+ */
+ public OffsetMetadata commit(boolean sync);
+
+ /**
+ * Commits the specified offsets for the specified list of topics and partitions to Kafka.
+ * @param offsets The map of offsets to commit for the given topic partitions
+ * @param sync If true, commit will block until the consumer receives an acknowledgment
+ * @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. Returns null
+ * if the sync flag is set to false.
+ */
+ public OffsetMetadata commit(Map<TopicPartition, Long> offsets, boolean sync);
+
+ /**
+ * Overrides the fetch positions that the consumer will use on the next fetch request. If the consumer subscribes to a list of topics
+ * using {@link #subscribe(String...) subscribe(topics)}, an exception will be thrown if the specified topic partition is not owned by
+ * the consumer.
+ * @param offsets The map of fetch positions per topic and partition
+ */
+ public void seek(Map<TopicPartition, Long> offsets);
+
+ /**
+ * Returns the fetch position of the <i>next message</i> for the specified topic partition to be used on the next {@link #poll(long) poll()}
+ * @param partitions Partitions for which the fetch position will be returned
+ * @return The position from which data will be fetched for the specified partition on the next {@link #poll(long) poll()}
+ */
+ public Map<TopicPartition, Long> position(Collection<TopicPartition> partitions);
+
+ /**
+ * Fetches the last committed offsets for the input list of partitions
+ * @param partitions The list of partitions to return the last committed offset for
+ * @return The map of last committed offsets for the specified partitions
+ */
+ public Map<TopicPartition, Long> committed(Collection<TopicPartition> partitions);
+
+ /**
+ * Fetches offsets before a certain timestamp
+ * @param timestamp The unix timestamp. Value -1 indicates latest available timestamp. Value -2 indicates earliest available timestamp.
+ * @param partitions The list of partitions for which the offsets are returned
+ * @return The offsets for messages that were written to the server before the specified timestamp.
+ */
+ public Map<TopicPartition, Long> offsetsBeforeTime(long timestamp, Collection<TopicPartition> partitions);
+
+ /**
+ * Return a map of metrics maintained by the consumer
+ */
+ public Map<String, ? extends Metric> metrics();
+
+ /**
+ * Close this consumer
+ */
+ public void close();
+
+}
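
To see how these methods compose, here is a minimal sketch of a poll-commit loop against this interface. It is illustrative only: the broker address, topic name, and bounded loop are assumptions, and it relies on the KafkaConsumer implementation added later in this patch.

    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ConsumerInterfaceSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
            props.put("group.id", "test");
            Consumer consumer = new KafkaConsumer(props);
            // topic subscription; mutually exclusive with subscribe(TopicPartition...)
            consumer.subscribe("foo");
            try {
                for (int i = 0; i < 10; i++) { // stands in for a real run condition
                    // poll returns a map of topic name to the records fetched for that topic
                    Map<String, ConsumerRecords> records = consumer.poll(100);
                    System.out.println("polled " + records.size() + " topic(s)");
                    consumer.commit(true); // block until the commit is acknowledged
                }
            } finally {
                consumer.close(); // the consumer multiplexes TCP connections; always close it
            }
        }
    }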
http://git-wip-us.apache.org/repos/asf/kafka/blob/c24740c7/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
new file mode 100644
index 0000000..46efc0c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+*/
+package org.apache.kafka.clients.consumer;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+
+/**
+ * The consumer configuration keys
+ */
+public class ConsumerConfig extends AbstractConfig {
+ private static final ConfigDef config;
+
+ /**
+ * The identifier of the group this consumer belongs to. This is required if the consumer uses the
+ * group management functionality via {@link Consumer#subscribe(String...) subscribe(topics)}. It is also required
+ * if the consumer uses the default Kafka based offset management strategy.
+ */
+ public static final String GROUP_ID_CONFIG = "group.id";
+
+ /**
+ * The timeout after which, if {@link Consumer#poll(long) poll(timeout)} is not invoked, the consumer is
+ * marked dead and a rebalance operation is triggered for the group identified by {@link #GROUP_ID_CONFIG}. Relevant
+ * if the consumer uses the group management functionality by invoking {@link Consumer#subscribe(String...) subscribe(topics)}
+ */
+ public static final String SESSION_TIMEOUT_MS = "session.timeout.ms";
+
+ /**
+ * The number of times a consumer sends a heartbeat to the co-ordinator broker within a {@link #SESSION_TIMEOUT_MS} time window.
+ * This frequency affects the latency of a rebalance operation since the co-ordinator broker notifies a consumer of a rebalance
+ * in the heartbeat response. Relevant if the consumer uses the group management functionality by invoking
+ * {@link Consumer#subscribe(String...) subscribe(topics)}
+ */
+ public static final String HEARTBEAT_FREQUENCY = "heartbeat.frequency";
+
+ /**
+ * A list of URLs to use for establishing the initial connection to the cluster. This list should be in the form
+ * <code>host1:port1,host2:port2,...</code>. These urls are just used for the initial connection to discover the
+ * full cluster membership (which may change dynamically) so this list need not contain the full set of servers (you
+ * may want more than one, though, in case a server is down).
+ */
+ public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
+
+ /**
+ * If true, periodically commit to Kafka the offsets of messages already returned by the consumer. If the process fails,
+ * this committed offset is the position from which consumption will resume.
+ */
+ public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";
+
+ /**
+ * The friendly name of the partition assignment strategy that the server will use to distribute partition ownership
+ * amongst consumer instances when group management is used
+ */
+ public static final String PARTITION_ASSIGNMENT_STRATEGY = "partition.assignment.strategy";
+
+ /**
+ * The frequency in milliseconds that the consumer offsets are committed to Kafka. Relevant if {@link #ENABLE_AUTO_COMMIT_CONFIG}
+ * is turned on.
+ */
+ public static final String AUTO_COMMIT_INTERVAL_MS_CONFIG = "auto.commit.interval.ms";
+
+ /**
+ * What to do when there is no initial offset in Kafka or if an offset is out of range:
+ * <ul>
+ * <li> smallest: automatically reset the offset to the smallest offset
+ * <li> largest: automatically reset the offset to the largest offset
+ * <li> disable: throw exception to the consumer if no previous offset is found for the consumer's group
+ * <li> anything else: throw exception to the consumer.
+ * </ul>
+ */
+ public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
+
+ /**
+ * The minimum amount of data the server should return for a fetch request. If insufficient data is available, the
+ * server will wait for that much data to accumulate before answering the request.
+ */
+ public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes";
+
+ /**
+ * The maximum amount of time the server will block before answering the fetch request if there isn't sufficient
+ * data to immediately satisfy {@link #FETCH_MIN_BYTES_CONFIG}. This should be less than or equal to the timeout used in
+ * {@link KafkaConsumer#poll(long) poll(timeout)}
+ */
+ public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";
+
+ /**
+ * The maximum amount of time to block waiting to fetch metadata about a topic the first time a record is received
+ * from that topic. The consumer will throw a TimeoutException if it could not successfully fetch metadata within
+ * this timeout.
+ */
+ public static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";
+
+ /**
+ * The total memory used by the consumer to buffer records received from the server. This config is meant to control
+ * the consumer's memory usage, so it is the size of the global fetch buffer that will be shared across all partitions.
+ */
+ public static final String TOTAL_BUFFER_MEMORY_CONFIG = "total.memory.bytes";
+
+ /**
+ * The minimum amount of memory that should be used to fetch at least one message for a partition. This puts a lower
+ * bound on the consumer's memory utilization when there is at least one message for a partition available on the server.
+ * This size must be at least as large as the maximum message size the server allows or else it is possible for the producer
+ * to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large
+ * message on a certain partition.
+ */
+ public static final String FETCH_BUFFER_CONFIG = "fetch.buffer.bytes";
+
+ /**
+ * The id string to pass to the server when making requests. The purpose of this is to be able to track the source
+ * of requests beyond just ip/port by allowing a logical application name to be included.
+ */
+ public static final String CLIENT_ID_CONFIG = "client.id";
+
+ /**
+ * The size of the TCP receive buffer to use when fetching data
+ */
+ public static final String SOCKET_RECEIVE_BUFFER_CONFIG = "socket.receive.buffer.bytes";
+
+ /**
+ * The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a
+ * host in a tight loop. This backoff applies to all requests sent by the consumer to the broker.
+ */
+ public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
+
+ /** <code>metrics.sample.window.ms</code> */
+ public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
+ private static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The metrics system maintains a configurable number of samples over a fixed window size. This configuration " + "controls the size of the window. For example we might maintain two samples each measured over a 30 second period. "
+ + "When a window expires we erase and overwrite the oldest window.";
+
+ /** <code>metrics.num.samples</code> */
+ public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";
+ private static final String METRICS_NUM_SAMPLES_DOC = "The number of samples maintained to compute metrics.";
+
+ /** <code>metric.reporters</code> */
+ public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
+ private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows " + "plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
+
+ static {
+ /* TODO: add config docs */
+ config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, "blah blah")
+ .define(GROUP_ID_CONFIG, Type.STRING, Importance.HIGH, "blah blah")
+ .define(SESSION_TIMEOUT_MS, Type.LONG, 1000, Importance.HIGH, "blah blah")
+ .define(HEARTBEAT_FREQUENCY, Type.INT, 3, Importance.MEDIUM, "blah blah")
+ .define(PARTITION_ASSIGNMENT_STRATEGY, Type.STRING, Importance.MEDIUM, "blah blah")
+ .define(METADATA_FETCH_TIMEOUT_CONFIG, Type.LONG, 60 * 1000, atLeast(0), Importance.MEDIUM, "blah blah")
+ .define(ENABLE_AUTO_COMMIT_CONFIG, Type.BOOLEAN, true, Importance.MEDIUM, "blah blah")
+ .define(AUTO_COMMIT_INTERVAL_MS_CONFIG, Type.LONG, 5000, atLeast(0), Importance.LOW, "blah blah")
+ .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.LOW, "blah blah")
+ .define(TOTAL_BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.LOW, "blah blah")
+ .define(FETCH_BUFFER_CONFIG, Type.INT, 1 * 1024 * 1024, atLeast(0), Importance.HIGH, "blah blah")
+ .define(SOCKET_RECEIVE_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), Importance.LOW, "blah blah")
+ .define(FETCH_MIN_BYTES_CONFIG, Type.LONG, 1024, atLeast(0), Importance.HIGH, "blah blah")
+ .define(FETCH_MAX_WAIT_MS_CONFIG, Type.LONG, 500, atLeast(0), Importance.LOW, "blah blah")
+ .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), Importance.LOW, "blah blah")
+ .define(AUTO_OFFSET_RESET_CONFIG, Type.STRING, "largest", Importance.MEDIUM, "blah blah")
+ .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
+ Type.LONG,
+ 30000,
+ atLeast(0),
+ Importance.LOW,
+ METRICS_SAMPLE_WINDOW_MS_DOC)
+ .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, METRICS_NUM_SAMPLES_DOC)
+ .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, METRIC_REPORTER_CLASSES_DOC);
+
+ }
+
+ ConsumerConfig(Map<? extends Object, ? extends Object> props) {
+ super(config, props);
+ }
+
+}
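
Since ConsumerConfig names every key as a constant (for example, BOOTSTRAP_SERVERS_CONFIG is "bootstrap.servers"), application code can reference the constants instead of raw strings and stay in sync with the definitions above. A minimal sketch with illustrative values:

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // using the constants guards against typos in the raw key strings
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS, "1000");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10000");
            KafkaConsumer consumer = new KafkaConsumer(props);
            consumer.close();
        }
    }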
http://git-wip-us.apache.org/repos/asf/kafka/blob/c24740c7/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
new file mode 100644
index 0000000..05eb6ce
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+*/
+package org.apache.kafka.clients.consumer;
+
+import java.util.Collection;
+
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * A callback interface that the user can implement to manage customized offsets on the start and end of
+ * every rebalance operation. This callback will execute in the user thread as part of the
+ * {@link Consumer#poll(long) poll(long)} API on every rebalance attempt.
+ * The default implementation of the callback will {@link Consumer#seek(java.util.Map) seek(offsets)} to the last committed offsets in the
+ * {@link #onPartitionsAssigned(Consumer, Collection) onPartitionsAssigned()} callback, and will commit offsets synchronously
+ * for the specified list of partitions to Kafka in the {@link #onPartitionsRevoked(Consumer, Collection) onPartitionsRevoked()}
+ * callback.
+ */
+public interface ConsumerRebalanceCallback {
+
+ /**
+ * A callback method the user can implement to provide handling of customized offsets on completion of a successful
+ * rebalance operation. This method will be called after a rebalance operation completes and before the consumer
+ * starts fetching data.
+ * <p>
+ * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}
+ * @param partitions The list of partitions that are assigned to the consumer after rebalance
+ */
+ public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions);
+
+ /**
+ * A callback method the user can implement to provide handling of offset commits to a customized store on the
+ * start of a rebalance operation. This method will be called before a rebalance operation starts and after the
+ * consumer stops fetching data. It is recommended that offsets be committed in this callback to
+ * either Kafka or a custom offset store to prevent duplicate data
+ * <p>
+ * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}
+ * @param partitions The list of partitions that were assigned to the consumer on the last rebalance
+ */
+ public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions);
+}
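
A sketch of an implementation that mirrors the default behavior described in the Javadoc above: seek to the last committed offsets when partitions are assigned, and commit synchronously when they are revoked.

    import java.util.Collection;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback;
    import org.apache.kafka.common.TopicPartition;

    public class DefaultLikeRebalanceCallback implements ConsumerRebalanceCallback {
        public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) {
            // resume each newly assigned partition from its last committed offset
            Map<TopicPartition, Long> lastCommitted = consumer.committed(partitions);
            consumer.seek(lastCommitted);
        }

        public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) {
            // commit synchronously before ownership changes to avoid duplicate consumption
            consumer.commit(true);
        }
    }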
http://git-wip-us.apache.org/repos/asf/kafka/blob/c24740c7/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
new file mode 100644
index 0000000..436d8a4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+*/
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * A key/value pair to be received from Kafka. This consists of the topic name and partition number from which the
+ * record is received, and an offset that points to the record in a Kafka partition.
+ */
+public final class ConsumerRecord {
+ private final TopicPartition partition;
+ private final byte[] key;
+ private final byte[] value;
+ private final long offset;
+ private volatile Exception error;
+
+ /**
+ * Creates a record to be received from a specified topic and partition
+ *
+ * @param topic The topic this record is received from
+ * @param partitionId The partition of the topic this record is received from
+ * @param key The key of the record, if one exists
+ * @param value The record contents
+ * @param offset The offset of this record in the corresponding Kafka partition
+ */
+ public ConsumerRecord(String topic, int partitionId, byte[] key, byte[] value, long offset) {
+ this(topic, partitionId, key, value, offset, null);
+ }
+
+ /**
+ * Create a record with no key
+ *
+ * @param topic The topic this record is received from
+ * @param partitionId The partition of the topic this record is received from
+ * @param value The record contents
+ * @param offset The offset of this record in the corresponding Kafka partition
+ */
+ public ConsumerRecord(String topic, int partitionId, byte[] value, long offset) {
+ this(topic, partitionId, null, value, offset);
+ }
+
+ /**
+ * Creates a record with an error code
+ * @param topic The topic this record is received from
+ * @param partitionId The partition of the topic this record is received from
+ * @param error The exception corresponding to the error code returned by the server for this topic partition
+ */
+ public ConsumerRecord(String topic, int partitionId, Exception error) {
+ this(topic, partitionId, null, null, -1L, error);
+ }
+
+ private ConsumerRecord(String topic, int partitionId, byte[] key, byte[] value, long offset, Exception error) {
+ if (topic == null)
+ throw new IllegalArgumentException("Topic cannot be null");
+ this.partition = new TopicPartition(topic, partitionId);
+ this.key = key;
+ this.value = value;
+ this.offset = offset;
+ this.error = error;
+ }
+
+ /**
+ * The topic this record is received from
+ */
+ public String topic() {
+ return partition.topic();
+ }
+
+ /**
+ * The partition from which this record is received
+ */
+ public int partition() {
+ return partition.partition();
+ }
+
+ /**
+ * The TopicPartition object containing the topic and partition
+ */
+ public TopicPartition topicAndPartition() {
+ return partition;
+ }
+
+ /**
+ * The key (or null if no key is specified)
+ * @throws Exception The exception thrown while fetching this record.
+ */
+ public byte[] key() throws Exception {
+ if (this.error != null)
+ throw this.error;
+ return key;
+ }
+
+ /**
+ * The value
+ * @throws Exception The exception thrown while fetching this record.
+ */
+ public byte[] value() throws Exception {
+ if (this.error != null)
+ throw this.error;
+ return value;
+ }
+
+ /**
+ * The position of this record in the corresponding Kafka partition.
+ * @throws Exception The exception thrown while fetching this record.
+ */
+ public long offset() throws Exception {
+ if (this.error != null)
+ throw this.error;
+ return offset;
+ }
+
+ public Exception error() {
+ return this.error;
+ }
+}
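
Since key(), value(), and offset() rethrow any error the server returned for this topic partition, callers need per-record error handling; error() exposes the failure without throwing. A sketch, with a hand-built record standing in for one returned by poll():

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    public class RecordHandlingSketch {
        static void handle(ConsumerRecord record) {
            try {
                byte[] value = record.value(); // throws if the fetch failed
                long offset = record.offset();
                System.out.println("processed offset " + offset + " of " + record.topicAndPartition()
                                   + " (" + value.length + " bytes)");
            } catch (Exception e) {
                // error() gives direct access to the failure without throwing
                System.err.println("fetch failed for " + record.topicAndPartition() + ": " + record.error());
            }
        }

        public static void main(String[] args) {
            handle(new ConsumerRecord("foo", 0, "hello".getBytes(), 0L)); // illustrative record
        }
    }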
http://git-wip-us.apache.org/repos/asf/kafka/blob/c24740c7/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
new file mode 100644
index 0000000..2ecfc8a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+*/
+package org.apache.kafka.clients.consumer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * A container that holds the list of {@link ConsumerRecord}s per partition for a particular topic. There is one for every topic returned by a
+ * {@link Consumer#poll(long)} operation.
+ */
+public class ConsumerRecords {
+
+ private final String topic;
+ private final Map<Integer, List<ConsumerRecord>> recordsPerPartition;
+
+ public ConsumerRecords(String topic, Map<Integer, List<ConsumerRecord>> records) {
+ this.topic = topic;
+ this.recordsPerPartition = records;
+ }
+
+ /**
+ * @param partitions The input list of partitions for a particular topic. If no partitions are
+ * specified, returns records for all partitions
+ * @return The list of {@link ConsumerRecord}s associated with the given partitions.
+ */
+ public List<ConsumerRecord> records(int... partitions) {
+ List<ConsumerRecord> recordsToReturn = new ArrayList<ConsumerRecord>();
+ if(partitions.length == 0) {
+ // return records for all partitions
+ for(Entry<Integer, List<ConsumerRecord>> record : recordsPerPartition.entrySet()) {
+ recordsToReturn.addAll(record.getValue());
+ }
+ } else {
+ for(int partition : partitions) {
+ List<ConsumerRecord> recordsForThisPartition = recordsPerPartition.get(partition);
+ // a requested partition may have no entry in the map; skip it rather than hit a NullPointerException
+ if(recordsForThisPartition != null)
+ recordsToReturn.addAll(recordsForThisPartition);
+ }
+ }
+ return recordsToReturn;
+ }
+
+ /**
+ * @return The topic of all records associated with this instance
+ */
+ public String topic() {
+ return this.topic;
+ }
+}
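
The records(int...) accessor either flattens every partition (no arguments) or filters to the named partitions. A sketch against a hand-built instance; the topic and payloads are illustrative:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;

    public class RecordsAccessSketch {
        public static void main(String[] args) {
            Map<Integer, List<ConsumerRecord>> byPartition = new HashMap<Integer, List<ConsumerRecord>>();
            byPartition.put(0, Arrays.asList(new ConsumerRecord("foo", 0, "a".getBytes(), 0L)));
            byPartition.put(1, Arrays.asList(new ConsumerRecord("foo", 1, "b".getBytes(), 0L)));
            ConsumerRecords records = new ConsumerRecords("foo", byPartition);

            List<ConsumerRecord> all = records.records();             // every partition of the topic
            List<ConsumerRecord> partitionZero = records.records(0);  // partition 0 only
            System.out.println(all.size() + " records in total, " + partitionZero.size() + " from partition 0");
        }
    }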
http://git-wip-us.apache.org/repos/asf/kafka/blob/c24740c7/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
new file mode 100644
index 0000000..18bcc90
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -0,0 +1,575 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+*/
+package org.apache.kafka.clients.consumer;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.Future;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.utils.ClientUtils;
+import org.apache.kafka.common.utils.SystemTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Kafka client that consumes records from a Kafka cluster.
+ * <P>
+ * The consumer is single threaded and multiplexes I/O over TCP connections to each of the brokers it
+ * needs to communicate with. It is <i>not</i> safe for unsynchronized use from multiple threads. Failure to close
+ * the consumer after use will leak these resources.
+ * <h3>Usage Examples</h3>
+ * The consumer APIs offer flexibility to cover a variety of consumption use cases. Following are some examples to demonstrate the correct use of
+ * the available APIs. Each of the examples assumes the presence of a user-implemented process() method that processes a given batch of messages
+ * and returns the offset of the latest processed message per partition. Note that process() is not part of the consumer API and is only used as
+ * a convenience method to demonstrate the different use cases of the consumer APIs. Here is a sample implementation of such a process() method.
+ * <pre>
+ * {@code
+ * private Map<TopicPartition, Long> process(Map<String, ConsumerRecords> records) {
+ * Map<TopicPartition, Long> processedOffsets = new HashMap<TopicPartition, Long>();
+ * for(Entry<String, ConsumerRecords> recordMetadata : records.entrySet()) {
+ * List<ConsumerRecord> recordsPerTopic = recordMetadata.getValue().records();
+ * for(int i = 0;i < recordsPerTopic.size();i++) {
+ * ConsumerRecord record = recordsPerTopic.get(i);
+ * // process record
+ * processedOffsets.put(record.topicAndPartition(), record.offset());
+ * }
+ * }
+ * return processedOffsets;
+ * }
+ * }
+ * </pre>
+ * <p>
+ * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality for automatic consumer load
+ * balancing and failover. This example assumes that the offsets are stored in Kafka and are automatically committed periodically,
+ * as controlled by the auto.commit.interval.ms config
+ * <pre>
+ * {@code
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * props.put("group.id", "test");
+ * props.put("session.timeout.ms", "1000");
+ * props.put("enable.auto.commit", "true");
+ * props.put("auto.commit.interval.ms", "10000");
+ * KafkaConsumer consumer = new KafkaConsumer(props);
+ * consumer.subscribe("foo", "bar");
+ * boolean isRunning = true;
+ * while(isRunning) {
+ * Map<String, ConsumerRecords> records = consumer.poll(100);
+ * process(records);
+ * }
+ * consumer.close();
+ * }
+ * </pre>
+ * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality for automatic consumer load
+ * balancing and failover. This example assumes that the offsets are stored in Kafka and are manually committed using
+ * the commit() API. This example also demonstrates rewinding the consumer's offsets if processing of the consumed
+ * messages fails. Note that this method of rewinding offsets using {@link #seek(Map) seek(offsets)} is only useful for rewinding the offsets
+ * of the current consumer instance. As such, this will not trigger a rebalance or affect the fetch offsets for the other consumer instances.
+ * <pre>
+ * {@code
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * props.put("group.id", "test");
+ * props.put("session.timeout.ms", "1000");
+ * props.put("enable.auto.commit", "false");
+ * KafkaConsumer consumer = new KafkaConsumer(props);
+ * consumer.subscribe("foo", "bar");
+ * int commitInterval = 100;
+ * int numRecords = 0;
+ * boolean isRunning = true;
+ * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+ * while(isRunning) {
+ * Map<String, ConsumerRecords> records = consumer.poll(100);
+ * try {
+ * Map<TopicPartition, Long> lastConsumedOffsets = process(records);
+ * consumedOffsets.putAll(lastConsumedOffsets);
+ * numRecords += records.size();
+ * // synchronously commit offsets for all partitions of topics foo and bar owned by this consumer instance
+ * if(numRecords % commitInterval == 0)
+ * consumer.commit(true);
+ * } catch(Exception e) {
+ * try {
+ * // rewind consumer's offsets for failed partitions
+ * // assume failedPartitions() returns the list of partitions for which the processing of the last batch of messages failed
+ * List<TopicPartition> failedPartitions = failedPartitions();
+ * Map<TopicPartition, Long> offsetsToRewindTo = new HashMap<TopicPartition, Long>();
+ * for(TopicPartition failedPartition : failedPartitions) {
+ * // rewind to the last consumed offset for the failed partition. Since process() failed for this partition, the consumed offset
+ * // should still be pointing to the last successfully processed offset and hence is the right offset to rewind consumption to.
+ * offsetsToRewindTo.put(failedPartition, consumedOffsets.get(failedPartition));
+ * }
+ * // seek to new offsets only for partitions that failed the last process()
+ * consumer.seek(offsetsToRewindTo);
+ * } catch(Exception e2) { break; } // rewind failed
+ * }
+ * }
+ * consumer.close();
+ * }
+ * </pre>
+ * <p>
+ * This example demonstrates how to rewind the offsets of the entire consumer group. It is assumed that the user has chosen to use Kafka's
+ * group management functionality for automatic consumer load balancing and failover. This example also assumes that the offsets are stored in
+ * Kafka. If group management is used, the right place to systematically rewind offsets for <i>every</i> consumer instance is inside the
+ * ConsumerRebalanceCallback. The onPartitionsAssigned callback is invoked after the consumer is assigned a new set of partitions on rebalance
+ * <i>and</i> before the consumption restarts post rebalance. This is the right place to supply the newly rewound offsets to the consumer. It
+ * is recommended that if you foresee the requirement to ever reset the consumer's offsets in the presence of group management, you
+ * always configure the consumer to use the ConsumerRebalanceCallback with a flag that controls whether or not the offset rewind logic is used.
+ * This method of rewinding offsets is useful if you notice an issue with your message processing after successful consumption and offset commit,
+ * and you would like to rewind the offsets for the entire consumer group as part of rolling out a fix to your processing logic. In this case,
+ * you would configure each of your consumer instances with the offset rewind configuration flag turned on and bounce each consumer instance
+ * in a rolling restart fashion. Each restart will trigger a rebalance and eventually all consumer instances would have rewound the offsets for
+ * the partitions they own, effectively rewinding the offsets for the entire consumer group.
+ * <pre>
+ * {@code
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * props.put("group.id", "test");
+ * props.put("session.timeout.ms", "1000");
+ * props.put("enable.auto.commit", "false");
+ * KafkaConsumer consumer = new KafkaConsumer(props,
+ * new ConsumerRebalanceCallback() {
+ * boolean rewindOffsets = true; // should be retrieved from external application config
+ * public void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions) {
+ * Map<TopicPartition, Long> latestCommittedOffsets = consumer.committed(partitions);
+ * if(rewindOffsets) {
+ * Map<TopicPartition, Long> newOffsets = rewindOffsets(latestCommittedOffsets, 100);
+ * consumer.seek(newOffsets);
+ * }
+ * }
+ * public void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions) {
+ * consumer.commit(true);
+ * }
+ * // this API rewinds every partition back by numberOfMessagesToRewindBackTo messages
+ * private Map<TopicPartition, Long> rewindOffsets(Map<TopicPartition, Long> currentOffsets,
+ * long numberOfMessagesToRewindBackTo) {
+ * Map<TopicPartition, Long> newOffsets = new HashMap<TopicPartition, Long>();
+ * for(Map.Entry<TopicPartition, Long> offset : currentOffsets.entrySet())
+ * newOffsets.put(offset.getKey(), offset.getValue() - numberOfMessagesToRewindBackTo);
+ * return newOffsets;
+ * }
+ * });
+ * consumer.subscribe("foo", "bar");
+ * int commitInterval = 100;
+ * int numRecords = 0;
+ * boolean isRunning = true;
+ * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+ * while(isRunning) {
+ * Map<String, ConsumerRecords> records = consumer.poll(100);
+ * Map<TopicPartition, Long> lastConsumedOffsets = process(records);
+ * consumedOffsets.putAll(lastConsumedOffsets);
+ * numRecords += records.size();
+ * // synchronously commit offsets for all partitions of topics foo and bar owned by this consumer instance
+ * if(numRecords % commitInterval == 0)
+ * consumer.commit(consumedOffsets, true);
+ * }
+ * consumer.close();
+ * }
+ * </pre>
+ * This example demonstrates how the consumer can be used to leverage Kafka's group management functionality along with custom offset storage.
+ * In this example, the assumption made is that the user chooses to store the consumer offsets outside Kafka. This requires the user to
+ * plug in logic for retrieving the offsets from a custom store and provide the offsets to the consumer in the ConsumerRebalanceCallback
+ * callback. The onPartitionsAssigned callback is invoked after the consumer is assigned a new set of partitions on rebalance <i>and</i>
+ * before the consumption restarts post rebalance. This is the right place to supply offsets from a custom store to the consumer.
+ * <p>
+ * Similarly, the user would also be required to plug in logic for storing the consumer's offsets to a custom store. The onPartitionsRevoked
+ * callback is invoked right after the consumer has stopped fetching data and before the partition ownership changes. This is the right place
+ * to commit the offsets for the current set of partitions owned by the consumer.
+ * <pre>
+ * {@code
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * props.put("group.id", "test");
+ * props.put("session.timeout.ms", "1000");
+ * props.put("enable.auto.commit", "false"); // since enable.auto.commit only applies to Kafka based offset storage
+ * KafkaConsumer consumer = new KafkaConsumer(props,
+ * new ConsumerRebalanceCallback() {
+ * public void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions) {
+ * Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore(partitions);
+ * consumer.seek(lastCommittedOffsets);
+ * }
+ * public void onPartitionsRevoked(Consumer consumer, TopicPartition...partitions) {
+ * Map<TopicPartition, Long> offsets = getLastConsumedOffsets(partitions);
+ * commitOffsetsToCustomStore(offsets);
+ * }
+ * // following APIs should be implemented by the user for custom offset management
+ * private Map<TopicPartition, Long> getLastCommittedOffsetsFromCustomStore(TopicPartition... partitions) {
+ * return null;
+ * }
+ * private Map<TopicPartition, Long> getLastConsumedOffsets(TopicPartition... partitions) { return null; }
+ * private void commitOffsetsToCustomStore(Map<TopicPartition, Long> offsets) {}
+ * });
+ * consumer.subscribe("foo", "bar");
+ * int commitInterval = 100;
+ * int numRecords = 0;
+ * boolean isRunning = true;
+ * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+ * while(isRunning) {
+ * Map<String, ConsumerRecords> records = consumer.poll(100);
+ * Map<TopicPartition, Long> lastConsumedOffsets = process(records);
+ * consumedOffsets.putAll(lastConsumedOffsets);
+ * numRecords += records.size();
+ * // commit offsets for all partitions of topics foo, bar owned by this consumer instance to the custom store
+ * if(numRecords % commitInterval == 0)
+ * commitOffsetsToCustomStore(consumedOffsets);
+ * }
+ * consumer.close();
+ * }
+ * </pre>
+ * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume up to the latest
+ * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user forgoes
+ * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions.
+ * This example assumes that the user chooses to use Kafka based offset storage. The user still has to specify a group.id to use Kafka
+ * based offset management. However, session.timeout.ms is not required since the Kafka consumer only does automatic failover when group
+ * management is used.
+ * <pre>
+ * {@code
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * props.put("group.id", "test");
+ * props.put("enable.auto.commit", "true");
+ * props.put("auto.commit.interval.ms", "10000");
+ * KafkaConsumer consumer = new KafkaConsumer(props);
+ * // subscribe to some partitions of topic foo
+ * TopicPartition partition0 = new TopicPartition("foo", 0);
+ * TopicPartition partition1 = new TopicPartition("foo", 1);
+ * TopicPartition[] partitions = new TopicPartition[] {partition0, partition1};
+ * consumer.subscribe(partitions);
+ * // find the last committed offsets for partitions 0,1 of topic foo
+ * Map<TopicPartition, Long> lastCommittedOffsets = consumer.committed(Arrays.asList(partition0, partition1));
+ * // seek to the last committed offsets to avoid duplicates
+ * consumer.seek(lastCommittedOffsets);
+ * // find the offsets of the latest available messages to know where to stop consumption
+ * Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-1, Arrays.asList(partition0, partition1));
+ * boolean isRunning = true;
+ * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+ * while(isRunning) {
+ * Map<String, ConsumerRecords> records = consumer.poll(100);
+ * Map<TopicPartition, Long> lastConsumedOffsets = process(records);
+ * consumedOffsets.putAll(lastConsumedOffsets);
+ * // stop only once every partition has been consumed up to the latest available offset
+ * isRunning = false;
+ * for(TopicPartition partition : partitions) {
+ * Long consumed = consumedOffsets.get(partition);
+ * if(consumed == null || consumed < latestAvailableOffsets.get(partition))
+ * isRunning = true;
+ * }
+ * }
+ * consumer.commit(true);
+ * consumer.close();
+ * }
+ * </pre>
+ * This example demonstrates how the consumer can be used to subscribe to specific partitions of certain topics and consume up to the latest
+ * available message for each of those partitions before shutting down. When used to subscribe to specific partitions, the user forgoes
+ * the group management functionality and instead relies on manually configuring the consumer instances to subscribe to a set of partitions.
+ * This example assumes that the user chooses to use custom offset storage.
+ * <pre>
+ * {@code
+ * Properties props = new Properties();
+ * props.put("metadata.broker.list", "localhost:9092");
+ * KafkaConsumer consumer = new KafkaConsumer(props);
+ * // subscribe to some partitions of topic foo
+ * TopicPartition partition0 = new TopicPartition("foo", 0);
+ * TopicPartition partition1 = new TopicPartition("foo", 1);
+ * TopicPartition[] partitions = new TopicPartition[] {partition0, partition1};
+ * consumer.subscribe(partitions);
+ * Map<TopicPartition, Long> lastCommittedOffsets = getLastCommittedOffsetsFromCustomStore();
+ * // seek to the last committed offsets to avoid duplicates
+ * consumer.seek(lastCommittedOffsets);
+ * // find the offsets of the latest available messages to know where to stop consumption
+ * Map<TopicPartition, Long> latestAvailableOffsets = consumer.offsetsBeforeTime(-1, Arrays.asList(partition0, partition1));
+ * boolean isRunning = true;
+ * Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
+ * while(isRunning) {
+ * Map<String, ConsumerRecords> records = consumer.poll(100);
+ * Map<TopicPartition, Long> lastConsumedOffsets = process(records);
+ * consumedOffsets.putAll(lastConsumedOffsets);
+ * // commit offsets for partitions 0,1 for topic foo to custom store
+ * commitOffsetsToCustomStore(consumedOffsets);
+ * // stop only once every partition has been consumed up to the latest available offset
+ * isRunning = false;
+ * for(TopicPartition partition : partitions) {
+ * Long consumed = consumedOffsets.get(partition);
+ * if(consumed == null || consumed < latestAvailableOffsets.get(partition))
+ * isRunning = true;
+ * }
+ * }
+ * commitOffsetsToCustomStore(consumedOffsets);
+ * consumer.close();
+ * }
+ * </pre>
+ */
+public class KafkaConsumer implements Consumer {
+
+ private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
+
+ private final long metadataFetchTimeoutMs;
+ private final long totalMemorySize;
+ private final Metrics metrics;
+ private final Set<String> subscribedTopics;
+ private final Set<TopicPartition> subscribedPartitions;
+
+ /**
+ * A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
+ * are documented <a href="http://kafka.apache.org/documentation.html#consumerconfigs">here</a>. Values can be
+ * either strings or Objects of the appropriate type (for example a numeric configuration would accept either the
+ * string "42" or the integer 42).
+ * <p>
+ * Valid configuration strings are documented at {@link ConsumerConfig}
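+ * <p>
+ * A minimal sketch of this style of construction (illustrative only; the broker address and config values are assumptions):
+ * <pre>
+ * {@code
+ * Map<String, Object> config = new HashMap<String, Object>();
+ * config.put("metadata.broker.list", "localhost:9092");
+ * config.put("group.id", "test");
+ * config.put("session.timeout.ms", 1000); // numeric configs accept either the Integer or the String form
+ * KafkaConsumer consumer = new KafkaConsumer(config);
+ * }
+ * </pre>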
+ * @param configs The consumer configs
+ */
+ public KafkaConsumer(Map<String, Object> configs) {
+ this(new ConsumerConfig(configs), null);
+ }
+
+ /**
+ * A consumer is instantiated by providing a set of key-value pairs as configuration and a {@link ConsumerRebalanceCallback}
+ * implementation
+ * <p>
+ * Valid configuration strings are documented at {@link ConsumerConfig}
+ * @param configs The consumer configs
+ * @param callback A callback interface that the user can implement to manage customized offsets on the start and end of
+ * every rebalance operation.
+ */
+ public KafkaConsumer(Map<String, Object> configs, ConsumerRebalanceCallback callback) {
+ this(new ConsumerConfig(configs), callback);
+ }
+
+ /**
+ * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration.
+ * Valid configuration strings are documented at {@link ConsumerConfig}
+ */
+ public KafkaConsumer(Properties properties) {
+ this(new ConsumerConfig(properties), null);
+ }
+
+ /**
+ * A consumer is instantiated by providing a {@link java.util.Properties} object as configuration and a
+ * {@link ConsumerRebalanceCallback} implementation.
+ * <p>
+ * Valid configuration strings are documented at {@link ConsumerConfig}
+ * @param properties The consumer configuration properties
+ * @param callback A callback interface that the user can implement to manage customized offsets on the start and end of
+ * every rebalance operation.
+ */
+ public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback) {
+ this(new ConsumerConfig(properties), callback);
+ }
+
+ private KafkaConsumer(ConsumerConfig config) {
+ this(config, null);
+ }
+
+ private KafkaConsumer(ConsumerConfig config, ConsumerRebalanceCallback callback) {
+ log.trace("Starting the Kafka consumer");
+ subscribedTopics = new HashSet<String>();
+ subscribedPartitions = new HashSet<TopicPartition>();
+ this.metrics = new Metrics(new MetricConfig(),
+ Collections.singletonList((MetricsReporter) new JmxReporter("kafka.consumer.")),
+ new SystemTime());
+ this.metadataFetchTimeoutMs = config.getLong(ConsumerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
+ this.totalMemorySize = config.getLong(ConsumerConfig.TOTAL_BUFFER_MEMORY_CONFIG);
+ List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+ config.logUnused();
+ log.debug("Kafka consumer started");
+ }
+
+ /**
+ * Incrementally subscribes to the given list of topics and uses the consumer's group management functionality
+ * <p>
+ * As part of group management, the consumer will keep track of the list of consumers that belong to a particular group and
+ * will trigger a rebalance operation if any one of the following events occurs -
+ * <ul>
+ * <li> Number of partitions change for any of the subscribed list of topics
+ * <li> Topic is created or deleted
+ * <li> An existing member of the consumer group dies
+ * <li> A new member is added to an existing consumer group via the join API
+ * </ul>
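+ * For example, a consumer can build up its subscription incrementally (the topic names are illustrative):
+ * <pre>
+ * {@code
+ * consumer.subscribe("foo", "bar"); // subscribed to topics foo and bar
+ * consumer.subscribe("baz");        // now subscribed to topics foo, bar and baz
+ * }
+ * </pre>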
+ * @param topics A variable list of topics that the consumer wants to subscribe to
+ */
+ @Override
+ public void subscribe(String... topics) {
+ if(subscribedPartitions.size() > 0)
+ throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive");
+ for(String topic:topics)
+ subscribedTopics.add(topic);
+ // TODO: trigger a rebalance operation
+ }
+
+ /**
+ * Incrementally subscribes to specific topic partitions and does not use the consumer's group management functionality. As such,
+ * there will be no rebalance operation triggered when group membership or cluster and topic metadata change.
+ * <p>
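+ * For example, to consume only partitions 0 and 1 of topic foo (the topic and partition numbers are illustrative):
+ * <pre>
+ * {@code
+ * consumer.subscribe(new TopicPartition("foo", 0), new TopicPartition("foo", 1));
+ * }
+ * </pre>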
+ * @param partitions Partitions to incrementally subscribe to
+ */
+ @Override
+ public void subscribe(TopicPartition... partitions) {
+ if(subscribedTopics.size() > 0)
+ throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive");
+ for(TopicPartition partition:partitions)
+ subscribedPartitions.add(partition);
+ }
+
+ /**
+ * Unsubscribes from the specified topics. This will trigger a rebalance operation, and messages for these topics will not be returned
+ * from the next {@link #poll(long) poll()} onwards
+ * @param topics Topics to unsubscribe from
+ */
+ public void unsubscribe(String... topics) {
+ // throw an exception if the topic was never subscribed to
+ for(String topic:topics) {
+ if(!subscribedTopics.contains(topic))
+ throw new IllegalStateException("Topic " + topic + " was never subscribed to. subscribe(" + topic + ") should be called prior" +
+ " to unsubscribe(" + topic + ")");
+ subscribedTopics.remove(topic);
+ }
+ // TODO trigger a rebalance operation
+ }
+
+ /**
+ * Unsubscribes from the specified topic partitions. Messages for these partitions will not be returned from the next
+ * {@link #poll(long) poll()} onwards
+ * @param partitions Partitions to unsubscribe from
+ */
+ public void unsubscribe(TopicPartition... partitions) {
+ // throw an exception if the partition was never subscribed to
+ for(TopicPartition partition:partitions) {
+ if(!subscribedPartitions.contains(partition))
+ throw new IllegalStateException("Partition " + partition + " was never subscribed to. subscribe(new TopicPartition(" +
+ partition.topic() + "," + partition.partition() + ") should be called prior" +
+ " to unsubscribe(new TopicPartition(" + partition.topic() + "," + partition.partition() + ")");
+ subscribedPartitions.remove(partition);
+ }
+ // trigger a rebalance operation
+ }
+
+ /**
+ * Fetches data for the topics or partitions specified using one of the subscribe APIs. It is an error to not have subscribed to
+ * any topics or partitions before polling for data.
+ * <p>
+ * The offset used for fetching the data is governed by whether or not {@link #seek(Map) seek(offsets)}
+ * is used. If {@link #seek(Map) seek(offsets)} is used, the consumer will fetch from the specified offsets on startup and
+ * after every rebalance, consuming data sequentially from that position on every poll. If not, it will resume from the last offsets
+ * checkpointed using {@link #commit(Map, boolean) commit(offsets, sync)}
+ * for the subscribed list of partitions.
+ * @param timeout The time, in milliseconds, spent waiting in poll if data is not available. If 0, waits indefinitely. Must not be negative
+ * @return map of topic to records since the last fetch for the subscribed list of topics and partitions
+ */
+ @Override
+ public Map<String, ConsumerRecords> poll(long timeout) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /**
+ * Commits the specified offsets for the specified list of topics and partitions to Kafka.
+ * <p>
+ * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance
+ * and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used.
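+ * <p>
+ * A minimal sketch of checkpointing an explicit offset (the topic, partition and offset are illustrative):
+ * <pre>
+ * {@code
+ * Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
+ * offsets.put(new TopicPartition("foo", 0), 42L);
+ * OffsetMetadata metadata = consumer.commit(offsets, true); // blocks until the commit is acknowledged
+ * }
+ * </pre>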
+ * @param offsets The list of offsets per partition that should be committed to Kafka.
+ * @param sync If true, commit will block until the consumer receives an acknowledgment
+ * @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. Returns null
+ * if the sync flag is set to false.
+ */
+ @Override
+ public OffsetMetadata commit(Map<TopicPartition, Long> offsets, boolean sync) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Commits offsets returned on the last {@link #poll(long) poll()} for the subscribed list of topics and
+ * partitions.
+ * <p>
+ * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance
+ * and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used.
+ * @param sync If true, commit will block until the consumer receives an acknowledgment
+ * @return An {@link OffsetMetadata} object that contains the partition, offset and a corresponding error code. Returns null
+ * if the sync flag is set to false.
+ */
+ @Override
+ public OffsetMetadata commit(boolean sync) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long) poll(timeout)}. If this API is invoked
+ * for the same partition more than once, the latest offset will be used on the next poll(). Note that you may lose data if this API is
+ * used arbitrarily in the middle of consumption to reset the fetch offsets
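+ * <p>
+ * For example, to replay partition 0 of topic foo from the beginning on the next poll (the names are illustrative):
+ * <pre>
+ * {@code
+ * Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
+ * offsets.put(new TopicPartition("foo", 0), 0L);
+ * consumer.seek(offsets);
+ * }
+ * </pre>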
+ */
+ @Override
+ public void seek(Map<TopicPartition, Long> offsets) {
+ }
+
+ /**
+ * Returns the fetch position of the <i>next message</i> for each of the specified topic partitions, to be used on the next {@link #poll(long) poll()}
+ * @param partitions Partitions for which the fetch position will be returned
+ * @return The position from which data will be fetched for each specified partition on the next {@link #poll(long) poll()}
+ */
+ public Map<TopicPartition, Long> position(Collection<TopicPartition> partitions) {
+ return null;
+ }
+
+ /**
+ * Fetches the last committed offsets of partitions that the consumer currently consumes. This API is only relevant if Kafka based offset
+ * storage is used. This API can be used in conjunction with {@link #seek(Map) seek(offsets)} to rewind consumption of data.
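+ * <p>
+ * A sketch of resuming from the last checkpoint (the topic and partition are illustrative):
+ * <pre>
+ * {@code
+ * Collection<TopicPartition> partitions = Arrays.asList(new TopicPartition("foo", 0));
+ * consumer.seek(consumer.committed(partitions)); // rewind to the last committed offsets, accepting possible duplicates
+ * }
+ * </pre>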
+ * @param partitions The list of partitions to return the last committed offset for
+ * @return The last committed offset for each of the specified partitions, as of the last {@link #commit(boolean) commit(sync)}
+ */
+ @Override
+ public Map<TopicPartition, Long> committed(Collection<TopicPartition> partitions) {
+ // TODO Auto-generated method stub
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Fetches offsets before a certain timestamp. Note that the offsets returned are approximately computed and do not correspond to the exact
+ * message at the given timestamp. As such, if the consumer is rewound to offsets returned by this API, there may be duplicate messages
+ * returned by the consumer.
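+ * <p>
+ * For example, to rewind a partition to roughly one hour ago (the topic and partition are illustrative):
+ * <pre>
+ * {@code
+ * Collection<TopicPartition> partitions = Arrays.asList(new TopicPartition("foo", 0));
+ * long oneHourAgo = System.currentTimeMillis() - 60 * 60 * 1000;
+ * consumer.seek(consumer.offsetsBeforeTime(oneHourAgo, partitions));
+ * }
+ * </pre>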
+ * @param timestamp The unix timestamp in milliseconds. Value -1 indicates the latest available timestamp; value -2 indicates the earliest available timestamp
+ * @param partitions The list of partitions for which the offsets are returned
+ * @return The offsets per partition before the specified timestamp.
+ */
+ public Map<TopicPartition, Long> offsetsBeforeTime(long timestamp, Collection<TopicPartition> partitions) {
+ return null;
+ }
+
+ @Override
+ public Map<String, ? extends Metric> metrics() {
+ return Collections.unmodifiableMap(this.metrics.metrics());
+ }
+
+ @Override
+ public void close() {
+ log.trace("Closing the Kafka consumer.");
+ subscribedTopics.clear();
+ subscribedPartitions.clear();
+ this.metrics.close();
+ log.debug("The Kafka consumer has closed.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c24740c7/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
new file mode 100644
index 0000000..c3aad3b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+*/
+package org.apache.kafka.clients.consumer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka.
+ * This class is <i>not threadsafe</i>
+ * <p>
+ * Unlike the real consumer, this mock runs entirely in the user thread and keeps its subscriptions and offsets in memory;
+ * it opens no network connections, so there are no broker resources to leak.
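+ * <p>
+ * A minimal usage sketch (the topic and partition are illustrative):
+ * <pre>
+ * {@code
+ * MockConsumer consumer = new MockConsumer();
+ * consumer.subscribe(new TopicPartition("foo", 0));
+ * Map<String, ConsumerRecords> records = consumer.poll(0); // hands out one dummy record per subscribed partition
+ * consumer.commit(true); // records the consumed offsets as committed
+ * consumer.close();
+ * }
+ * </pre>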
+ */
+public class MockConsumer implements Consumer {
+
+ private final Set<TopicPartition> subscribedPartitions;
+ private final Set<String> subscribedTopics;
+ private final Map<TopicPartition, Long> committedOffsets;
+ private final Map<TopicPartition, Long> consumedOffsets;
+
+ public MockConsumer() {
+ subscribedPartitions = new HashSet<TopicPartition>();
+ subscribedTopics = new HashSet<String>();
+ committedOffsets = new HashMap<TopicPartition, Long>();
+ consumedOffsets = new HashMap<TopicPartition, Long>();
+ }
+
+ @Override
+ public void subscribe(String... topics) {
+ if(subscribedPartitions.size() > 0)
+ throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive");
+ for(String topic : topics) {
+ subscribedTopics.add(topic);
+ }
+ }
+
+ @Override
+ public void subscribe(TopicPartition... partitions) {
+ if(subscribedTopics.size() > 0)
+ throw new IllegalStateException("Subcription to topics and partitions is mutually exclusive");
+ for(TopicPartition partition : partitions) {
+ subscribedPartitions.add(partition);
+ consumedOffsets.put(partition, 0L);
+ }
+ }
+
+ public void unsubscribe(String... topics) {
+ // throw an exception if the topic was never subscribed to
+ for(String topic:topics) {
+ if(!subscribedTopics.contains(topic))
+ throw new IllegalStateException("Topic " + topic + " was never subscribed to. subscribe(" + topic + ") should be called prior" +
+ " to unsubscribe(" + topic + ")");
+ subscribedTopics.remove(topic);
+ }
+ }
+
+ public void unsubscribe(TopicPartition... partitions) {
+ // throw an exception if the partition was never subscribed to
+ for(TopicPartition partition:partitions) {
+ if(!subscribedPartitions.contains(partition))
+ throw new IllegalStateException("Partition " + partition + " was never subscribed to. subscribe(new TopicPartition(" +
+ partition.topic() + "," + partition.partition() + ") should be called prior" +
+ " to unsubscribe(new TopicPartition(" + partition.topic() + "," + partition.partition() + ")");
+ subscribedPartitions.remove(partition);
+ committedOffsets.remove(partition);
+ consumedOffsets.remove(partition);
+ }
+ }
+
+ @Override
+ public Map<String, ConsumerRecords> poll(long timeout) {
+ // hand out one dummy record per subscribed partition
+ Map<String, List<ConsumerRecord>> records = new HashMap<String, List<ConsumerRecord>>();
+ Map<String, ConsumerRecords> recordMetadata = new HashMap<String, ConsumerRecords>();
+ for(TopicPartition partition : subscribedPartitions) {
+ // get the last consumed offset
+ long messageSequence = consumedOffsets.get(partition);
+ ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+ ObjectOutputStream outputStream;
+ try {
+ outputStream = new ObjectOutputStream(byteStream);
+ outputStream.writeLong(messageSequence++);
+ outputStream.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ List<ConsumerRecord> recordsForTopic = records.get(partition.topic());
+ if(recordsForTopic == null) {
+ recordsForTopic = new ArrayList<ConsumerRecord>();
+ records.put(partition.topic(), recordsForTopic);
+ }
+ recordsForTopic.add(new ConsumerRecord(partition.topic(), partition.partition(), null, byteStream.toByteArray(), messageSequence));
+ consumedOffsets.put(partition, messageSequence);
+ }
+ for(Entry<String, List<ConsumerRecord>> recordsPerTopic : records.entrySet()) {
+ Map<Integer, List<ConsumerRecord>> recordsPerPartition = new HashMap<Integer, List<ConsumerRecord>>();
+ for(ConsumerRecord record : recordsPerTopic.getValue()) {
+ List<ConsumerRecord> recordsForThisPartition = recordsPerPartition.get(record.partition());
+ if(recordsForThisPartition == null) {
+ recordsForThisPartition = new ArrayList<ConsumerRecord>();
+ recordsPerPartition.put(record.partition(), recordsForThisPartition);
+ }
+ recordsForThisPartition.add(record);
+ }
+ recordMetadata.put(recordsPerTopic.getKey(), new ConsumerRecords(recordsPerTopic.getKey(), recordsPerPartition));
+ }
+ return recordMetadata;
+ }
+
+ @Override
+ public OffsetMetadata commit(Map<TopicPartition, Long> offsets, boolean sync) {
+ if(!sync)
+ return null;
+ for(Entry<TopicPartition, Long> partitionOffset : offsets.entrySet()) {
+ committedOffsets.put(partitionOffset.getKey(), partitionOffset.getValue());
+ }
+ return new OffsetMetadata(committedOffsets, null);
+ }
+
+ @Override
+ public OffsetMetadata commit(boolean sync) {
+ if(!sync)
+ return null;
+ return commit(consumedOffsets, sync);
+ }
+
+ @Override
+ public void seek(Map<TopicPartition, Long> offsets) {
+ // change the fetch offsets
+ for(Entry<TopicPartition, Long> partitionOffset : offsets.entrySet()) {
+ consumedOffsets.put(partitionOffset.getKey(), partitionOffset.getValue());
+ }
+ }
+
+ @Override
+ public Map<TopicPartition, Long> committed(Collection<TopicPartition> partitions) {
+ Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
+ for(TopicPartition partition : partitions) {
+ offsets.put(new TopicPartition(partition.topic(), partition.partition()), committedOffsets.get(partition));
+ }
+ return offsets;
+ }
+
+ @Override
+ public Map<TopicPartition, Long> position(Collection<TopicPartition> partitions) {
+ Map<TopicPartition, Long> positions = new HashMap<TopicPartition, Long>();
+ for(TopicPartition partition : partitions) {
+ positions.put(partition, consumedOffsets.get(partition));
+ }
+ return positions;
+ }
+
+ @Override
+ public Map<TopicPartition, Long> offsetsBeforeTime(long timestamp,
+ Collection<TopicPartition> partitions) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Map<String, ? extends Metric> metrics() {
+ return null;
+ }
+
+ @Override
+ public void close() {
+ // unsubscribe from all partitions
+ TopicPartition[] allPartitions = new TopicPartition[subscribedPartitions.size()];
+ unsubscribe(subscribedPartitions.toArray(allPartitions));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c24740c7/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java
new file mode 100644
index 0000000..ea423ad
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import java.util.Map;
+
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * The metadata for an offset commit that has been acknowledged by the server
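+ * <p>
+ * A sketch of checking a commit result (assumes a consumer and an offsets map are in scope; the partition is illustrative):
+ * <pre>
+ * {@code
+ * OffsetMetadata metadata = consumer.commit(offsets, true);
+ * TopicPartition partition = new TopicPartition("foo", 0);
+ * if(metadata.error(partition) == null)
+ *     System.out.println("committed: " + metadata.offset(partition));
+ * }
+ * </pre>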
+ */
+public final class OffsetMetadata {
+
+ private final Map<TopicPartition, Long> offsets;
+ private final Map<TopicPartition, RuntimeException> errors;
+
+ public OffsetMetadata(Map<TopicPartition, Long> offsets, Map<TopicPartition, RuntimeException> errors) {
+ super();
+ this.offsets = offsets;
+ this.errors = errors;
+ }
+
+ public OffsetMetadata(Map<TopicPartition, Long> offsets) {
+ this(offsets, null);
+ }
+
+ /**
+ * The offset of the record in the topic/partition.
+ */
+ public long offset(TopicPartition partition) {
+ if(this.errors != null && this.errors.get(partition) != null)
+ throw errors.get(partition);
+ return offsets.get(partition);
+ }
+
+ /**
+ * @return The exception corresponding to the error code returned by the server
+ */
+ public Exception error(TopicPartition partition) {
+ if(errors != null)
+ return errors.get(partition);
+ else
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c24740c7/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 90cacbd..f1def50 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -13,7 +13,6 @@
package org.apache.kafka.clients.producer;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -44,6 +43,7 @@ import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.utils.ClientUtils;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
@@ -118,7 +118,7 @@ public class KafkaProducer implements Producer {
config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG),
metrics,
time);
- List<InetSocketAddress> addresses = parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+ List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
this.sender = new Sender(new Selector(this.metrics, time),
this.metadata,
@@ -150,28 +150,6 @@ public class KafkaProducer implements Producer {
}
}
- private static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
- List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
- for (String url : urls) {
- if (url != null && url.length() > 0) {
- String[] pieces = url.split(":");
- if (pieces.length != 2)
- throw new ConfigException("Invalid url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
- try {
- InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1]));
- if (address.isUnresolved())
- throw new ConfigException("DNS resolution failed for metadata bootstrap url: " + url);
- addresses.add(address);
- } catch (NumberFormatException e) {
- throw new ConfigException("Invalid port in metadata.broker.list: " + url);
- }
- }
- }
- if (addresses.size() < 1)
- throw new ConfigException("No bootstrap urls given in metadata.broker.list.");
- return addresses;
- }
-
/**
* Asynchronously send a record to a topic. Equivalent to {@link #send(ProducerRecord, Callback) send(record, null)}
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/c24740c7/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
new file mode 100644
index 0000000..cb33e34
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ClientUtils.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.ConfigException;
+
+public class ClientUtils {
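+ /**
+ * Parses and validates a list of "host:port" bootstrap urls into socket addresses. For example (a sketch; the
+ * addresses are illustrative):
+ * <pre>
+ * {@code
+ * List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(Arrays.asList("localhost:9092", "localhost:9093"));
+ * }
+ * </pre>
+ * @param urls The configured list of "host:port" bootstrap urls
+ * @return The parsed and resolved socket addresses
+ * @throws ConfigException If any url is malformed, has an invalid port, or fails DNS resolution
+ */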
+ public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
+ List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
+ for (String url : urls) {
+ if (url != null && url.length() > 0) {
+ String[] pieces = url.split(":");
+ if (pieces.length != 2)
+ throw new ConfigException("Invalid url in " + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
+ try {
+ InetSocketAddress address = new InetSocketAddress(pieces[0], Integer.parseInt(pieces[1]));
+ if (address.isUnresolved())
+ throw new ConfigException("DNS resolution failed for metadata bootstrap url: " + url);
+ addresses.add(address);
+ } catch (NumberFormatException e) {
+ throw new ConfigException("Invalid port in metadata.broker.list: " + url);
+ }
+ }
+ }
+ if (addresses.size() < 1)
+ throw new ConfigException("No bootstrap urls given in metadata.broker.list.");
+ return addresses;
+ }
+}
\ No newline at end of file