Posted to commits@kafka.apache.org by gu...@apache.org on 2016/07/12 18:31:43 UTC

kafka git commit: KAFKA-3931: Fix transient failures in pattern subscription tests

Repository: kafka
Updated Branches:
  refs/heads/trunk fc47b9fa6 -> 98dfc4b30


KAFKA-3931: Fix transient failures in pattern subscription tests

Full credit for figuring out the cause of these failures goes to hachikuji.

Author: Vahid Hashemian <va...@us.ibm.com>

Reviewers: Guozhang Wang, Ismael Juma, Jason Gustafson

Closes #1594 from vahidhashemian/KAFKA-3931


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/98dfc4b3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/98dfc4b3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/98dfc4b3

Branch: refs/heads/trunk
Commit: 98dfc4b307c2e41a7fb0fff330048aa9ff78addd
Parents: fc47b9f
Author: Vahid Hashemian <va...@us.ibm.com>
Authored: Tue Jul 12 11:31:39 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Jul 12 11:31:39 2016 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/ConsumerCoordinator.java | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/98dfc4b3/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index a642512..2880efc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -341,8 +341,18 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
      * Ensure that we have a valid partition assignment from the coordinator.
      */
     public void ensurePartitionAssignment() {
-        if (subscriptions.partitionsAutoAssigned())
+        if (subscriptions.partitionsAutoAssigned()) {
+            // Due to a race condition between the initial metadata fetch and the initial rebalance, we need to ensure that
+            // the metadata is fresh before joining initially, and then request the metadata update. If metadata update arrives
+            // while the rebalance is still pending (for example, when the join group is still inflight), then we will lose
+            // track of the fact that we need to rebalance again to reflect the change to the topic subscription. Without
+            // ensuring that the metadata is fresh, any metadata update that changes the topic subscriptions and arrives with a
+            // rebalance in progress will essentially be ignored. See KAFKA-3949 for the complete description of the problem.
+            if (subscriptions.hasPatternSubscription())
+                client.ensureFreshMetadata();
+
             ensureActiveGroup();
+        }
     }
 
     @Override
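
Note: the race described in the comment added to ensurePartitionAssignment() can be illustrated with a small stand-alone model. This is only a sketch built from that comment's description, not Kafka's code: ToyConsumer, its needsRejoin flag, and the methods onMetadataUpdate, beginJoin and completeJoin are hypothetical names, and the assumption that join completion clears the flag unconditionally is a simplification made for the example.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

public class PatternSubscriptionRace {

    /** Toy consumer; every name here is hypothetical, not a Kafka class. */
    static class ToyConsumer {
        final Pattern pattern;
        final Set<String> subscription = new TreeSet<>();
        Set<String> assignment = new TreeSet<>();
        boolean needsRejoin = true;

        ToyConsumer(String regex) {
            this.pattern = Pattern.compile(regex);
        }

        /** Metadata update: recompute the subscription from the topic list. */
        void onMetadataUpdate(Set<String> clusterTopics) {
            Set<String> matched = new TreeSet<>();
            for (String topic : clusterTopics)
                if (pattern.matcher(topic).matches())
                    matched.add(topic);
            if (!matched.equals(subscription)) {
                subscription.clear();
                subscription.addAll(matched);
                needsRejoin = true; // a changed subscription should trigger a rebalance
            }
        }

        /** Snapshot of the subscription that is sent with the join request. */
        Set<String> beginJoin() {
            return new TreeSet<>(subscription);
        }

        /** Assumed behaviour: completing the join clears the flag unconditionally. */
        void completeJoin(Set<String> joinedWith) {
            assignment = joinedWith;
            needsRejoin = false;
        }
    }

    /** Runs one join, with a metadata update arriving while the join is in flight. */
    static ToyConsumer run(boolean refreshMetadataFirst) {
        Set<String> topics = new TreeSet<>(Arrays.asList("orders-1", "other"));
        ToyConsumer consumer = new ToyConsumer("orders-.*");
        if (refreshMetadataFirst)
            consumer.onMetadataUpdate(topics); // models the fresh-metadata step before joining
        Set<String> inFlight = consumer.beginJoin();
        consumer.onMetadataUpdate(topics);     // update arrives while the join is pending
        consumer.completeJoin(inFlight);
        return consumer;
    }

    public static void main(String[] args) {
        ToyConsumer racy = run(false);
        ToyConsumer fresh = run(true);
        System.out.println("without refresh: assignment=" + racy.assignment
                + " needsRejoin=" + racy.needsRejoin);
        System.out.println("with refresh:    assignment=" + fresh.assignment
                + " needsRejoin=" + fresh.needsRejoin);
    }
}
```

In this model the run without the refresh ends with an empty assignment and no pending rejoin, so the matching topic is missed until some later event forces a rebalance. The run that refreshes first joins with the matching topic already in its subscription.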