You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/06/01 20:31:33 UTC

[2/2] kafka git commit: set the rejoinNeeded in listener's onSuccess

Reset rejoinNeeded to false in the listener's onSuccess (after a successful group join) instead of in the sync-group response handler


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e4d829c4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e4d829c4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e4d829c4

Branch: refs/heads/trunk
Commit: e4d829c47b9e143aa914cc492500d568d44836ef
Parents: 1b16aca
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Jun 1 13:31:25 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Jun 1 13:31:25 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/internals/AbstractCoordinator.java      | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e4d829c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index e380bae..aa3807e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -398,6 +398,7 @@ public abstract class AbstractCoordinator implements Closeable {
                     synchronized (AbstractCoordinator.this) {
                         log.info("Successfully joined group {} with generation {}", groupId, generation.generationId);
                         state = MemberState.STABLE;
+                        rejoinNeeded = false;
 
                         if (heartbeatThread != null)
                             heartbeatThread.enable();
@@ -536,7 +537,6 @@ public abstract class AbstractCoordinator implements Closeable {
             if (error == Errors.NONE) {
                 sensors.syncLatency.record(response.requestLatencyMs());
                 future.complete(syncResponse.memberAssignment());
-                AbstractCoordinator.this.rejoinNeeded = false;
             } else {
                 requestRejoin();