You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/02/24 00:46:50 UTC

kafka git commit: KAFKA-3007: implement max.poll.records (KIP-41)

Repository: kafka
Updated Branches:
  refs/heads/trunk c00a036e0 -> 73ecd7a17


KAFKA-3007: implement max.poll.records (KIP-41)

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Grant Henke <gr...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #931 from hachikuji/KAFKA-3007


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/73ecd7a1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/73ecd7a1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/73ecd7a1

Branch: refs/heads/trunk
Commit: 73ecd7a1797a023039fea365a08dbdba4171d10b
Parents: c00a036
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Feb 23 15:46:32 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Feb 23 15:46:32 2016 -0800

----------------------------------------------------------------------
 .../kafka/clients/consumer/ConsumerConfig.java  |  10 ++
 .../kafka/clients/consumer/KafkaConsumer.java   |   5 +-
 .../clients/consumer/internals/Fetcher.java     | 129 +++++++++++++------
 .../clients/consumer/internals/FetcherTest.java |  90 ++++++++++---
 .../kafka/api/BaseConsumerTest.scala            |  13 +-
 .../kafka/api/PlaintextConsumerTest.scala       |  15 +++
 6 files changed, 205 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/73ecd7a1/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 3132cae..356f0aa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -168,6 +168,10 @@ public class ConsumerConfig extends AbstractConfig {
                                                         + "Implementing the <code>ConsumerInterceptor</code> interface allows you to intercept (and possibly mutate) records "
                                                         + "received by the consumer. By default, there are no interceptors.";
 
+    /** <code>max.poll.records</code> */
+    public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
+    private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned in a single call to poll().";
+
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
                                         Type.LIST,
@@ -306,6 +310,12 @@ public class ConsumerConfig extends AbstractConfig {
                                         null,
                                         Importance.LOW,
                                         INTERCEPTOR_CLASSES_DOC)
+                                .define(MAX_POLL_RECORDS_CONFIG,
+                                        Type.INT,
+                                        Integer.MAX_VALUE,
+                                        atLeast(1),
+                                        Importance.MEDIUM,
+                                        MAX_POLL_RECORDS_DOC)
 
                                 // security support
                                 .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ecd7a1/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index faa9a78..f907922 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -632,6 +632,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
                     config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
                     config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
+                    config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
                     config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
                     this.keyDeserializer,
                     this.valueDeserializer,
@@ -867,7 +868,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     // task execution since the consumed positions has already been updated and we
                     // must return these records to users to process before being interrupted or
                     // auto-committing offsets
-                    fetcher.initFetches(metadata.fetch());
+                    fetcher.sendFetches(metadata.fetch());
                     client.quickPoll();
                     return this.interceptors == null
                         ? new ConsumerRecords<>(records) : this.interceptors.onConsume(new ConsumerRecords<>(records));
@@ -912,7 +913,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             return records;
         }
 
