You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/08/22 21:24:37 UTC

kafka git commit: KAFKA-4066; Fix NPE in consumer due to multi-threaded updates

Repository: kafka
Updated Branches:
  refs/heads/trunk 6ed3e6b1c -> 7b16b4731


KAFKA-4066; Fix NPE in consumer due to multi-threaded updates

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>

Closes #1763 from rajinisivaram/KAFKA-4066


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7b16b473
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7b16b473
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7b16b473

Branch: refs/heads/trunk
Commit: 7b16b4731666ff321fbe46828d526872ff5f56d7
Parents: 6ed3e6b
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Mon Aug 22 21:34:26 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Mon Aug 22 21:34:26 2016 +0100

----------------------------------------------------------------------
 .../consumer/internals/AbstractCoordinator.java | 32 +++++++++-----------
 1 file changed, 15 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7b16b473/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index bf6b920..f2e15ca 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -206,24 +206,16 @@ public abstract class AbstractCoordinator implements Closeable {
         }
     }
 
-    protected RequestFuture<Void> lookupCoordinator() {
-        if (findCoordinatorFuture == null) {
+    protected synchronized RequestFuture<Void> lookupCoordinator() {
+        if (findCoordinatorFuture == null)
             findCoordinatorFuture = sendGroupCoordinatorRequest();
-            findCoordinatorFuture.addListener(new RequestFutureListener<Void>() {
-                @Override
-                public void onSuccess(Void value) {
-                    findCoordinatorFuture = null;
-                }
-
-                @Override
-                public void onFailure(RuntimeException e) {
-                    findCoordinatorFuture = null;
-                }
-            });
-        }
         return findCoordinatorFuture;
     }
 
+    private synchronized void clearFindCoordinatorFuture() {
+        findCoordinatorFuture = null;
+    }
+
     /**
      * Check whether the group should be rejoined (e.g. if metadata changes)
      * @return true if it should, false otherwise
@@ -532,6 +524,7 @@ public abstract class AbstractCoordinator implements Closeable {
             // for the coordinator in the underlying network client layer
             // TODO: this needs to be better handled in KAFKA-1935
             Errors error = Errors.forCode(groupCoordinatorResponse.errorCode());
+            clearFindCoordinatorFuture();
             if (error == Errors.NONE) {
                 synchronized (AbstractCoordinator.this) {
                     AbstractCoordinator.this.coordinator = new Node(
@@ -550,6 +543,12 @@ public abstract class AbstractCoordinator implements Closeable {
                 future.raise(error);
             }
         }
+
+        @Override
+        public void onFailure(RuntimeException e, RequestFuture<Void> future) {
+            clearFindCoordinatorFuture();
+            super.onFailure(e, future);
+        }
     }
 
     /**
@@ -820,7 +819,6 @@ public abstract class AbstractCoordinator implements Closeable {
         @Override
         public void run() {
             try {
-                RequestFuture findCoordinatorFuture = null;
 
                 while (true) {
                     synchronized (AbstractCoordinator.this) {
@@ -843,8 +841,8 @@ public abstract class AbstractCoordinator implements Closeable {
                         long now = time.milliseconds();
 
                         if (coordinatorUnknown()) {
-                            if (findCoordinatorFuture == null || findCoordinatorFuture.isDone())
-                                findCoordinatorFuture = lookupCoordinator();
+                            if (findCoordinatorFuture == null)
+                                lookupCoordinator();
                             else
                                 AbstractCoordinator.this.wait(retryBackoffMs);
                         } else if (heartbeat.sessionTimeoutExpired(now)) {