You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/06 05:32:22 UTC
kafka git commit: HOTFIX: unsubscribe does not clear user assignment properly
Repository: kafka
Updated Branches:
refs/heads/trunk c3c0c04e6 -> b1eaa46a5
HOTFIX: unsubscribe does not clear user assignment properly
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Gwen Shapira
Closes #439 from hachikuji/unsubscribe-hotfix
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b1eaa46a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b1eaa46a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b1eaa46a
Branch: refs/heads/trunk
Commit: b1eaa46a512295abb4b4eb8b98402fc60a2c911a
Parents: c3c0c04
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Nov 5 20:32:10 2015 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Thu Nov 5 20:32:10 2015 -0800
----------------------------------------------------------------------
.../consumer/internals/SubscriptionState.java | 1 +
.../consumer/internals/SubscriptionStateTest.java | 16 ++++++++++++++++
2 files changed, 17 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b1eaa46a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index a9ff35f..a142196 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -185,6 +185,7 @@ public class SubscriptionState {
public void unsubscribe() {
this.subscription.clear();
+ this.userAssignment.clear();
this.assignment.clear();
this.needsPartitionAssignment = true;
this.subscribedPattern = null;
http://git-wip-us.apache.org/repos/asf/kafka/blob/b1eaa46a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index c5fce61..6566025 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -177,6 +177,22 @@ public class SubscriptionStateTest {
}
@Test
+ public void unsubscribeUserAssignment() {
+ state.assignFromUser(Arrays.asList(tp0, tp1));
+ state.unsubscribe();
+ state.subscribe(Arrays.asList(topic), rebalanceListener);
+ assertEquals(Collections.singleton(topic), state.subscription());
+ }
+
+ @Test
+ public void unsubscribeUserSubscribe() {
+ state.subscribe(Arrays.asList(topic), rebalanceListener);
+ state.unsubscribe();
+ state.assignFromUser(Arrays.asList(tp0));
+ assertEquals(Collections.singleton(tp0), state.assignedPartitions());
+ }
+
+ @Test
public void unsubscription() {
state.subscribe(Pattern.compile(".*"), rebalanceListener);
state.changeSubscription(Arrays.asList(topic, topic1));