You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/10/28 20:42:16 UTC

kafka git commit: MINOR: KAFKA-2371 follow-up, DistributedHerder should wake up WorkerGroupMember after assignment to ensure work is started immediately

Repository: kafka
Updated Branches:
  refs/heads/trunk 9855bb9c6 -> 8838fa801


MINOR: KAFKA-2371 follow-up, DistributedHerder should wake up WorkerGroupMember after assignment to ensure work is started immediately

Author: Ewen Cheslack-Postava <me...@ewencp.org>

Reviewers: Gwen Shapira

Closes #360 from ewencp/minor-kafka-2371-follow-up-wakeup-after-rebalance


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8838fa80
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8838fa80
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8838fa80

Branch: refs/heads/trunk
Commit: 8838fa8010c146af6aab014a41bc7e68318b4eb0
Parents: 9855bb9
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Wed Oct 28 12:42:03 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Wed Oct 28 12:42:03 2015 -0700

----------------------------------------------------------------------
 .../kafka/copycat/runtime/distributed/DistributedHerder.java     | 4 ++++
 .../kafka/copycat/runtime/distributed/DistributedHerderTest.java | 2 ++
 2 files changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8838fa80/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
index 17bf7b7..46c7686 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
@@ -614,6 +614,10 @@ public class DistributedHerder implements Herder, Runnable {
                 log.info("Joined group and got assignment: {}", assignment);
                 DistributedHerder.this.assignment = assignment;
                 rebalanceResolved = false;
+                // We *must* interrupt any in-progress poll() call: the assignment can arrive just as a poll() starts,
+                // in which case we might otherwise sleep in that poll() for a long time. Forcing a wakeup ensures the
+                // main thread gets to process this assignment promptly.
+                member.wakeup();
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/8838fa80/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
index 1213656..c8b4874 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
@@ -371,6 +371,8 @@ public class DistributedHerderTest {
                 return null;
             }
         });
+        member.wakeup();
+        PowerMock.expectLastCall();
     }
 
     private void expectPostRebalanceCatchup(final ClusterConfigState readToEndSnapshot) {