You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/08/22 21:24:37 UTC
kafka git commit: KAFKA-4066;
Fix NPE in consumer due to multi-threaded updates
Repository: kafka
Updated Branches:
refs/heads/trunk 6ed3e6b1c -> 7b16b4731
KAFKA-4066; Fix NPE in consumer due to multi-threaded updates
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>
Closes #1763 from rajinisivaram/KAFKA-4066
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7b16b473
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7b16b473
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7b16b473
Branch: refs/heads/trunk
Commit: 7b16b4731666ff321fbe46828d526872ff5f56d7
Parents: 6ed3e6b
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Mon Aug 22 21:34:26 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Mon Aug 22 21:34:26 2016 +0100
----------------------------------------------------------------------
.../consumer/internals/AbstractCoordinator.java | 32 +++++++++-----------
1 file changed, 15 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7b16b473/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index bf6b920..f2e15ca 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -206,24 +206,16 @@ public abstract class AbstractCoordinator implements Closeable {
}
}
- protected RequestFuture<Void> lookupCoordinator() {
- if (findCoordinatorFuture == null) {
+ protected synchronized RequestFuture<Void> lookupCoordinator() {
+ if (findCoordinatorFuture == null)
findCoordinatorFuture = sendGroupCoordinatorRequest();
- findCoordinatorFuture.addListener(new RequestFutureListener<Void>() {
- @Override
- public void onSuccess(Void value) {
- findCoordinatorFuture = null;
- }
-
- @Override
- public void onFailure(RuntimeException e) {
- findCoordinatorFuture = null;
- }
- });
- }
return findCoordinatorFuture;
}
+ private synchronized void clearFindCoordinatorFuture() {
+ findCoordinatorFuture = null;
+ }
+
/**
* Check whether the group should be rejoined (e.g. if metadata changes)
* @return true if it should, false otherwise
@@ -532,6 +524,7 @@ public abstract class AbstractCoordinator implements Closeable {
// for the coordinator in the underlying network client layer
// TODO: this needs to be better handled in KAFKA-1935
Errors error = Errors.forCode(groupCoordinatorResponse.errorCode());
+ clearFindCoordinatorFuture();
if (error == Errors.NONE) {
synchronized (AbstractCoordinator.this) {
AbstractCoordinator.this.coordinator = new Node(
@@ -550,6 +543,12 @@ public abstract class AbstractCoordinator implements Closeable {
future.raise(error);
}
}
+
+ @Override
+ public void onFailure(RuntimeException e, RequestFuture<Void> future) {
+ clearFindCoordinatorFuture();
+ super.onFailure(e, future);
+ }
}
/**
@@ -820,7 +819,6 @@ public abstract class AbstractCoordinator implements Closeable {
@Override
public void run() {
try {
- RequestFuture findCoordinatorFuture = null;
while (true) {
synchronized (AbstractCoordinator.this) {
@@ -843,8 +841,8 @@ public abstract class AbstractCoordinator implements Closeable {
long now = time.milliseconds();
if (coordinatorUnknown()) {
- if (findCoordinatorFuture == null || findCoordinatorFuture.isDone())
- findCoordinatorFuture = lookupCoordinator();
+ if (findCoordinatorFuture == null)
+ lookupCoordinator();
else
AbstractCoordinator.this.wait(retryBackoffMs);
} else if (heartbeat.sessionTimeoutExpired(now)) {