You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/06 05:32:22 UTC

kafka git commit: HOTFIX: unsubscribe does not clear user assignment properly

Repository: kafka
Updated Branches:
  refs/heads/trunk c3c0c04e6 -> b1eaa46a5


HOTFIX: unsubscribe does not clear user assignment properly

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Gwen Shapira

Closes #439 from hachikuji/unsubscribe-hotfix


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b1eaa46a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b1eaa46a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b1eaa46a

Branch: refs/heads/trunk
Commit: b1eaa46a512295abb4b4eb8b98402fc60a2c911a
Parents: c3c0c04
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Nov 5 20:32:10 2015 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Thu Nov 5 20:32:10 2015 -0800

----------------------------------------------------------------------
 .../consumer/internals/SubscriptionState.java       |  1 +
 .../consumer/internals/SubscriptionStateTest.java   | 16 ++++++++++++++++
 2 files changed, 17 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b1eaa46a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index a9ff35f..a142196 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -185,6 +185,7 @@ public class SubscriptionState {
 
     public void unsubscribe() {
         this.subscription.clear();
+        this.userAssignment.clear();
         this.assignment.clear();
         this.needsPartitionAssignment = true;
         this.subscribedPattern = null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/b1eaa46a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index c5fce61..6566025 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -177,6 +177,22 @@ public class SubscriptionStateTest {
     }
 
     @Test
+    public void unsubscribeUserAssignment() {
+        state.assignFromUser(Arrays.asList(tp0, tp1));
+        state.unsubscribe();
+        state.subscribe(Arrays.asList(topic), rebalanceListener);
+        assertEquals(Collections.singleton(topic), state.subscription());
+    }
+
+    @Test
+    public void unsubscribeUserSubscribe() {
+        state.subscribe(Arrays.asList(topic), rebalanceListener);
+        state.unsubscribe();
+        state.assignFromUser(Arrays.asList(tp0));
+        assertEquals(Collections.singleton(tp0), state.assignedPartitions());
+    }
+
+    @Test
     public void unsubscription() {
         state.subscribe(Pattern.compile(".*"), rebalanceListener);
         state.changeSubscription(Arrays.asList(topic, topic1));