You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/27 02:25:27 UTC

kafka git commit: MINOR: follow-up to KAFKA-2464 for renaming/cleanup

Repository: kafka
Updated Branches:
  refs/heads/trunk 71399ffe4 -> b251bebbc


MINOR: follow-up to KAFKA-2464 for renaming/cleanup

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ewen Cheslack-Postava, Jiangjie Qin, Guozhang Wang

Closes #354 from hachikuji/KAFKA-2464


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b251bebb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b251bebb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b251bebb

Branch: refs/heads/trunk
Commit: b251bebbcc3074f0491fce988f1d46ccdc97f1fa
Parents: 71399ff
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Oct 26 18:30:41 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Oct 26 18:30:41 2015 -0700

----------------------------------------------------------------------
 .../consumer/internals/AbstractCoordinator.java | 110 ++++++++++---------
 .../consumer/internals/ConsumerCoordinator.java |  16 +--
 .../clients/consumer/internals/Fetcher.java     |   3 +-
 .../runtime/distributed/WorkerCoordinator.java  |   6 +-
 .../distributed/WorkerCoordinatorTest.java      |   8 +-
 5 files changed, 73 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b251bebb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index a2b9ec5..4f8c802 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -71,8 +71,8 @@ import java.util.concurrent.TimeUnit;
  *
  * To leverage this protocol, an implementation must define the format of metadata provided by each
  * member for group registration in {@link #metadata()} and the format of the state assignment provided
- * by the leader in {@link #doSync(String, String, Map)} and becomes available to members in
- * {@link #onJoin(int, String, String, ByteBuffer)}.
+ * by the leader in {@link #performAssignment(String, String, Map)} and becomes available to members in
+ * {@link #onJoinComplete(int, String, String, ByteBuffer)}.
  *
  */
 public abstract class AbstractCoordinator {
@@ -89,6 +89,7 @@ public abstract class AbstractCoordinator {
     protected final long retryBackoffMs;
     protected final long requestTimeoutMs;
 
+    private boolean needsJoinPrepare = true;
     private boolean rejoinNeeded = true;
     protected Node coordinator;
     protected String memberId;
@@ -124,7 +125,7 @@ public abstract class AbstractCoordinator {
 
     /**
      * Unique identifier for the class of protocols implements (e.g. "consumer" or "copycat").
-     * @return Non-null protocol type namej
+     * @return Non-null protocol type name
      */
     protected abstract String protocolType();
 
@@ -140,35 +141,35 @@ public abstract class AbstractCoordinator {
     protected abstract LinkedHashMap<String, ByteBuffer> metadata();
 
     /**
-     * Invoked when a group member has successfully joined a group.
-     * @param generation The generation that was joined
-     * @param memberId The identifier for the local member in the group
-     * @param protocol The protocol selected by the coordinator
-     * @param memberAssignment The assignment propagated from the group leader
+     * Invoked prior to each group join or rejoin. This is typically used to perform any
+     * cleanup from the previous generation (such as committing offsets for the consumer).
+     * @param generation The previous generation or -1 if there was none
+     * @param memberId The identifier of this member in the previous group or "" if there was none
      */
-    protected abstract void onJoin(int generation,
-                                   String memberId,
-                                   String protocol,
-                                   ByteBuffer memberAssignment);
+    protected abstract void onJoinPrepare(int generation, String memberId);
 
     /**
-     * Perform synchronization for the group. This is used by the leader to push state to all the members
+     * Perform assignment for the group. This is used by the leader to push state to all the members
      * of the group (e.g. to push partition assignments in the case of the new consumer)
      * @param leaderId The id of the leader (which is this member)
      * @param allMemberMetadata Metadata from all members of the group
      * @return A map from each member to their state assignment
      */
-    protected abstract Map<String, ByteBuffer> doSync(String leaderId,
-                                                      String protocol,
-                                                      Map<String, ByteBuffer> allMemberMetadata);
+    protected abstract Map<String, ByteBuffer> performAssignment(String leaderId,
+                                                                 String protocol,
+                                                                 Map<String, ByteBuffer> allMemberMetadata);
 
     /**
-     * Invoked when the group is left (whether because of shutdown, metadata change, stale generation, etc.)
-     * @param generation The generation that was left
-     * @param memberId The identifier of the local member in the group
+     * Invoked when a group member has successfully joined a group.
+     * @param generation The generation that was joined
+     * @param memberId The identifier for the local member in the group
+     * @param protocol The protocol selected by the coordinator
+     * @param memberAssignment The assignment propagated from the group leader
      */
-    protected abstract void onLeave(int generation, String memberId);
-
+    protected abstract void onJoinComplete(int generation,
+                                           String memberId,
+                                           String protocol,
+                                           ByteBuffer memberAssignment);
 
     /**
      * Block until the coordinator for this group is known.
@@ -199,7 +200,7 @@ public abstract class AbstractCoordinator {
         this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
         rejoinNeeded = true;
     }
-    private boolean needsOnLeave = true;
+
     /**
      * Ensure that the group is active (i.e. joined and synced)
      */
@@ -207,10 +208,9 @@ public abstract class AbstractCoordinator {
         if (!needRejoin())
             return;
 
-        // onLeave only invoked if we have a valid current generation
-        if (needsOnLeave) {
-            onLeave(generation, memberId);
-            needsOnLeave = false;
+        if (needsJoinPrepare) {
+            onJoinPrepare(generation, memberId);
+            needsJoinPrepare = false;
         }
 
         while (needRejoin()) {
@@ -223,12 +223,12 @@ public abstract class AbstractCoordinator {
                 continue;
             }
 
-            RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
+            RequestFuture<ByteBuffer> future = performGroupJoin();
             client.poll(future);
 
             if (future.succeeded()) {
-                onJoin(generation, memberId, protocol, future.value());
-                needsOnLeave = true;
+                onJoinComplete(generation, memberId, protocol, future.value());
+                needsJoinPrepare = true;
                 heartbeatTask.reset();
             } else {
                 if (future.exception() instanceof UnknownMemberIdException)
@@ -290,12 +290,12 @@ public abstract class AbstractCoordinator {
     }
 
     /**
-     * Send a request to get a new partition assignment. This is a non-blocking call which sends
-     * a JoinGroup request to the coordinator (if it is available). The returned future must
-     * be polled to see if the request completed successfully.
-     * @return A request future whose completion indicates the result of the JoinGroup request.
+     * Join the group and return the assignment for the next generation. This function handles both
+     * JoinGroup and SyncGroup, delegating to {@link #performAssignment(String, String, Map)} if
+     * elected leader by the coordinator.
+     * @return A request future which wraps the assignment returned from the group leader
      */
-    private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
+    private RequestFuture<ByteBuffer> performGroupJoin() {
         if (coordinatorUnknown())
             return RequestFuture.coordinatorNotAvailable();
 
@@ -338,7 +338,11 @@ public abstract class AbstractCoordinator {
                 AbstractCoordinator.this.rejoinNeeded = false;
                 AbstractCoordinator.this.protocol = joinResponse.groupProtocol();
                 sensors.joinLatency.record(response.requestLatencyMs());
-                performSync(joinResponse).chain(future);
+                if (joinResponse.isLeader()) {
+                    onJoinLeader(joinResponse).chain(future);
+                } else {
+                    onJoinFollower().chain(future);
+                }
             } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()) {
                 // reset the member id and retry immediately
                 AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
@@ -367,25 +371,25 @@ public abstract class AbstractCoordinator {
         }
     }
 
-    private RequestFuture<ByteBuffer> performSync(JoinGroupResponse joinResponse) {
-        if (joinResponse.isLeader()) {
-            try {
-                // perform the leader synchronization and send back the assignment for the group
-                Map<String, ByteBuffer> groupAssignment = doSync(joinResponse.leaderId(), joinResponse.groupProtocol(),
-                        joinResponse.members());
-
-                SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, groupAssignment);
-                log.debug("Issuing leader SyncGroup ({}: {}) to coordinator {}", ApiKeys.SYNC_GROUP, request, this.coordinator.id());
-                return sendSyncGroupRequest(request);
-            } catch (RuntimeException e) {
-                return RequestFuture.failure(e);
-            }
-        } else {
-            // send follower's sync group with an empty assignment
-            SyncGroupRequest request = new SyncGroupRequest(groupId, generation,
-                    memberId, Collections.<String, ByteBuffer>emptyMap());
-            log.debug("Issuing follower SyncGroup ({}: {}) to coordinator {}", ApiKeys.SYNC_GROUP, request, this.coordinator.id());
+    private RequestFuture<ByteBuffer> onJoinFollower() {
+        // send follower's sync group with an empty assignment
+        SyncGroupRequest request = new SyncGroupRequest(groupId, generation,
+                memberId, Collections.<String, ByteBuffer>emptyMap());
+        log.debug("Issuing follower SyncGroup ({}: {}) to coordinator {}", ApiKeys.SYNC_GROUP, request, this.coordinator.id());
+        return sendSyncGroupRequest(request);
+    }
+
+    private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
+        try {
+            // perform the leader synchronization and send back the assignment for the group
+            Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
+                    joinResponse.members());
+
+            SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, groupAssignment);
+            log.debug("Issuing leader SyncGroup ({}: {}) to coordinator {}", ApiKeys.SYNC_GROUP, request, this.coordinator.id());
             return sendSyncGroupRequest(request);
+        } catch (RuntimeException e) {
+            return RequestFuture.failure(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b251bebb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index d6291bf..20d1564 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -155,10 +155,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
     }
 
     @Override
-    protected void onJoin(int generation,
-                          String memberId,
-                          String assignmentStrategy,
-                          ByteBuffer assignmentBuffer) {
+    protected void onJoinComplete(int generation,
+                                  String memberId,
+                                  String assignmentStrategy,
+                                  ByteBuffer assignmentBuffer) {
         PartitionAssignor assignor = protocolMap.get(assignmentStrategy);
         if (assignor == null)
             throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
@@ -187,9 +187,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
     }
 
     @Override
-    protected Map<String, ByteBuffer> doSync(String leaderId,
-                                             String assignmentStrategy,
-                                             Map<String, ByteBuffer> allSubscriptions) {
+    protected Map<String, ByteBuffer> performAssignment(String leaderId,
+                                                        String assignmentStrategy,
+                                                        Map<String, ByteBuffer> allSubscriptions) {
         PartitionAssignor assignor = protocolMap.get(protocol);
         if (assignor == null)
             throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
@@ -224,7 +224,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
     }
 
     @Override
-    protected void onLeave(int generation, String memberId) {
+    protected void onJoinPrepare(int generation, String memberId) {
         // commit offsets prior to rebalance if auto-commit enabled
         maybeAutoCommitOffsetsSync();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b251bebb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index f119552..e18a58b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -460,9 +460,8 @@ public class Fetcher<K, V> {
                 long fetched = this.subscriptions.fetched(partition);
                 long consumed = this.subscriptions.consumed(partition);
                 // Only fetch data for partitions whose previously fetched data has been consumed
-                if (consumed == fetched) {
+                if (consumed == fetched)
                     fetch.put(partition, new FetchRequest.PartitionData(fetched, this.fetchSize));
-                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b251bebb/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
index c70ed4f..2fef37c 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
@@ -107,7 +107,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
     }
 
     @Override
-    protected void onJoin(int generation, String memberId, String protocol, ByteBuffer memberAssignment) {
+    protected void onJoinComplete(int generation, String memberId, String protocol, ByteBuffer memberAssignment) {
         assignmentSnapshot = CopycatProtocol.deserializeAssignment(memberAssignment);
         // At this point we always consider ourselves to be a member of the cluster, even if there was an assignment
         // error (the leader couldn't make the assignment) or we are behind the config and cannot yet work on our assigned
@@ -118,7 +118,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
     }
 
     @Override
-    protected Map<String, ByteBuffer> doSync(String leaderId, String protocol, Map<String, ByteBuffer> allMemberMetadata) {
+    protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, Map<String, ByteBuffer> allMemberMetadata) {
         log.debug("Performing task assignment");
 
         Map<String, CopycatProtocol.ConfigState> allConfigs = new HashMap<>();
@@ -227,7 +227,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
     }
 
     @Override
-    protected void onLeave(int generation, String memberId) {
+    protected void onJoinPrepare(int generation, String memberId) {
         log.debug("Revoking previous assignment {}", assignmentSnapshot);
         if (assignmentSnapshot != null && !assignmentSnapshot.failed())
             listener.onRevoked(assignmentSnapshot.leader(), assignmentSnapshot.connectors(), assignmentSnapshot.tasks());

http://git-wip-us.apache.org/repos/asf/kafka/blob/b251bebb/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
index 30c76a2..2278045 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
@@ -309,7 +309,7 @@ public class WorkerCoordinatorTest {
     }
 
     @Test
-    public void testLeaderDoSync1() throws Exception {
+    public void testLeaderPerformAssignment1() throws Exception {
         // Since all the protocol responses are mocked, the other tests validate doSync runs, but don't validate its
         // output. So we test it directly here.
 
@@ -324,7 +324,7 @@ public class WorkerCoordinatorTest {
         // Mark everyone as in sync with configState1
         configs.put("leader", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L)));
         configs.put("member", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L)));
-        Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "doSync", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs);
+        Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs);
 
         // configState1 has 1 connector, 1 task
         CopycatProtocol.Assignment leaderAssignment = CopycatProtocol.deserializeAssignment(result.get("leader"));
@@ -345,7 +345,7 @@ public class WorkerCoordinatorTest {
     }
 
     @Test
-    public void testLeaderDoSync2() throws Exception {
+    public void testLeaderPerformAssignment2() throws Exception {
         // Since all the protocol responses are mocked, the other tests validate doSync runs, but don't validate its
         // output. So we test it directly here.
 
@@ -360,7 +360,7 @@ public class WorkerCoordinatorTest {
         // Mark everyone as in sync with configState1
         configs.put("leader", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L)));
         configs.put("member", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L)));
-        Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "doSync", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs);
+        Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs);
 
         // configState2 has 2 connector, 3 tasks and should trigger round robin assignment
         CopycatProtocol.Assignment leaderAssignment = CopycatProtocol.deserializeAssignment(result.get("leader"));