Posted to commits@kafka.apache.org by ew...@apache.org on 2016/02/24 00:46:50 UTC
kafka git commit: KAFKA-3007: implement max.poll.records (KIP-41)
Repository: kafka
Updated Branches:
refs/heads/trunk c00a036e0 -> 73ecd7a17
KAFKA-3007: implement max.poll.records (KIP-41)
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Grant Henke <gr...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #931 from hachikuji/KAFKA-3007
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/73ecd7a1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/73ecd7a1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/73ecd7a1
Branch: refs/heads/trunk
Commit: 73ecd7a1797a023039fea365a08dbdba4171d10b
Parents: c00a036
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Feb 23 15:46:32 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Feb 23 15:46:32 2016 -0800
----------------------------------------------------------------------
.../kafka/clients/consumer/ConsumerConfig.java | 10 ++
.../kafka/clients/consumer/KafkaConsumer.java | 5 +-
.../clients/consumer/internals/Fetcher.java | 129 +++++++++++++------
.../clients/consumer/internals/FetcherTest.java | 90 ++++++++++---
.../kafka/api/BaseConsumerTest.scala | 13 +-
.../kafka/api/PlaintextConsumerTest.scala | 15 +++
6 files changed, 205 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ecd7a1/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 3132cae..356f0aa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -168,6 +168,10 @@ public class ConsumerConfig extends AbstractConfig {
+ "Implementing the <code>ConsumerInterceptor</code> interface allows you to intercept (and possibly mutate) records "
+ "received by the consumer. By default, there are no interceptors.";
+ /** <code>max.poll.records</code> */
+ public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
+ private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned in a single call to poll().";
+
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
@@ -306,6 +310,12 @@ public class ConsumerConfig extends AbstractConfig {
null,
Importance.LOW,
INTERCEPTOR_CLASSES_DOC)
+ .define(MAX_POLL_RECORDS_CONFIG,
+ Type.INT,
+ Integer.MAX_VALUE,
+ atLeast(1),
+ Importance.MEDIUM,
+ MAX_POLL_RECORDS_DOC)
// security support
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
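
For illustration, a minimal consumer sketch using the new setting. The broker address, group id, and topic below are placeholders, not from this commit; only MAX_POLL_RECORDS_CONFIG, its default of Integer.MAX_VALUE, and the atLeast(1) bound come from the diff above.

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class MaxPollRecordsExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            // New in this commit: cap the records returned by a single poll().
            // Defaults to Integer.MAX_VALUE; must be at least 1.
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
            try {
                consumer.subscribe(Collections.singletonList("example-topic")); // placeholder
                while (true) {
                    // Each batch now holds at most 100 records, so the time spent
                    // processing between successive polls stays bounded.
                    ConsumerRecords<byte[], byte[]> batch = consumer.poll(1000);
                    for (ConsumerRecord<byte[], byte[]> record : batch)
                        System.out.println(record.offset());
                }
            } finally {
                consumer.close();
            }
        }
    }
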
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ecd7a1/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index faa9a78..f907922 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -632,6 +632,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
+ config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
this.keyDeserializer,
this.valueDeserializer,
@@ -867,7 +868,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
// task execution since the consumed positions have already been updated and we
// must return these records to users to process before being interrupted or
// auto-committing offsets
- fetcher.initFetches(metadata.fetch());
+ fetcher.sendFetches(metadata.fetch());
client.quickPoll();
return this.interceptors == null
? new ConsumerRecords<>(records) : this.interceptors.onConsume(new ConsumerRecords<>(records));
@@ -912,7 +913,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
return records;
}
- fetcher.initFetches(cluster);
+ fetcher.sendFetches(cluster);
client.poll(timeout);
return fetcher.fetchedRecords();
}
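
Beyond the rename (sendFetches better describes a method that actually transmits fetch requests), the practical payoff of the new config is bounding the work done between successive poll() calls, which KIP-41 discusses as a way to keep slow record processing from starving the consumer's heartbeating. A back-of-the-envelope sizing sketch, with assumed numbers that are not from this commit:

    public class MaxPollSizingSketch {
        public static void main(String[] args) {
            long sessionTimeoutMs = 30000;  // assumed session.timeout.ms
            long perRecordMs = 50;          // assumed worst-case handling time per record
            long headroom = 2;              // safety factor for commits, GC pauses, etc.
            long maxPollRecords = sessionTimeoutMs / (perRecordMs * headroom);
            // With these numbers, max.poll.records=300 keeps each poll-to-poll
            // gap comfortably under the session timeout.
            System.out.println("max.poll.records <= " + maxPollRecords);
        }
    }
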
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ecd7a1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 427664a..5d92a76 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -58,6 +58,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -76,6 +77,7 @@ public class Fetcher<K, V> {
private final int maxWaitMs;
private final int fetchSize;
private final long retryBackoffMs;
+ private final int maxPollRecords;
private final boolean checkCrcs;
private final Metadata metadata;
private final FetchManagerMetrics sensors;
@@ -92,6 +94,7 @@ public class Fetcher<K, V> {
int minBytes,
int maxWaitMs,
int fetchSize,
+ int maxPollRecords,
boolean checkCrcs,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
@@ -109,6 +112,7 @@ public class Fetcher<K, V> {
this.minBytes = minBytes;
this.maxWaitMs = maxWaitMs;
this.fetchSize = fetchSize;
+ this.maxPollRecords = maxPollRecords;
this.checkCrcs = checkCrcs;
this.keyDeserializer = keyDeserializer;
@@ -128,7 +132,7 @@ public class Fetcher<K, V> {
*
* @param cluster The current cluster metadata
*/
- public void initFetches(Cluster cluster) {
+ public void sendFetches(Cluster cluster) {
for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests(cluster).entrySet()) {
final FetchRequest fetch = fetchEntry.getValue();
client.send(fetchEntry.getKey(), ApiKeys.FETCH, fetch)
@@ -393,44 +397,57 @@ public class Fetcher<K, V> {
throwIfUnauthorizedTopics();
throwIfRecordTooLarge();
- for (PartitionRecords<K, V> part : this.records) {
- if (!subscriptions.isAssigned(part.partition)) {
- // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
- log.debug("Not returning fetched records for partition {} since it is no longer assigned", part.partition);
- continue;
- }
-
- // note that the consumed position should always be available
- // as long as the partition is still assigned
- long position = subscriptions.position(part.partition);
- if (!subscriptions.isFetchable(part.partition)) {
- // this can happen when a partition is paused before fetched records are returned to the consumer's poll call
- log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", part.partition);
- } else if (part.fetchOffset == position) {
- long nextOffset = part.records.get(part.records.size() - 1).offset() + 1;
-
- log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
- "position to {}", position, part.partition, nextOffset);
-
- List<ConsumerRecord<K, V>> records = drained.get(part.partition);
- if (records == null) {
- records = part.records;
- drained.put(part.partition, records);
- } else {
- records.addAll(part.records);
- }
+ int maxRecords = maxPollRecords;
+ Iterator<PartitionRecords<K, V>> iterator = records.iterator();
+ while (iterator.hasNext() && maxRecords > 0) {
+ PartitionRecords<K, V> part = iterator.next();
+ maxRecords -= append(drained, part, maxRecords);
+ if (part.isConsumed())
+ iterator.remove();
+ }
+ return drained;
+ }
+ }
- subscriptions.position(part.partition, nextOffset);
+ private int append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained,
+ PartitionRecords<K, V> part,
+ int maxRecords) {
+ if (!subscriptions.isAssigned(part.partition)) {
+ // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
+ log.debug("Not returning fetched records for partition {} since it is no longer assigned", part.partition);
+ } else {
+ // note that the consumed position should always be available as long as the partition is still assigned
+ long position = subscriptions.position(part.partition);
+ if (!subscriptions.isFetchable(part.partition)) {
+ // this can happen when a partition is paused before fetched records are returned to the consumer's poll call
+ log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", part.partition);
+ } else if (part.fetchOffset == position) {
+ List<ConsumerRecord<K, V>> partRecords = part.take(maxRecords);
+ long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
+
+ log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
+ "position to {}", position, part.partition, nextOffset);
+
+ List<ConsumerRecord<K, V>> records = drained.get(part.partition);
+ if (records == null) {
+ records = partRecords;
+ drained.put(part.partition, records);
} else {
- // these records aren't next in line based on the last consumed position, ignore them
- // they must be from an obsolete request
- log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
- part.partition, part.fetchOffset, position);
+ records.addAll(partRecords);
}
+
+ subscriptions.position(part.partition, nextOffset);
+ return partRecords.size();
+ } else {
+ // these records aren't next in line based on the last consumed position, ignore them
+ // they must be from an obsolete request
+ log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
+ part.partition, part.fetchOffset, position);
}
- this.records.clear();
- return drained;
}
+
+ part.discard();
+ return 0;
}
/**
@@ -441,7 +458,7 @@ public class Fetcher<K, V> {
* @return A response which can be polled to obtain the corresponding offset.
*/
private RequestFuture<Long> sendListOffsetRequest(final TopicPartition topicPartition, long timestamp) {
- Map<TopicPartition, ListOffsetRequest.PartitionData> partitions = new HashMap<TopicPartition, ListOffsetRequest.PartitionData>(1);
+ Map<TopicPartition, ListOffsetRequest.PartitionData> partitions = new HashMap<>(1);
partitions.put(topicPartition, new ListOffsetRequest.PartitionData(timestamp, 1));
PartitionInfo info = metadata.fetch().partition(topicPartition);
if (info == null) {
@@ -494,6 +511,15 @@ public class Fetcher<K, V> {
}
}
+ private Set<TopicPartition> fetchablePartitions() {
+ Set<TopicPartition> fetchable = subscriptions.fetchablePartitions();
+ if (records.isEmpty())
+ return fetchable;
+ for (PartitionRecords<K, V> partitionRecords : records)
+ fetchable.remove(partitionRecords.partition);
+ return fetchable;
+ }
+
/**
* Create fetch requests for all nodes for which we have assigned partitions
* that have no existing requests in flight.
@@ -501,7 +527,7 @@ public class Fetcher<K, V> {
private Map<Node, FetchRequest> createFetchRequests(Cluster cluster) {
// create the fetch info
Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<>();
- for (TopicPartition partition : subscriptions.fetchablePartitions()) {
+ for (TopicPartition partition : fetchablePartitions()) {
Node node = cluster.leaderFor(partition);
if (node == null) {
metadata.requestUpdate();
@@ -640,6 +666,37 @@ public class Fetcher<K, V> {
this.partition = partition;
this.records = records;
}
+
+ private boolean isConsumed() {
+ return records == null || records.isEmpty();
+ }
+
+ private void discard() {
+ this.records = null;
+ }
+
+ private List<ConsumerRecord<K, V>> take(int n) {
+ if (records == null)
+ return Collections.emptyList();
+
+ if (n >= records.size()) {
+ List<ConsumerRecord<K, V>> res = this.records;
+ this.records = null;
+ return res;
+ }
+
+ List<ConsumerRecord<K, V>> res = new ArrayList<>(n);
+ Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
+ for (int i = 0; i < n; i++) {
+ res.add(iterator.next());
+ iterator.remove();
+ }
+
+ if (iterator.hasNext())
+ this.fetchOffset = iterator.next().offset();
+
+ return res;
+ }
}
private class FetchManagerMetrics {
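
The heart of the change is the drain loop above: fetchedRecords() now walks the buffered PartitionRecords, takes at most max.poll.records across them, and leaves the remainder buffered for the next call. Note also fetchablePartitions(), which skips fetching any partition that still has buffered data, so memory use stays bounded while records trickle out in capped batches. A self-contained toy that mimics the loop, using plain integer lists rather than the real Fetcher classes:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.List;

    public class DrainLoopSketch {
        static final int MAX_POLL_RECORDS = 2;
        static final LinkedList<List<Integer>> buffered = new LinkedList<>();

        // Take at most MAX_POLL_RECORDS across buffered batches; leftovers stay
        // buffered, and fully consumed batches are removed, mirroring
        // Fetcher.fetchedRecords() with PartitionRecords.take()/isConsumed().
        static List<Integer> drain() {
            List<Integer> drained = new ArrayList<>();
            int remaining = MAX_POLL_RECORDS;
            Iterator<List<Integer>> it = buffered.iterator();
            while (it.hasNext() && remaining > 0) {
                List<Integer> batch = it.next();
                int take = Math.min(remaining, batch.size());
                drained.addAll(batch.subList(0, take));
                batch.subList(0, take).clear();  // leftovers stay for the next call
                remaining -= take;
                if (batch.isEmpty())
                    it.remove();                 // fully consumed, drop the batch
            }
            return drained;
        }

        public static void main(String[] args) {
            buffered.add(new ArrayList<>(Arrays.asList(1, 2, 3)));
            buffered.add(new ArrayList<>(Arrays.asList(4, 5)));
            System.out.println(drain()); // [1, 2]
            System.out.println(drain()); // [3, 4]
            System.out.println(drain()); // [5]
            System.out.println(drain()); // []
        }
    }
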
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ecd7a1/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 97c3d85..823d04e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -40,6 +40,7 @@ import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
@@ -87,6 +88,7 @@ public class FetcherTest {
private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
private MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
+ private MemoryRecords nextRecords = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
private Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, metrics);
private Metrics fetcherMetrics = new Metrics(time);
private Fetcher<byte[], byte[]> fetcherNoAutoReset = createFetcher(subscriptionsNoAutoReset, fetcherMetrics);
@@ -100,6 +102,10 @@ public class FetcherTest {
records.append(2L, 0L, "key".getBytes(), "value-2".getBytes());
records.append(3L, 0L, "key".getBytes(), "value-3".getBytes());
records.close();
+
+ nextRecords.append(4L, 0L, "key".getBytes(), "value-4".getBytes());
+ nextRecords.append(5L, 0L, "key".getBytes(), "value-5".getBytes());
+ nextRecords.close();
}
@After
@@ -115,7 +121,7 @@ public class FetcherTest {
subscriptions.seek(tp, 0);
// normal fetch
- fetcher.initFetches(cluster);
+ fetcher.sendFetches(cluster);
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
consumerClient.poll(0);
records = fetcher.fetchedRecords().get(tp);
@@ -128,6 +134,52 @@ public class FetcherTest {
}
}
+ private MockClient.RequestMatcher matchesOffset(final TopicPartition tp, final long offset) {
+ return new MockClient.RequestMatcher() {
+ @Override
+ public boolean matches(ClientRequest request) {
+ FetchRequest fetch = new FetchRequest(request.request().body());
+ return fetch.fetchData().containsKey(tp) &&
+ fetch.fetchData().get(tp).offset == offset;
+ }
+ };
+ }
+
+ @Test
+ public void testFetchMaxPollRecords() {
+ Fetcher<byte[], byte[]> fetcher = createFetcher(2, subscriptions, new Metrics(time));
+
+ List<ConsumerRecord<byte[], byte[]>> records;
+ subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.seek(tp, 1);
+
+ client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
+ client.prepareResponse(matchesOffset(tp, 4), fetchResponse(this.nextRecords.buffer(), Errors.NONE.code(), 100L, 0));
+
+ fetcher.sendFetches(cluster);
+ consumerClient.poll(0);
+ records = fetcher.fetchedRecords().get(tp);
+ assertEquals(2, records.size());
+ assertEquals(3L, (long) subscriptions.position(tp));
+ assertEquals(1, records.get(0).offset());
+ assertEquals(2, records.get(1).offset());
+
+ fetcher.sendFetches(cluster);
+ consumerClient.poll(0);
+ records = fetcher.fetchedRecords().get(tp);
+ assertEquals(1, records.size());
+ assertEquals(4L, (long) subscriptions.position(tp));
+ assertEquals(3, records.get(0).offset());
+
+ fetcher.sendFetches(cluster);
+ consumerClient.poll(0);
+ records = fetcher.fetchedRecords().get(tp);
+ assertEquals(2, records.size());
+ assertEquals(6L, (long) subscriptions.position(tp));
+ assertEquals(4, records.get(0).offset());
+ assertEquals(5, records.get(1).offset());
+ }
+
@Test
public void testFetchNonContinuousRecords() {
// if we are fetching from a compacted topic, there may be gaps in the returned records
@@ -144,7 +196,7 @@ public class FetcherTest {
subscriptions.seek(tp, 0);
// normal fetch
- fetcher.initFetches(cluster);
+ fetcher.sendFetches(cluster);
client.prepareResponse(fetchResponse(records.buffer(), Errors.NONE.code(), 100L, 0));
consumerClient.poll(0);
consumerRecords = fetcher.fetchedRecords().get(tp);
@@ -169,7 +221,7 @@ public class FetcherTest {
records.close();
// resize the limit of the buffer to pretend it is only fetch-size large
- fetcher.initFetches(cluster);
+ fetcher.sendFetches(cluster);
client.prepareResponse(fetchResponse((ByteBuffer) records.buffer().limit(this.fetchSize), Errors.NONE.code(), 100L, 0));
consumerClient.poll(0);
fetcher.fetchedRecords();
@@ -181,7 +233,7 @@ public class FetcherTest {
subscriptions.seek(tp, 0);
// resize the limit of the buffer to pretend it is only fetch-size large
- fetcher.initFetches(cluster);
+ fetcher.sendFetches(cluster);
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.TOPIC_AUTHORIZATION_FAILED.code(), 100L, 0));
consumerClient.poll(0);
try {
@@ -198,7 +250,7 @@ public class FetcherTest {
subscriptions.assignFromSubscribed(Arrays.asList(tp));
subscriptions.seek(tp, 0);
- fetcher.initFetches(cluster);
+ fetcher.sendFetches(cluster);
// Now the rebalance happens and fetch positions are cleared
subscriptions.assignFromSubscribed(Arrays.asList(tp));
@@ -214,7 +266,7 @@ public class FetcherTest {
subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.seek(tp, 0);
- fetcher.initFetches(cluster);
+ fetcher.sendFetches(cluster);
subscriptions.pause(tp);
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
@@ -228,7 +280,7 @@ public class FetcherTest {
subscriptions.seek(tp, 0);
subscriptions.pause(tp);
- fetcher.initFetches(cluster);
+ fetcher.sendFetches(cluster);
assertTrue(client.requests().isEmpty());
}
@@ -237,7 +289,7 @@ public class FetcherTest {
subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.seek(tp, 0);
- fetcher.initFetches(cluster);
+ fetcher.sendFetches(cluster);
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0));
consumerClient.poll(0);
assertEquals(0, fetcher.fetchedRecords().size());
@@ -249,7 +301,7 @@ public class FetcherTest {
subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.seek(tp, 0);
- fetcher.initFetches(cluster);
+ fetcher.sendFetches(cluster);
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L, 0));
consumerClient.poll(0);
assertEquals(0, fetcher.fetchedRecords().size());
@@ -261,7 +313,7 @@ public class FetcherTest {
subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.seek(tp, 0);
- fetcher.initFetches(cluster);
+ fetcher.sendFetches(cluster);
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
consumerClient.poll(0);
assertTrue(subscriptions.isOffsetResetNeeded(tp));
@@ -274,7 +326,7 @@ public class FetcherTest {
subscriptionsNoAutoReset.assignFromUser(Arrays.asList(tp));
subscriptionsNoAutoReset.seek(tp, 0);
- fetcherNoAutoReset.initFetches(cluster);
+ fetcherNoAutoReset.sendFetches(cluster);
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
consumerClient.poll(0);
assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp));
@@ -287,7 +339,7 @@ public class FetcherTest {
subscriptionsNoAutoReset.assignFromUser(Arrays.asList(tp));
subscriptionsNoAutoReset.seek(tp, 0);
- fetcherNoAutoReset.initFetches(cluster);
+ fetcherNoAutoReset.sendFetches(cluster);
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
consumerClient.poll(0);
assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp));
@@ -306,7 +358,7 @@ public class FetcherTest {
subscriptions.assignFromUser(Arrays.asList(tp));
subscriptions.seek(tp, 0);
- fetcher.initFetches(cluster);
+ fetcher.sendFetches(cluster);
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0), true);
consumerClient.poll(0);
assertEquals(0, fetcher.fetchedRecords().size());
@@ -474,7 +526,7 @@ public class FetcherTest {
}
this.records.close();
}
- fetcher.initFetches(cluster);
+ fetcher.sendFetches(cluster);
client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 100 * i));
consumerClient.poll(0);
records = fetcher.fetchedRecords().get(tp);
@@ -513,11 +565,14 @@ public class FetcherTest {
return response.toStruct();
}
- private Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions, Metrics metrics) {
+ private Fetcher<byte[], byte[]> createFetcher(int maxPollRecords,
+ SubscriptionState subscriptions,
+ Metrics metrics) {
return new Fetcher<>(consumerClient,
minBytes,
maxWaitMs,
fetchSize,
+ maxPollRecords,
true, // check crc
new ByteArrayDeserializer(),
new ByteArrayDeserializer(),
@@ -528,4 +583,9 @@ public class FetcherTest {
time,
retryBackoffMs);
}
+
+
+ private Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions, Metrics metrics) {
+ return createFetcher(Integer.MAX_VALUE, subscriptions, metrics);
+ }
}
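
The assertion sequence in testFetchMaxPollRecords is worth tracing: the first response buffers offsets 1-3, so the first poll returns two records and the second returns the single leftover (no new fetch goes out while the partition still has buffered data, per fetchablePartitions()); only then does the prepared fetch at offset 4 fire, and the third poll returns offsets 4-5. Restated as data:

    public class MaxPollTestTrace {
        public static void main(String[] args) {
            // {records returned, resulting position} per poll in the test above
            int[][] polls = {{2, 3}, {1, 4}, {2, 6}};
            for (int i = 0; i < polls.length; i++)
                System.out.printf("poll %d: %d records, position -> %d%n",
                        i + 1, polls[i][0], polls[i][1]);
        }
    }
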
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ecd7a1/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index f15c005..a66fe35 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -289,8 +289,9 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
startingKeyAndValueIndex: Int = 0,
startingTimestamp: Long = 0L,
timestampType: TimestampType = TimestampType.CREATE_TIME,
- tp: TopicPartition = tp) {
- val records = consumeRecords(consumer, numRecords)
+ tp: TopicPartition = tp,
+ maxPollRecords: Int = Int.MaxValue) {
+ val records = consumeRecords(consumer, numRecords, maxPollRecords = maxPollRecords)
val now = System.currentTimeMillis()
for (i <- 0 until numRecords) {
val record = records.get(i)
@@ -311,12 +312,16 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
}
}
- protected def consumeRecords[K, V](consumer: Consumer[K, V], numRecords: Int): ArrayList[ConsumerRecord[K, V]] = {
+ protected def consumeRecords[K, V](consumer: Consumer[K, V],
+ numRecords: Int,
+ maxPollRecords: Int = Int.MaxValue): ArrayList[ConsumerRecord[K, V]] = {
val records = new ArrayList[ConsumerRecord[K, V]]
val maxIters = numRecords * 300
var iters = 0
while (records.size < numRecords) {
- for (record <- consumer.poll(50).asScala)
+ val polledRecords = consumer.poll(50).asScala
+ assertTrue(polledRecords.size <= maxPollRecords)
+ for (record <- polledRecords)
records.add(record)
if (iters > maxIters)
throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.")
http://git-wip-us.apache.org/repos/asf/kafka/blob/73ecd7a1/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 3d7cad3..f8ad633 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -40,6 +40,21 @@ import scala.collection.mutable.Buffer
class PlaintextConsumerTest extends BaseConsumerTest {
@Test
+ def testMaxPollRecords() {
+ val maxPollRecords = 2
+ val numRecords = 10000
+
+ sendRecords(numRecords)
+
+ this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
+ val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+ consumer0.assign(List(tp).asJava)
+
+ consumeAndVerifyRecords(consumer0, numRecords = numRecords, startingOffset = 0,
+ maxPollRecords = maxPollRecords)
+ }
+
+ @Test
def testAutoCommitOnClose() {
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())