Posted to commits@kafka.apache.org by gu...@apache.org on 2015/07/30 23:23:05 UTC

kafka git commit: KAFKA-2350; KafkaConsumer pause/resume API

Repository: kafka
Updated Branches:
  refs/heads/trunk 1162cc1dd -> be82a2afc


KAFKA-2350; KafkaConsumer pause/resume API

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael, Ashish, Guozhang

Closes #100 from hachikuji/KAFKA-2350 and squashes the following commits:

250e823 [Jason Gustafson] KAFKA-2350; KafkaConsumer pause/resume API


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/be82a2af
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/be82a2af
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/be82a2af

Branch: refs/heads/trunk
Commit: be82a2afc9e38adc0109dc694834ca5947128877
Parents: 1162cc1
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Jul 30 14:23:43 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Jul 30 14:23:43 2015 -0700

----------------------------------------------------------------------
 .../apache/kafka/clients/consumer/Consumer.java |  10 +
 .../kafka/clients/consumer/KafkaConsumer.java   |  48 +++-
 .../kafka/clients/consumer/MockConsumer.java    |  39 ++-
 .../clients/consumer/internals/Coordinator.java |   8 +-
 .../clients/consumer/internals/Fetcher.java     |  45 ++--
 .../consumer/internals/SubscriptionState.java   | 238 +++++++++++++------
 .../clients/consumer/MockConsumerTest.java      |   2 +-
 .../clients/consumer/internals/FetcherTest.java |  36 ++-
 .../internals/SubscriptionStateTest.java        |  58 ++++-
 .../integration/kafka/api/ConsumerTest.scala    |  32 ++-
 10 files changed, 386 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/be82a2af/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 23e410b..158e1ea 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -119,6 +119,16 @@ public interface Consumer<K, V> extends Closeable {
     public Map<String, List<PartitionInfo>> listTopics();
 
     /**
+     * @see KafkaConsumer#pause(TopicPartition...)
+     */
+    public void pause(TopicPartition... partitions);
+
+    /**
+     * @see KafkaConsumer#resume(TopicPartition...)
+     */
+    public void resume(TopicPartition... partitions);
+
+    /**
      * @see KafkaConsumer#close()
      */
     public void close();

http://git-wip-us.apache.org/repos/asf/kafka/blob/be82a2af/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 923ff99..7851644 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -43,7 +43,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -852,9 +851,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     public void commit(CommitType commitType, ConsumerCommitCallback callback) {
         acquire();
         try {
-            // need defensive copy to ensure offsets are not removed before completion (e.g. in rebalance)
-            Map<TopicPartition, Long> allConsumed = new HashMap<TopicPartition, Long>(this.subscriptions.allConsumed());
-            commit(allConsumed, commitType, callback);
+            commit(subscriptions.allConsumed(), commitType, callback);
         } finally {
             release();
         }
@@ -941,7 +938,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     public long position(TopicPartition partition) {
         acquire();
         try {
-            if (!this.subscriptions.assignedPartitions().contains(partition))
+            if (!this.subscriptions.isAssigned(partition))
                 throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
             Long offset = this.subscriptions.consumed(partition);
             if (offset == null) {
@@ -972,7 +969,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         acquire();
         try {
             Long committed;
-            if (subscriptions.assignedPartitions().contains(partition)) {
+            if (subscriptions.isAssigned(partition)) {
                 committed = this.subscriptions.committed(partition);
                 if (committed == null) {
                     coordinator.refreshCommittedOffsetsIfNeeded();
@@ -1040,6 +1037,45 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         }
     }
 
+    /**
+     * Suspend fetching from the requested partitions. Future calls to {@link #poll(long)} will not return
+     * any records from these partitions until they have been resumed using {@link #resume(TopicPartition...)}.
+     * Note that this method does not affect partition subscription. In particular, it does not cause a group
+     * rebalance when automatic assignment is used.
+     * @param partitions The partitions which should be paused
+     */
+    @Override
+    public void pause(TopicPartition... partitions) {
+        acquire();
+        try {
+            for (TopicPartition partition: partitions) {
+                log.debug("Pausing partition {}", partition);
+                subscriptions.pause(partition);
+            }
+        } finally {
+            release();
+        }
+    }
+
+    /**
+     * Resume any partitions which have been paused with {@link #pause(TopicPartition...)}. New calls to
+     * {@link #poll(long)} will return records from these partitions if there are any to be fetched.
+     * If the partitions were not previously paused, this method is a no-op.
+     * @param partitions The partitions which should be resumed
+     */
+    @Override
+    public void resume(TopicPartition... partitions) {
+        acquire();
+        try {
+            for (TopicPartition partition: partitions) {
+                log.debug("Resuming partition {}", partition);
+                subscriptions.resume(partition);
+            }
+        } finally {
+            release();
+        }
+    }
+
     @Override
     public void close() {
         acquire();
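
A minimal usage sketch of the new API follows (illustrative only: the topic name, configuration, and flow-control check are assumptions, not part of this commit). Pausing a partition throttles consumption without changing the subscription or triggering a rebalance:

    // Assumed: "props" holds a valid consumer configuration (bootstrap.servers,
    // group.id, key/value deserializers); topic "events" and the helper methods
    // process() and backlogTooLarge() are hypothetical.
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    TopicPartition partition = new TopicPartition("events", 0);
    consumer.subscribe(partition);

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        process(records);                  // hypothetical application logic

        if (backlogTooLarge())             // hypothetical flow-control condition
            consumer.pause(partition);     // poll() stops returning records for this partition
        else
            consumer.resume(partition);    // no-op if the partition is not paused
    }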

http://git-wip-us.apache.org/repos/asf/kafka/blob/be82a2af/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 5b22fa0..b07e760 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -12,6 +12,12 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -20,12 +26,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.kafka.clients.consumer.internals.SubscriptionState;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.MetricName;
-
 /**
  * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. This class is <i> not
  * threadsafe </i>
@@ -83,9 +83,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         ensureNotClosed();
         // update the consumed offset
         for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : this.records.entrySet()) {
-            List<ConsumerRecord<K, V>> recs = entry.getValue();
-            if (!recs.isEmpty())
-                this.subscriptions.consumed(entry.getKey(), recs.get(recs.size() - 1).offset());
+            if (!subscriptions.isPaused(entry.getKey())) {
+                List<ConsumerRecord<K, V>> recs = entry.getValue();
+                if (!recs.isEmpty())
+                    this.subscriptions.consumed(entry.getKey(), recs.get(recs.size() - 1).offset());
+            }
         }
 
         ConsumerRecords<K, V> copy = new ConsumerRecords<K, V>(this.records);
@@ -96,7 +98,12 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     public synchronized void addRecord(ConsumerRecord<K, V> record) {
         ensureNotClosed();
         TopicPartition tp = new TopicPartition(record.topic(), record.partition());
-        this.subscriptions.assignedPartitions().add(tp);
+        ArrayList<TopicPartition> currentAssigned = new ArrayList<>(this.subscriptions.assignedPartitions());
+        if (!currentAssigned.contains(tp)) {
+            currentAssigned.add(tp);
+            this.subscriptions.changePartitionAssignment(currentAssigned);
+        }
+        subscriptions.seek(tp, record.offset());
         List<ConsumerRecord<K, V>> recs = this.records.get(tp);
         if (recs == null) {
             recs = new ArrayList<ConsumerRecord<K, V>>();
@@ -189,6 +196,18 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
+    public void pause(TopicPartition... partitions) {
+        for (TopicPartition partition : partitions)
+            subscriptions.pause(partition);
+    }
+
+    @Override
+    public void resume(TopicPartition... partitions) {
+        for (TopicPartition partition : partitions)
+            subscriptions.resume(partition);
+    }
+
+    @Override
     public synchronized void close() {
         ensureNotClosed();
         this.closed = true;
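
A short test sketch against the updated MockConsumer (illustrative; the consumer instance is assumed to be constructed as in MockConsumerTest, and the topic and offsets are arbitrary). It exercises the new pause/resume pass-throughs and the reworked addRecord:

    static void pauseResumeSketch(MockConsumer<String, String> consumer) {
        TopicPartition tp = new TopicPartition("test", 0);
        consumer.subscribe("test");
        // addRecord now assigns the partition (if it is not already assigned)
        // and seeks to the record's offset
        consumer.addRecord(new ConsumerRecord<String, String>("test", 0, 0, "key1", "value1"));

        consumer.pause(tp);   // delegates to SubscriptionState.pause(tp)
        consumer.poll(0);     // while paused, tp's consumed position is not advanced
        consumer.resume(tp);  // delegates to SubscriptionState.resume(tp)
    }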

http://git-wip-us.apache.org/repos/asf/kafka/blob/be82a2af/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index 6026b23..cd5cdc3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -119,7 +119,9 @@ public final class Coordinator {
             Map<TopicPartition, Long> offsets = fetchCommittedOffsets(subscriptions.assignedPartitions());
             for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
                 TopicPartition tp = entry.getKey();
-                this.subscriptions.committed(tp, entry.getValue());
+                // verify assignment is still active
+                if (subscriptions.isAssigned(tp))
+                    this.subscriptions.committed(tp, entry.getValue());
             }
             this.subscriptions.commitsRefreshed();
         }
@@ -459,7 +461,9 @@ public final class Coordinator {
                 short errorCode = entry.getValue();
                 if (errorCode == Errors.NONE.code()) {
                     log.debug("Committed offset {} for partition {}", offset, tp);
-                    subscriptions.committed(tp, offset);
+                    if (subscriptions.isAssigned(tp))
+                        // update the local cache only if the partition is still assigned
+                        subscriptions.committed(tp, offset);
                 } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()
                         || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
                     coordinatorDead();

http://git-wip-us.apache.org/repos/asf/kafka/blob/be82a2af/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 9f71451..956197b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -143,8 +143,7 @@ public class Fetcher<K, V> {
     public void updateFetchPositions(Set<TopicPartition> partitions) {
         // reset the fetch position to the committed position
         for (TopicPartition tp : partitions) {
-            // skip if we already have a fetch position
-            if (subscriptions.fetched(tp) != null)
+            if (!subscriptions.isAssigned(tp) || subscriptions.isFetchable(tp))
                 continue;
 
             // TODO: If there are several offsets to reset, we could submit offset requests in parallel
@@ -222,7 +221,10 @@ public class Fetcher<K, V> {
 
         log.debug("Resetting offset for partition {} to {} offset.", partition, strategy.name().toLowerCase());
         long offset = listOffset(partition, timestamp);
-        this.subscriptions.seek(partition, offset);
+
+        // we might lose the assignment while fetching the offset, so check it is still active
+        if (subscriptions.isAssigned(partition))
+            this.subscriptions.seek(partition, offset);
     }
 
     /**
@@ -259,11 +261,15 @@ public class Fetcher<K, V> {
         if (this.subscriptions.partitionAssignmentNeeded()) {
             return Collections.emptyMap();
         } else {
-            Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
+            Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
             for (PartitionRecords<K, V> part : this.records) {
+                if (!subscriptions.isFetchable(part.partition)) {
+                    log.debug("Ignoring fetched records for {} since it is no longer fetchable", part.partition);
+                    continue;
+                }
+
                 Long consumed = subscriptions.consumed(part.partition);
-                if (this.subscriptions.assignedPartitions().contains(part.partition)
-                    && consumed != null && part.fetchOffset == consumed) {
+                if (consumed != null && part.fetchOffset == consumed) {
                     List<ConsumerRecord<K, V>> records = drained.get(part.partition);
                     if (records == null) {
                         records = part.records;
@@ -354,8 +360,8 @@ public class Fetcher<K, V> {
      */
     private Map<Node, FetchRequest> createFetchRequests(Cluster cluster) {
         // create the fetch info
-        Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<Node, Map<TopicPartition, FetchRequest.PartitionData>>();
-        for (TopicPartition partition : subscriptions.assignedPartitions()) {
+        Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<>();
+        for (TopicPartition partition : subscriptions.fetchablePartitions()) {
             Node node = cluster.leaderFor(partition);
             if (node == null) {
                 metadata.requestUpdate();
@@ -363,16 +369,17 @@ public class Fetcher<K, V> {
                 // if there is a leader and no in-flight requests, issue a new fetch
                 Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
                 if (fetch == null) {
-                    fetch = new HashMap<TopicPartition, FetchRequest.PartitionData>();
+                    fetch = new HashMap<>();
                     fetchable.put(node, fetch);
                 }
+
                 long offset = this.subscriptions.fetched(partition);
                 fetch.put(partition, new FetchRequest.PartitionData(offset, this.fetchSize));
             }
         }
 
         // create the fetches
-        Map<Node, FetchRequest> requests = new HashMap<Node, FetchRequest>();
+        Map<Node, FetchRequest> requests = new HashMap<>();
         for (Map.Entry<Node, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
             Node node = entry.getKey();
             FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue());
@@ -399,15 +406,7 @@ public class Fetcher<K, V> {
                 if (!subscriptions.assignedPartitions().contains(tp)) {
                     log.debug("Ignoring fetched data for partition {} which is no longer assigned.", tp);
                 } else if (partition.errorCode == Errors.NONE.code()) {
-                    int bytes = 0;
-                    ByteBuffer buffer = partition.recordSet;
-                    MemoryRecords records = MemoryRecords.readableRecords(buffer);
                     long fetchOffset = request.fetchData().get(tp).offset;
-                    List<ConsumerRecord<K, V>> parsed = new ArrayList<ConsumerRecord<K, V>>();
-                    for (LogEntry logEntry : records) {
-                        parsed.add(parseRecord(tp, logEntry));
-                        bytes += logEntry.size();
-                    }
 
                     // we are interested in this fetch only if the beginning offset matches the
                     // current consumed position
@@ -422,7 +421,15 @@ public class Fetcher<K, V> {
                         continue;
                     }
 
-                    if (parsed.size() > 0) {
+                    int bytes = 0;
+                    ByteBuffer buffer = partition.recordSet;
+                    MemoryRecords records = MemoryRecords.readableRecords(buffer);
+                    List<ConsumerRecord<K, V>> parsed = new ArrayList<ConsumerRecord<K, V>>();
+                    for (LogEntry logEntry : records) {
+                        parsed.add(parseRecord(tp, logEntry));
+                        bytes += logEntry.size();
+                    }
+                    if (!parsed.isEmpty()) {
                         ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
                         this.subscriptions.fetched(tp, record.offset() + 1);
                         this.records.add(new PartitionRecords<K, V>(fetchOffset, tp, parsed));

http://git-wip-us.apache.org/repos/asf/kafka/blob/be82a2af/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 8a2cb12..6788ee6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -23,7 +23,25 @@ import java.util.Map;
 import java.util.Set;
 
 /**
- * A class for tracking the topics, partitions, and offsets for the consumer
+ * A class for tracking the topics, partitions, and offsets for the consumer. A partition
+ * is "assigned" either directly with {@link #subscribe(TopicPartition)} (manual assignment)
+ * or with {@link #changePartitionAssignment(List)} (automatic assignment).
+ *
+ * Once assigned, the partition is not considered "fetchable" until its initial position has
+ * been set with {@link #seek(TopicPartition, long)}. Fetchable partitions track a fetch
+ * position which is used to set the offset of the next fetch, and a consumed position
+ * which is the last offset that has been returned to the user. You can suspend fetching
+ * from a partition through {@link #pause(TopicPartition)} without affecting the fetched/consumed
+ * offsets. The partition will remain unfetchable until the {@link #resume(TopicPartition)} is
+ * used. You can also query the pause state independently with {@link #isPaused(TopicPartition)}.
+ *
+ * Note that pause state as well as fetch/consumed positions are not preserved when partition
+ * assignment is changed either with {@link #unsubscribe(TopicPartition)} or
+ * {@link #changePartitionAssignment(List)}.
+ *
+ * This class also maintains a cache of the latest commit position for each of the assigned
+ * partitions. This is updated through {@link #committed(TopicPartition, long)} and can be used
+     * to set the initial fetch position (e.g. {@link Fetcher#resetOffset(TopicPartition)}).
  */
 public class SubscriptionState {
 
@@ -34,16 +52,7 @@ public class SubscriptionState {
     private final Set<TopicPartition> subscribedPartitions;
 
     /* the list of partitions currently assigned */
-    private final Set<TopicPartition> assignedPartitions;
-
-    /* the offset exposed to the user */
-    private final Map<TopicPartition, Long> consumed;
-
-    /* the current point we have fetched up to */
-    private final Map<TopicPartition, Long> fetched;
-
-    /* the last committed offset for each partition */
-    private final Map<TopicPartition, Long> committed;
+    private final Map<TopicPartition, TopicPartitionState> assignedPartitions;
 
     /* do we need to request a partition assignment from the coordinator? */
     private boolean needsPartitionAssignment;
@@ -51,28 +60,21 @@ public class SubscriptionState {
     /* do we need to request the latest committed offsets from the coordinator? */
     private boolean needsFetchCommittedOffsets;
 
-    /* Partitions that need to be reset before fetching */
-    private Map<TopicPartition, OffsetResetStrategy> resetPartitions;
-
     /* Default offset reset strategy */
-    private OffsetResetStrategy offsetResetStrategy;
-
-    public SubscriptionState(OffsetResetStrategy offsetResetStrategy) {
-        this.offsetResetStrategy = offsetResetStrategy;
-        this.subscribedTopics = new HashSet<String>();
-        this.subscribedPartitions = new HashSet<TopicPartition>();
-        this.assignedPartitions = new HashSet<TopicPartition>();
-        this.consumed = new HashMap<TopicPartition, Long>();
-        this.fetched = new HashMap<TopicPartition, Long>();
-        this.committed = new HashMap<TopicPartition, Long>();
+    private final OffsetResetStrategy defaultResetStrategy;
+
+    public SubscriptionState(OffsetResetStrategy defaultResetStrategy) {
+        this.defaultResetStrategy = defaultResetStrategy;
+        this.subscribedTopics = new HashSet<>();
+        this.subscribedPartitions = new HashSet<>();
+        this.assignedPartitions = new HashMap<>();
         this.needsPartitionAssignment = false;
         this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up
-        this.resetPartitions = new HashMap<TopicPartition, OffsetResetStrategy>();
     }
 
     public void subscribe(String topic) {
-        if (this.subscribedPartitions.size() > 0)
-            throw new IllegalStateException("Subcription to topics and partitions are mutually exclusive");
+        if (!this.subscribedPartitions.isEmpty())
+            throw new IllegalStateException("Subscription to topics and partitions are mutually exclusive");
         if (!this.subscribedTopics.contains(topic)) {
             this.subscribedTopics.add(topic);
             this.needsPartitionAssignment = true;
@@ -95,10 +97,10 @@ public class SubscriptionState {
     }
 
     public void subscribe(TopicPartition tp) {
-        if (this.subscribedTopics.size() > 0)
-            throw new IllegalStateException("Subcription to topics and partitions are mutually exclusive");
+        if (!this.subscribedTopics.isEmpty())
+            throw new IllegalStateException("Subscription to topics and partitions are mutually exclusive");
         this.subscribedPartitions.add(tp);
-        this.assignedPartitions.add(tp);
+        addAssignedPartition(tp);
     }
 
     public void unsubscribe(TopicPartition partition) {
@@ -110,17 +112,10 @@ public class SubscriptionState {
     
     private void clearPartition(TopicPartition tp) {
         this.assignedPartitions.remove(tp);
-        this.committed.remove(tp);
-        this.fetched.remove(tp);
-        this.consumed.remove(tp);
-        this.resetPartitions.remove(tp);
     }
 
     public void clearAssignment() {
         this.assignedPartitions.clear();
-        this.committed.clear();
-        this.fetched.clear();
-        this.consumed.clear();
         this.needsPartitionAssignment = !subscribedTopics().isEmpty();
     }
 
@@ -129,21 +124,26 @@ public class SubscriptionState {
     }
 
     public Long fetched(TopicPartition tp) {
-        return this.fetched.get(tp);
+        return assignedState(tp).fetched;
     }
 
     public void fetched(TopicPartition tp, long offset) {
-        if (!this.assignedPartitions.contains(tp))
-            throw new IllegalArgumentException("Can't change the fetch position for a partition you are not currently subscribed to.");
-        this.fetched.put(tp, offset);
+        assignedState(tp).fetched(offset);
+    }
+
+    private TopicPartitionState assignedState(TopicPartition tp) {
+        TopicPartitionState state = this.assignedPartitions.get(tp);
+        if (state == null)
+            throw new IllegalStateException("No current assignment for partition " + tp);
+        return state;
     }
 
     public void committed(TopicPartition tp, long offset) {
-        this.committed.put(tp, offset);
+        assignedState(tp).committed(offset);
     }
 
     public Long committed(TopicPartition tp) {
-        return this.committed.get(tp);
+        return assignedState(tp).committed;
     }
 
     public void needRefreshCommits() {
@@ -157,15 +157,22 @@ public class SubscriptionState {
     public void commitsRefreshed() {
         this.needsFetchCommittedOffsets = false;
     }
-    
+
     public void seek(TopicPartition tp, long offset) {
-        fetched(tp, offset);
-        consumed(tp, offset);
-        resetPartitions.remove(tp);
+        assignedState(tp).seek(offset);
     }
 
     public Set<TopicPartition> assignedPartitions() {
-        return this.assignedPartitions;
+        return this.assignedPartitions.keySet();
+    }
+
+    public Set<TopicPartition> fetchablePartitions() {
+        Set<TopicPartition> fetchable = new HashSet<>();
+        for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignedPartitions.entrySet()) {
+            if (entry.getValue().isFetchable())
+                fetchable.add(entry.getKey());
+        }
+        return fetchable;
     }
 
     public boolean partitionsAutoAssigned() {
@@ -173,49 +180,52 @@ public class SubscriptionState {
     }
 
     public void consumed(TopicPartition tp, long offset) {
-        if (!this.assignedPartitions.contains(tp))
-            throw new IllegalArgumentException("Can't change the consumed position for a partition you are not currently subscribed to.");
-        this.consumed.put(tp, offset);
+        assignedState(tp).consumed(offset);
     }
 
-    public Long consumed(TopicPartition partition) {
-        return this.consumed.get(partition);
+    public Long consumed(TopicPartition tp) {
+        return assignedState(tp).consumed;
     }
 
     public Map<TopicPartition, Long> allConsumed() {
-        return this.consumed;
+        Map<TopicPartition, Long> allConsumed = new HashMap<>();
+        for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignedPartitions.entrySet()) {
+            TopicPartitionState state = entry.getValue();
+            if (state.hasValidPosition)
+                allConsumed.put(entry.getKey(), state.consumed);
+        }
+        return allConsumed;
     }
 
     public void needOffsetReset(TopicPartition partition, OffsetResetStrategy offsetResetStrategy) {
-        this.resetPartitions.put(partition, offsetResetStrategy);
-        this.fetched.remove(partition);
-        this.consumed.remove(partition);
+        assignedState(partition).awaitReset(offsetResetStrategy);
     }
 
     public void needOffsetReset(TopicPartition partition) {
-        needOffsetReset(partition, offsetResetStrategy);
+        needOffsetReset(partition, defaultResetStrategy);
     }
 
     public boolean isOffsetResetNeeded(TopicPartition partition) {
-        return resetPartitions.containsKey(partition);
-    }
-
-    public boolean isOffsetResetNeeded() {
-        return !resetPartitions.isEmpty();
+        return assignedState(partition).awaitingReset;
     }
 
     public OffsetResetStrategy resetStrategy(TopicPartition partition) {
-        return resetPartitions.get(partition);
+        return assignedState(partition).resetStrategy;
     }
 
     public boolean hasAllFetchPositions() {
-        return this.fetched.size() >= this.assignedPartitions.size();
+        for (TopicPartitionState state : assignedPartitions.values())
+            if (!state.hasValidPosition)
+                return false;
+        return true;
     }
 
     public Set<TopicPartition> missingFetchPositions() {
-        Set<TopicPartition> copy = new HashSet<TopicPartition>(this.assignedPartitions);
-        copy.removeAll(this.fetched.keySet());
-        return copy;
+        Set<TopicPartition> missing = new HashSet<>(this.assignedPartitions.keySet());
+        for (Map.Entry<TopicPartition, TopicPartitionState> entry : assignedPartitions.entrySet())
+            if (!entry.getValue().hasValidPosition)
+                missing.add(entry.getKey());
+        return missing;
     }
 
     public boolean partitionAssignmentNeeded() {
@@ -227,9 +237,99 @@ public class SubscriptionState {
             if (!this.subscribedTopics.contains(tp.topic()))
                 throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic.");
         this.clearAssignment();
-        this.assignedPartitions.addAll(assignments);
+        for (TopicPartition tp: assignments)
+            addAssignedPartition(tp);
         this.needsPartitionAssignment = false;
     }
 
+    public boolean isAssigned(TopicPartition tp) {
+        return assignedPartitions.containsKey(tp);
+    }
+
+    public boolean isPaused(TopicPartition tp) {
+        return isAssigned(tp) && assignedState(tp).paused;
+    }
+
+    public boolean isFetchable(TopicPartition tp) {
+        return isAssigned(tp) && assignedState(tp).isFetchable();
+    }
+
+    public void pause(TopicPartition tp) {
+        assignedState(tp).pause();
+    }
+
+    public void resume(TopicPartition tp) {
+        assignedState(tp).resume();
+    }
+
+    private void addAssignedPartition(TopicPartition tp) {
+        this.assignedPartitions.put(tp, new TopicPartitionState());
+    }
+
+    private static class TopicPartitionState {
+        private Long consumed;   // offset exposed to the user
+        private Long fetched;    // current fetch position
+        private Long committed;  // last committed position
+
+        private boolean hasValidPosition; // whether we have valid consumed and fetched positions
+        private boolean paused;  // whether this partition has been paused by the user
+        private boolean awaitingReset; // whether we are awaiting reset
+        private OffsetResetStrategy resetStrategy;  // the reset strategy if awaitingReset is set
+
+        public TopicPartitionState() {
+            this.paused = false;
+            this.consumed = null;
+            this.fetched = null;
+            this.committed = null;
+            this.awaitingReset = false;
+            this.hasValidPosition = false;
+            this.resetStrategy = null;
+        }
+
+        private void awaitReset(OffsetResetStrategy strategy) {
+            this.awaitingReset = true;
+            this.resetStrategy = strategy;
+            this.consumed = null;
+            this.fetched = null;
+            this.hasValidPosition = false;
+        }
+
+        private void seek(long offset) {
+            this.consumed = offset;
+            this.fetched = offset;
+            this.awaitingReset = false;
+            this.resetStrategy = null;
+            this.hasValidPosition = true;
+        }
+
+        private void fetched(long offset) {
+            if (!hasValidPosition)
+                throw new IllegalStateException("Cannot update fetch position without valid consumed/fetched positions");
+            this.fetched = offset;
+        }
+
+        private void consumed(long offset) {
+            if (!hasValidPosition)
+                throw new IllegalStateException("Cannot update consumed position without valid consumed/fetched positions");
+            this.consumed = offset;
+        }
+
+        private void committed(Long offset) {
+            this.committed = offset;
+        }
+
+        private void pause() {
+            this.paused = true;
+        }
+
+        private void resume() {
+            this.paused = false;
+        }
+
+        private boolean isFetchable() {
+            return !paused && hasValidPosition;
+        }
+
+    }
 
 }
\ No newline at end of file
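
The reworked per-partition state machine can be summarized with a short sketch (it mirrors the API exercised by SubscriptionStateTest below; the partition and offset values are arbitrary):

    SubscriptionState state = new SubscriptionState(OffsetResetStrategy.EARLIEST);
    TopicPartition tp = new TopicPartition("test", 0);

    state.subscribe(tp);             // manual assignment: assigned, but not yet fetchable
    assert state.isAssigned(tp);
    assert !state.isFetchable(tp);   // no valid position until seek()

    state.seek(tp, 0);               // sets fetched/consumed positions, clears any pending reset
    assert state.isFetchable(tp);

    state.pause(tp);                 // suspends fetching; positions are preserved
    assert state.isPaused(tp) && !state.isFetchable(tp);

    state.resume(tp);                // fetchable again from the preserved position
    assert state.isFetchable(tp);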

http://git-wip-us.apache.org/repos/asf/kafka/blob/be82a2af/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 26b6b40..d4da642 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -29,7 +29,7 @@ public class MockConsumerTest {
 
     @Test
     public void testSimpleMock() {
-        consumer.subscribe("topic");
+        consumer.subscribe("test");
         assertEquals(0, consumer.poll(1000).count());
         ConsumerRecord<String, String> rec1 = new ConsumerRecord<String, String>("test", 0, 0, "key1", "value1");
         ConsumerRecord<String, String> rec2 = new ConsumerRecord<String, String>("test", 0, 1, "key2", "value2");

http://git-wip-us.apache.org/repos/asf/kafka/blob/be82a2af/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 06e2990..56850bb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -45,6 +45,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class FetcherTest {
@@ -99,8 +100,7 @@ public class FetcherTest {
     public void testFetchNormal() {
         List<ConsumerRecord<byte[], byte[]>> records;
         subscriptions.subscribe(tp);
-        subscriptions.fetched(tp, 0);
-        subscriptions.consumed(tp, 0);
+        subscriptions.seek(tp, 0);
 
         // normal fetch
         fetcher.initFetches(cluster);
@@ -121,8 +121,7 @@ public class FetcherTest {
     public void testFetchDuringRebalance() {
         subscriptions.subscribe(topicName);
         subscriptions.changePartitionAssignment(Arrays.asList(tp));
-        subscriptions.fetched(tp, 0);
-        subscriptions.consumed(tp, 0);
+        subscriptions.seek(tp, 0);
 
         fetcher.initFetches(cluster);
 
@@ -136,10 +135,32 @@ public class FetcherTest {
     }
 
     @Test
+    public void testInFlightFetchOnPausedPartition() {
+        subscriptions.subscribe(tp);
+        subscriptions.seek(tp, 0);
+
+        fetcher.initFetches(cluster);
+        subscriptions.pause(tp);
+
+        client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L));
+        consumerClient.poll(0);
+        assertNull(fetcher.fetchedRecords().get(tp));
+    }
+
+    @Test
+    public void testFetchOnPausedPartition() {
+        subscriptions.subscribe(tp);
+        subscriptions.seek(tp, 0);
+
+        subscriptions.pause(tp);
+        fetcher.initFetches(cluster);
+        assertTrue(client.requests().isEmpty());
+    }
+
+    @Test
     public void testFetchFailed() {
         subscriptions.subscribe(tp);
-        subscriptions.fetched(tp, 0);
-        subscriptions.consumed(tp, 0);
+        subscriptions.seek(tp, 0);
 
         // fetch with not leader
         fetcher.initFetches(cluster);
@@ -169,8 +190,7 @@ public class FetcherTest {
     @Test
     public void testFetchOutOfRange() {
         subscriptions.subscribe(tp);
-        subscriptions.fetched(tp, 5);
-        subscriptions.consumed(tp, 5);
+        subscriptions.seek(tp, 5);
 
         // fetch with out of range
         fetcher.initFetches(cluster);

http://git-wip-us.apache.org/repos/asf/kafka/blob/be82a2af/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index c47f3fb..1ba6f7a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static java.util.Arrays.asList;
 
@@ -37,12 +38,13 @@ public class SubscriptionStateTest {
         state.subscribe(tp0);
         assertEquals(Collections.singleton(tp0), state.assignedPartitions());
         state.committed(tp0, 1);
-        state.fetched(tp0, 1);
-        state.consumed(tp0, 1);
+        state.seek(tp0, 1);
+        assertTrue(state.isFetchable(tp0));
         assertAllPositions(tp0, 1L);
         state.unsubscribe(tp0);
         assertTrue(state.assignedPartitions().isEmpty());
-        assertAllPositions(tp0, null);
+        assertFalse(state.isAssigned(tp0));
+        assertFalse(state.isFetchable(tp0));
     }
 
     @Test
@@ -52,10 +54,15 @@ public class SubscriptionStateTest {
         assertEquals(5L, (long) state.fetched(tp0));
         assertEquals(5L, (long) state.consumed(tp0));
         state.needOffsetReset(tp0);
-        assertTrue(state.isOffsetResetNeeded());
+        assertFalse(state.isFetchable(tp0));
         assertTrue(state.isOffsetResetNeeded(tp0));
         assertEquals(null, state.fetched(tp0));
         assertEquals(null, state.consumed(tp0));
+
+        // seek should clear the reset and make the partition fetchable
+        state.seek(tp0, 0);
+        assertTrue(state.isFetchable(tp0));
+        assertFalse(state.isOffsetResetNeeded(tp0));
     }
 
     @Test
@@ -65,16 +72,28 @@ public class SubscriptionStateTest {
         assertTrue(state.assignedPartitions().isEmpty());
         assertTrue(state.partitionsAutoAssigned());
         state.changePartitionAssignment(asList(tp0));
+        state.seek(tp0, 1);
         state.committed(tp0, 1);
-        state.fetched(tp0, 1);
-        state.consumed(tp0, 1);
         assertAllPositions(tp0, 1L);
         state.changePartitionAssignment(asList(tp1));
-        assertAllPositions(tp0, null);
+        assertTrue(state.isAssigned(tp1));
+        assertFalse(state.isAssigned(tp0));
+        assertFalse(state.isFetchable(tp1));
         assertEquals(Collections.singleton(tp1), state.assignedPartitions());
     }
 
     @Test
+    public void partitionPause() {
+        state.subscribe(tp0);
+        state.seek(tp0, 100);
+        assertTrue(state.isFetchable(tp0));
+        state.pause(tp0);
+        assertFalse(state.isFetchable(tp0));
+        state.resume(tp0);
+        assertTrue(state.isFetchable(tp0));
+    }
+
+    @Test
     public void topicUnsubscription() {
         final String topic = "test";
         state.subscribe(topic);
@@ -83,24 +102,37 @@ public class SubscriptionStateTest {
         assertTrue(state.partitionsAutoAssigned());
         state.changePartitionAssignment(asList(tp0));
         state.committed(tp0, 1);
-        state.fetched(tp0, 1);
-        state.consumed(tp0, 1);
+        state.seek(tp0, 1);
         assertAllPositions(tp0, 1L);
         state.changePartitionAssignment(asList(tp1));
-        assertAllPositions(tp0, null);
+        assertFalse(state.isAssigned(tp0));
         assertEquals(Collections.singleton(tp1), state.assignedPartitions());
 
         state.unsubscribe(topic);
         assertEquals(0, state.subscribedTopics().size());
         assertTrue(state.assignedPartitions().isEmpty());
     }
-    
-    @Test(expected = IllegalArgumentException.class)
+
+    @Test(expected = IllegalStateException.class)
+    public void invalidConsumedPositionUpdate() {
+        state.subscribe("test");
+        state.changePartitionAssignment(asList(tp0));
+        state.consumed(tp0, 0);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void invalidFetchPositionUpdate() {
+        state.subscribe("test");
+        state.changePartitionAssignment(asList(tp0));
+        state.fetched(tp0, 0);
+    }
+
+    @Test(expected = IllegalStateException.class)
     public void cantChangeFetchPositionForNonAssignedPartition() {
         state.fetched(tp0, 1);
     }
     
-    @Test(expected = IllegalArgumentException.class)
+    @Test(expected = IllegalStateException.class)
     public void cantChangeConsumedPositionForNonAssignedPartition() {
         state.consumed(tp0, 1);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/be82a2af/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index 0c2755f..4ea49f2 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -18,7 +18,7 @@ import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{PartitionInfo, TopicPartition}
 
 import kafka.utils.{TestUtils, Logging}
 import kafka.server.KafkaConfig
@@ -254,6 +254,34 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     }
   }
 
+  def testPartitionPauseAndResume() {
+    sendRecords(5)
+    this.consumers(0).subscribe(tp)
+    consumeRecords(this.consumers(0), 5, 0)
+    this.consumers(0).pause(tp)
+    sendRecords(5)
+    assertTrue(this.consumers(0).poll(0).isEmpty)
+    this.consumers(0).resume(tp)
+    consumeRecords(this.consumers(0), 5, 5)
+  }
+
+  def testPauseStateNotPreservedByRebalance() {
+    this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100"); // timeout quickly to avoid slow test
+    val consumer0 = new KafkaConsumer(this.consumerConfig, null, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+
+    sendRecords(5)
+    consumer0.subscribe(topic)
+    consumeRecords(consumer0, 5, 0)
+    consumer0.pause(tp)
+
+    // subscribe to a new topic to trigger a rebalance
+    consumer0.subscribe("topic2")
+
+    // after rebalance, our position should be reset and our pause state lost,
+    // so we should be able to consume from the beginning
+    consumeRecords(consumer0, 0, 5)
+  }
+
   private class TestConsumerReassignmentCallback extends ConsumerRebalanceCallback {
     var callsToAssigned = 0
     var callsToRevoked = 0
@@ -264,7 +292,7 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
     def onPartitionsRevoked(consumer: Consumer[_,_], partitions: java.util.Collection[TopicPartition]) {
       info("onPartitionsRevoked called.")
       callsToRevoked += 1
-    } 
+    }
   }
 
   private def sendRecords(numRecords: Int): Unit = {