-        fetcher.initFetches(cluster);
+        fetcher.sendFetches(cluster);
         client.poll(timeout);
         return fetcher.fetchedRecords();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ecd7a1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 427664a..5d92a76 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -58,6 +58,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -76,6 +77,7 @@ public class Fetcher<K, V> {
     private final int maxWaitMs;
     private final int fetchSize;
     private final long retryBackoffMs;
+    private final int maxPollRecords;
     private final boolean checkCrcs;
     private final Metadata metadata;
     private final FetchManagerMetrics sensors;
@@ -92,6 +94,7 @@ public class Fetcher<K, V> {
                    int minBytes,
                    int maxWaitMs,
                    int fetchSize,
+                   int maxPollRecords,
                    boolean checkCrcs,
                    Deserializer<K> keyDeserializer,
                    Deserializer<V> valueDeserializer,
@@ -109,6 +112,7 @@ public class Fetcher<K, V> {
         this.minBytes = minBytes;
         this.maxWaitMs = maxWaitMs;
         this.fetchSize = fetchSize;
+        this.maxPollRecords = maxPollRecords;
         this.checkCrcs = checkCrcs;
 
         this.keyDeserializer = keyDeserializer;
@@ -128,7 +132,7 @@ public class Fetcher<K, V> {
      *
      * @param cluster The current cluster metadata
      */
-    public void initFetches(Cluster cluster) {
+    public void sendFetches(Cluster cluster) {
         for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests(cluster).entrySet()) {
             final FetchRequest fetch = fetchEntry.getValue();
             client.send(fetchEntry.getKey(), ApiKeys.FETCH, fetch)
@@ -393,44 +397,57 @@ public class Fetcher<K, V> {
             throwIfUnauthorizedTopics();
             throwIfRecordTooLarge();
 
-            for (PartitionRecords<K, V> part : this.records) {
-                if (!subscriptions.isAssigned(part.partition)) {
-                    // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
-                    log.debug("Not returning fetched records for partition {} since it is no longer assigned", part.partition);
-                    continue;
-                }
-
-                // note that the consumed position should always be available
-                // as long as the partition is still assigned
-                long position = subscriptions.position(part.partition);
-                if (!subscriptions.isFetchable(part.partition)) {
-                    // this can happen when a partition is paused before fetched records are returned to the consumer's poll call
-                    log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", part.partition);
-                } else if (part.fetchOffset == position) {
-                    long nextOffset = part.records.get(part.records.size() - 1).offset() + 1;
-
-                    log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
-                            "position to {}", position, part.partition, nextOffset);
-
-                    List<ConsumerRecord<K, V>> records = drained.get(part.partition);
-                    if (records == null) {
-                        records = part.records;
-                        drained.put(part.partition, records);
-                    } else {
-                        records.addAll(part.records);
-                    }
+            int maxRecords = maxPollRecords;
+            Iterator<PartitionRecords<K, V>> iterator = records.iterator();
+            while (iterator.hasNext() && maxRecords > 0) {
+                PartitionRecords<K, V> part = iterator.next();
+                maxRecords -= append(drained, part, maxRecords);
+                if (part.isConsumed())
+                    iterator.remove();
+            }
+            return drained;
+        }
+    }
 
-                    subscriptions.position(part.partition, nextOffset);
+    private int append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained,
+                       PartitionRecords<K, V> part,
+                       int maxRecords) {
+        if (!subscriptions.isAssigned(part.partition)) {
+            // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
+            log.debug("Not returning fetched records for partition {} since it is no longer assigned", part.partition);
+        } else {
+            // note that the consumed position should always be available as long as the partition is still assigned
+            long position = subscriptions.position(part.partition);
+            if (!subscriptions.isFetchable(part.partition)) {
+                // this can happen when a partition is paused before fetched records are returned to the consumer's poll call
+                log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", part.partition);
+            } else if (part.fetchOffset == position) {
+                List<ConsumerRecord<K, V>> partRecords = part.take(maxRecords);
+                long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
+
+                log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
+                        "position to {}", position, part.partition, nextOffset);
+
+                List<ConsumerRecord<K, V>> records = drained.get(part.partition);
+                if (records == null) {
+                    records = partRecords;
+                    drained.put(part.partition, records);
                 } else {
-                    // these records aren't next in line based on the last consumed position, ignore them
-                    // they must be from an obsolete request
-                    log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
-                            part.partition, part.fetchOffset, position);
+                    records.addAll(partRecords);
                 }
+
+                subscriptions.position(part.partition, nextOffset);
+                return partRecords.size();
+            } else {
+                // these records aren't next in line based on the last consumed position, ignore them
+                // they must be from an obsolete request
+                log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
+                        part.partition, part.fetchOffset, position);
             }
-            this.records.clear();
-            return drained;
         }
+
+        part.discard();
+        return 0;
     }
 
     /**
@@ -441,7 +458,7 @@ public class Fetcher<K, V> {
      * @return A response which can be polled to obtain the corresponding offset.
      */
     private RequestFuture<Long> sendListOffsetRequest(final TopicPartition topicPartition, long timestamp) {
-        Map<TopicPartition, ListOffsetRequest.PartitionData> partitions = new HashMap<TopicPartition, ListOffsetRequest.PartitionData>(1);
+        Map<TopicPartition, ListOffsetRequest.PartitionData> partitions = new HashMap<>(1);
         partitions.put(topicPartition, new ListOffsetRequest.PartitionData(timestamp, 1));
         PartitionInfo info = metadata.fetch().partition(topicPartition);
         if (info == null) {
@@ -494,6 +511,15 @@ public class Fetcher<K, V> {
         }
     }
 
+    private Set<TopicPartition> fetchablePartitions() {
+        Set<TopicPartition> fetchable = subscriptions.fetchablePartitions();
+        if (records.isEmpty())
+            return fetchable;
+        for (PartitionRecords<K, V> partitionRecords : records)
+            fetchable.remove(partitionRecords.partition);
+        return fetchable;
+    }
+
     /**
      * Create fetch requests for all nodes for which we have assigned partitions
      * that have no existing requests in flight.
@@ -501,7 +527,7 @@ public class Fetcher<K, V> {
     private Map<Node, FetchRequest> createFetchRequests(Cluster cluster) {
         // create the fetch info
         Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<>();
-        for (TopicPartition partition : subscriptions.fetchablePartitions()) {
+        for (TopicPartition partition : fetchablePartitions()) {
             Node node = cluster.leaderFor(partition);
             if (node == null) {
                 metadata.requestUpdate();
@@ -640,6 +666,37 @@ public class Fetcher<K, V> {
             this.partition = partition;
             this.records = records;
         }
+
+        private boolean isConsumed() {
+            return records == null || records.isEmpty();
+        }
+
+        private void discard() {
+            this.records = null;
+        }
+
+        private List<ConsumerRecord<K, V>> take(int n) {
+            if (records == null)
+                return Collections.emptyList();
+
+            if (n >= records.size()) {
+                List<ConsumerRecord<K, V>> res = this.records;
+                this.records = null;
+                return res;
+            }
+
+            List<ConsumerRecord<K, V>> res = new ArrayList<>(n);
+            Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
+            for (int i = 0; i < n; i++) {
+                res.add(iterator.next());
+                iterator.remove();
+            }
+
+            if (iterator.hasNext())
+                this.fetchOffset = iterator.next().offset();
+
+            return res;
+        }
     }
 
     private class FetchManagerMetrics {

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ecd7a1/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 97c3d85..823d04e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -40,6 +40,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.ListOffsetRequest;
 import org.apache.kafka.common.requests.ListOffsetResponse;
@@ -87,6 +88,7 @@ public class FetcherTest {
     private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
 
     private MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
+    private MemoryRecords nextRecords = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
     private Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, metrics);
     private Metrics fetcherMetrics = new Metrics(time);
     private Fetcher<byte[], byte[]> fetcherNoAutoReset = createFetcher(subscriptionsNoAutoReset, fetcherMetrics);
@@ -100,6 +102,10 @@ public class FetcherTest {
         records.append(2L, 0L, "key".getBytes(), "value-2".getBytes());
         records.append(3L, 0L, "key".getBytes(), "value-3".getBytes());
         records.close();
+
+        nextRecords.append(4L, 0L, "key".getBytes(), "value-4".getBytes());
+        nextRecords.append(5L, 0L, "key".getBytes(), "value-5".getBytes());
+        nextRecords.close();
     }
 
     @After
@@ -115,7 +121,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         // normal fetch
-        fetcher.initFetches(cluster);
+        fetcher.sendFetches(cluster);
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
         consumerClient.poll(0);
         records = fetcher.fetchedRecords().get(tp);
@@ -128,6 +134,52 @@ public class FetcherTest {
         }
     }
 
+    private MockClient.RequestMatcher matchesOffset(final TopicPartition tp, final long offset) {
+        return new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(ClientRequest request) {
+                FetchRequest fetch = new FetchRequest(request.request().body());
+                return fetch.fetchData().containsKey(tp) &&
+                        fetch.fetchData().get(tp).offset == offset;
+            }
+        };
+    }
+
+    @Test
+    public void testFetchMaxPollRecords() {
+        Fetcher<byte[], byte[]> fetcher = createFetcher(2, subscriptions, new Metrics(time));
+
+        List<ConsumerRecord<byte[], byte[]>> records;
+        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.seek(tp, 1);
+
+        client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
+        client.prepareResponse(matchesOffset(tp, 4), fetchResponse(this.nextRecords.buffer(), Errors.NONE.code(), 100L, 0));
+
+        fetcher.sendFetches(cluster);
+        consumerClient.poll(0);
+        records = fetcher.fetchedRecords().get(tp);
+        assertEquals(2, records.size());
+        assertEquals(3L, (long) subscriptions.position(tp));
+        assertEquals(1, records.get(0).offset());
+        assertEquals(2, records.get(1).offset());
+
+        fetcher.sendFetches(cluster);
+        consumerClient.poll(0);
+        records = fetcher.fetchedRecords().get(tp);
+        assertEquals(1, records.size());
+        assertEquals(4L, (long) subscriptions.position(tp));
+        assertEquals(3, records.get(0).offset());
+
+        fetcher.sendFetches(cluster);
+        consumerClient.poll(0);
+        records = fetcher.fetchedRecords().get(tp);
+        assertEquals(2, records.size());
+        assertEquals(6L, (long) subscriptions.position(tp));
+        assertEquals(4, records.get(0).offset());
+        assertEquals(5, records.get(1).offset());
+    }
+
     @Test
     public void testFetchNonContinuousRecords() {
         // if we are fetching from a compacted topic, there may be gaps in the returned records
@@ -144,7 +196,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         // normal fetch
-        fetcher.initFetches(cluster);
+        fetcher.sendFetches(cluster);
         client.prepareResponse(fetchResponse(records.buffer(), Errors.NONE.code(), 100L, 0));
         consumerClient.poll(0);
         consumerRecords = fetcher.fetchedRecords().get(tp);
@@ -169,7 +221,7 @@ public class FetcherTest {
         records.close();
 
         // resize the limit of the buffer to pretend it is only fetch-size large
-        fetcher.initFetches(cluster);
+        fetcher.sendFetches(cluster);
         client.prepareResponse(fetchResponse((ByteBuffer) records.buffer().limit(this.fetchSize), Errors.NONE.code(), 100L, 0));
         consumerClient.poll(0);
         fetcher.fetchedRecords();
@@ -181,7 +233,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         // resize the limit of the buffer to pretend it is only fetch-size large
-        fetcher.initFetches(cluster);
+        fetcher.sendFetches(cluster);
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.TOPIC_AUTHORIZATION_FAILED.code(), 100L, 0));
         consumerClient.poll(0);
         try {
@@ -198,7 +250,7 @@ public class FetcherTest {
         subscriptions.assignFromSubscribed(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
-        fetcher.initFetches(cluster);
+        fetcher.sendFetches(cluster);
 
         // Now the rebalance happens and fetch positions are cleared
         subscriptions.assignFromSubscribed(Arrays.asList(tp));
@@ -214,7 +266,7 @@ public class FetcherTest {
         subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
-        fetcher.initFetches(cluster);
+        fetcher.sendFetches(cluster);
         subscriptions.pause(tp);
 
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0));
@@ -228,7 +280,7 @@ public class FetcherTest {
         subscriptions.seek(tp, 0);
 
         subscriptions.pause(tp);
-        fetcher.initFetches(cluster);
+        fetcher.sendFetches(cluster);
         assertTrue(client.requests().isEmpty());
     }
 
@@ -237,7 +289,7 @@ public class FetcherTest {
         subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
-        fetcher.initFetches(cluster);
+        fetcher.sendFetches(cluster);
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0));
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
@@ -249,7 +301,7 @@ public class FetcherTest {
         subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
-        fetcher.initFetches(cluster);
+        fetcher.sendFetches(cluster);
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L, 0));
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
@@ -261,7 +313,7 @@ public class FetcherTest {
         subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
-        fetcher.initFetches(cluster);
+        fetcher.sendFetches(cluster);
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
         consumerClient.poll(0);
         assertTrue(subscriptions.isOffsetResetNeeded(tp));
@@ -274,7 +326,7 @@ public class FetcherTest {
         subscriptionsNoAutoReset.assignFromUser(Arrays.asList(tp));
         subscriptionsNoAutoReset.seek(tp, 0);
 
-        fetcherNoAutoReset.initFetches(cluster);
+        fetcherNoAutoReset.sendFetches(cluster);
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
         consumerClient.poll(0);
         assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp));
@@ -287,7 +339,7 @@ public class FetcherTest {
         subscriptionsNoAutoReset.assignFromUser(Arrays.asList(tp));
         subscriptionsNoAutoReset.seek(tp, 0);
 
-        fetcherNoAutoReset.initFetches(cluster);
+        fetcherNoAutoReset.sendFetches(cluster);
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
         consumerClient.poll(0);
         assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp));
@@ -306,7 +358,7 @@ public class FetcherTest {
         subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.seek(tp, 0);
 
-        fetcher.initFetches(cluster);
+        fetcher.sendFetches(cluster);
         client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0), true);
         consumerClient.poll(0);
         assertEquals(0, fetcher.fetchedRecords().size());
@@ -474,7 +526,7 @@ public class FetcherTest {
                 }
                 this.records.close();
             }
-            fetcher.initFetches(cluster);
+            fetcher.sendFetches(cluster);
             client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 100 * i));
             consumerClient.poll(0);
             records = fetcher.fetchedRecords().get(tp);
@@ -513,11 +565,14 @@ public class FetcherTest {
         return response.toStruct();
     }
 
-    private  Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions, Metrics metrics) {
+    private Fetcher<byte[], byte[]> createFetcher(int maxPollRecords,
+                                                  SubscriptionState subscriptions,
+                                                  Metrics metrics) {
         return new Fetcher<>(consumerClient,
                 minBytes,
                 maxWaitMs,
                 fetchSize,
+                maxPollRecords,
                 true, // check crc
                 new ByteArrayDeserializer(),
                 new ByteArrayDeserializer(),
@@ -528,4 +583,9 @@ public class FetcherTest {
                 time,
                 retryBackoffMs);
     }
+
+
+    private  Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions, Metrics metrics) {
+        return createFetcher(Integer.MAX_VALUE, subscriptions, metrics);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ecd7a1/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index f15c005..a66fe35 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -289,8 +289,9 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
                                         startingKeyAndValueIndex: Int = 0,
                                         startingTimestamp: Long = 0L,
                                         timestampType: TimestampType = TimestampType.CREATE_TIME,
-                                        tp: TopicPartition = tp) {
-    val records = consumeRecords(consumer, numRecords)
+                                        tp: TopicPartition = tp,
+                                        maxPollRecords: Int = Int.MaxValue) {
+    val records = consumeRecords(consumer, numRecords, maxPollRecords = maxPollRecords)
     val now = System.currentTimeMillis()
     for (i <- 0 until numRecords) {
       val record = records.get(i)
@@ -311,12 +312,16 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     }
   }
 
-  protected def consumeRecords[K, V](consumer: Consumer[K, V], numRecords: Int): ArrayList[ConsumerRecord[K, V]] = {
+  protected def consumeRecords[K, V](consumer: Consumer[K, V],
+                                     numRecords: Int,
+                                     maxPollRecords: Int = Int.MaxValue): ArrayList[ConsumerRecord[K, V]] = {
     val records = new ArrayList[ConsumerRecord[K, V]]
     val maxIters = numRecords * 300
     var iters = 0
     while (records.size < numRecords) {
-      for (record <- consumer.poll(50).asScala)
+      val polledRecords = consumer.poll(50).asScala
+      assertTrue(polledRecords.size <= maxPollRecords)
+      for (record <- polledRecords)
         records.add(record)
       if (iters > maxIters)
         throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ecd7a1/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 3d7cad3..f8ad633 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -40,6 +40,21 @@ import scala.collection.mutable.Buffer
 class PlaintextConsumerTest extends BaseConsumerTest {
 
   @Test
+  def testMaxPollRecords() {
+    val maxPollRecords = 2
+    val numRecords = 10000
+
+    sendRecords(numRecords)
+
+    this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
+    val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    consumer0.assign(List(tp).asJava)
+
+    consumeAndVerifyRecords(consumer0, numRecords = numRecords, startingOffset = 0,
+      maxPollRecords = maxPollRecords)
+  }
+
+  @Test
   def testAutoCommitOnClose() {
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
     val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())