You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/27 02:25:27 UTC
kafka git commit: MINOR: follow-up to KAFKA-2464 for renaming/cleanup
Repository: kafka
Updated Branches:
refs/heads/trunk 71399ffe4 -> b251bebbc
MINOR: follow-up to KAFKA-2464 for renaming/cleanup
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ewen Cheslack-Postava, Jiangjie Qin, Guozhang Wang
Closes #354 from hachikuji/KAFKA-2464
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b251bebb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b251bebb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b251bebb
Branch: refs/heads/trunk
Commit: b251bebbcc3074f0491fce988f1d46ccdc97f1fa
Parents: 71399ff
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Oct 26 18:30:41 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Oct 26 18:30:41 2015 -0700
----------------------------------------------------------------------
.../consumer/internals/AbstractCoordinator.java | 110 ++++++++++---------
.../consumer/internals/ConsumerCoordinator.java | 16 +--
.../clients/consumer/internals/Fetcher.java | 3 +-
.../runtime/distributed/WorkerCoordinator.java | 6 +-
.../distributed/WorkerCoordinatorTest.java | 8 +-
5 files changed, 73 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b251bebb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index a2b9ec5..4f8c802 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -71,8 +71,8 @@ import java.util.concurrent.TimeUnit;
*
* To leverage this protocol, an implementation must define the format of metadata provided by each
* member for group registration in {@link #metadata()} and the format of the state assignment provided
- * by the leader in {@link #doSync(String, String, Map)} and becomes available to members in
- * {@link #onJoin(int, String, String, ByteBuffer)}.
+ * by the leader in {@link #performAssignment(String, String, Map)} and becomes available to members in
+ * {@link #onJoinComplete(int, String, String, ByteBuffer)}.
*
*/
public abstract class AbstractCoordinator {
@@ -89,6 +89,7 @@ public abstract class AbstractCoordinator {
protected final long retryBackoffMs;
protected final long requestTimeoutMs;
+ private boolean needsJoinPrepare = true;
private boolean rejoinNeeded = true;
protected Node coordinator;
protected String memberId;
@@ -124,7 +125,7 @@ public abstract class AbstractCoordinator {
/**
* Unique identifier for the class of protocols implements (e.g. "consumer" or "copycat").
- * @return Non-null protocol type namej
+ * @return Non-null protocol type name
*/
protected abstract String protocolType();
@@ -140,35 +141,35 @@ public abstract class AbstractCoordinator {
protected abstract LinkedHashMap<String, ByteBuffer> metadata();
/**
- * Invoked when a group member has successfully joined a group.
- * @param generation The generation that was joined
- * @param memberId The identifier for the local member in the group
- * @param protocol The protocol selected by the coordinator
- * @param memberAssignment The assignment propagated from the group leader
+ * Invoked prior to each group join or rejoin. This is typically used to perform any
+ * cleanup from the previous generation (such as committing offsets for the consumer)
+ * @param generation The previous generation or -1 if there was none
+ * @param memberId The identifier of this member in the previous group or "" if there was none
*/
- protected abstract void onJoin(int generation,
- String memberId,
- String protocol,
- ByteBuffer memberAssignment);
+ protected abstract void onJoinPrepare(int generation, String memberId);
/**
- * Perform synchronization for the group. This is used by the leader to push state to all the members
+ * Perform assignment for the group. This is used by the leader to push state to all the members
* of the group (e.g. to push partition assignments in the case of the new consumer)
* @param leaderId The id of the leader (which is this member)
* @param allMemberMetadata Metadata from all members of the group
* @return A map from each member to their state assignment
*/
- protected abstract Map<String, ByteBuffer> doSync(String leaderId,
- String protocol,
- Map<String, ByteBuffer> allMemberMetadata);
+ protected abstract Map<String, ByteBuffer> performAssignment(String leaderId,
+ String protocol,
+ Map<String, ByteBuffer> allMemberMetadata);
/**
- * Invoked when the group is left (whether because of shutdown, metadata change, stale generation, etc.)
- * @param generation The generation that was left
- * @param memberId The identifier of the local member in the group
+ * Invoked when a group member has successfully joined a group.
+ * @param generation The generation that was joined
+ * @param memberId The identifier for the local member in the group
+ * @param protocol The protocol selected by the coordinator
+ * @param memberAssignment The assignment propagated from the group leader
*/
- protected abstract void onLeave(int generation, String memberId);
-
+ protected abstract void onJoinComplete(int generation,
+ String memberId,
+ String protocol,
+ ByteBuffer memberAssignment);
/**
* Block until the coordinator for this group is known.
@@ -199,7 +200,7 @@ public abstract class AbstractCoordinator {
this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
rejoinNeeded = true;
}
- private boolean needsOnLeave = true;
+
/**
* Ensure that the group is active (i.e. joined and synced)
*/
@@ -207,10 +208,9 @@ public abstract class AbstractCoordinator {
if (!needRejoin())
return;
- // onLeave only invoked if we have a valid current generation
- if (needsOnLeave) {
- onLeave(generation, memberId);
- needsOnLeave = false;
+ if (needsJoinPrepare) {
+ onJoinPrepare(generation, memberId);
+ needsJoinPrepare = false;
}
while (needRejoin()) {
@@ -223,12 +223,12 @@ public abstract class AbstractCoordinator {
continue;
}
- RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
+ RequestFuture<ByteBuffer> future = performGroupJoin();
client.poll(future);
if (future.succeeded()) {
- onJoin(generation, memberId, protocol, future.value());
- needsOnLeave = true;
+ onJoinComplete(generation, memberId, protocol, future.value());
+ needsJoinPrepare = true;
heartbeatTask.reset();
} else {
if (future.exception() instanceof UnknownMemberIdException)
@@ -290,12 +290,12 @@ public abstract class AbstractCoordinator {
}
/**
- * Send a request to get a new partition assignment. This is a non-blocking call which sends
- * a JoinGroup request to the coordinator (if it is available). The returned future must
- * be polled to see if the request completed successfully.
- * @return A request future whose completion indicates the result of the JoinGroup request.
+ * Join the group and return the assignment for the next generation. This function handles both
+ * JoinGroup and SyncGroup, delegating to {@link #performAssignment(String, String, Map)} if
+ * elected leader by the coordinator.
+ * @return A request future which wraps the assignment returned from the group leader
*/
- private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
+ private RequestFuture<ByteBuffer> performGroupJoin() {
if (coordinatorUnknown())
return RequestFuture.coordinatorNotAvailable();
@@ -338,7 +338,11 @@ public abstract class AbstractCoordinator {
AbstractCoordinator.this.rejoinNeeded = false;
AbstractCoordinator.this.protocol = joinResponse.groupProtocol();
sensors.joinLatency.record(response.requestLatencyMs());
- performSync(joinResponse).chain(future);
+ if (joinResponse.isLeader()) {
+ onJoinLeader(joinResponse).chain(future);
+ } else {
+ onJoinFollower().chain(future);
+ }
} else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()) {
// reset the member id and retry immediately
AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
@@ -367,25 +371,25 @@ public abstract class AbstractCoordinator {
}
}
- private RequestFuture<ByteBuffer> performSync(JoinGroupResponse joinResponse) {
- if (joinResponse.isLeader()) {
- try {
- // perform the leader synchronization and send back the assignment for the group
- Map<String, ByteBuffer> groupAssignment = doSync(joinResponse.leaderId(), joinResponse.groupProtocol(),
- joinResponse.members());
-
- SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, groupAssignment);
- log.debug("Issuing leader SyncGroup ({}: {}) to coordinator {}", ApiKeys.SYNC_GROUP, request, this.coordinator.id());
- return sendSyncGroupRequest(request);
- } catch (RuntimeException e) {
- return RequestFuture.failure(e);
- }
- } else {
- // send follower's sync group with an empty assignment
- SyncGroupRequest request = new SyncGroupRequest(groupId, generation,
- memberId, Collections.<String, ByteBuffer>emptyMap());
- log.debug("Issuing follower SyncGroup ({}: {}) to coordinator {}", ApiKeys.SYNC_GROUP, request, this.coordinator.id());
+ private RequestFuture<ByteBuffer> onJoinFollower() {
+ // send follower's sync group with an empty assignment
+ SyncGroupRequest request = new SyncGroupRequest(groupId, generation,
+ memberId, Collections.<String, ByteBuffer>emptyMap());
+ log.debug("Issuing follower SyncGroup ({}: {}) to coordinator {}", ApiKeys.SYNC_GROUP, request, this.coordinator.id());
+ return sendSyncGroupRequest(request);
+ }
+
+ private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
+ try {
+ // perform the leader synchronization and send back the assignment for the group
+ Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
+ joinResponse.members());
+
+ SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, groupAssignment);
+ log.debug("Issuing leader SyncGroup ({}: {}) to coordinator {}", ApiKeys.SYNC_GROUP, request, this.coordinator.id());
return sendSyncGroupRequest(request);
+ } catch (RuntimeException e) {
+ return RequestFuture.failure(e);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b251bebb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index d6291bf..20d1564 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -155,10 +155,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
}
@Override
- protected void onJoin(int generation,
- String memberId,
- String assignmentStrategy,
- ByteBuffer assignmentBuffer) {
+ protected void onJoinComplete(int generation,
+ String memberId,
+ String assignmentStrategy,
+ ByteBuffer assignmentBuffer) {
PartitionAssignor assignor = protocolMap.get(assignmentStrategy);
if (assignor == null)
throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
@@ -187,9 +187,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
}
@Override
- protected Map<String, ByteBuffer> doSync(String leaderId,
- String assignmentStrategy,
- Map<String, ByteBuffer> allSubscriptions) {
+ protected Map<String, ByteBuffer> performAssignment(String leaderId,
+ String assignmentStrategy,
+ Map<String, ByteBuffer> allSubscriptions) {
PartitionAssignor assignor = protocolMap.get(protocol);
if (assignor == null)
throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
@@ -224,7 +224,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
}
@Override
- protected void onLeave(int generation, String memberId) {
+ protected void onJoinPrepare(int generation, String memberId) {
// commit offsets prior to rebalance if auto-commit enabled
maybeAutoCommitOffsetsSync();
http://git-wip-us.apache.org/repos/asf/kafka/blob/b251bebb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index f119552..e18a58b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -460,9 +460,8 @@ public class Fetcher<K, V> {
long fetched = this.subscriptions.fetched(partition);
long consumed = this.subscriptions.consumed(partition);
// Only fetch data for partitions whose previously fetched data has been consumed
- if (consumed == fetched) {
+ if (consumed == fetched)
fetch.put(partition, new FetchRequest.PartitionData(fetched, this.fetchSize));
- }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b251bebb/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
index c70ed4f..2fef37c 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinator.java
@@ -107,7 +107,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
}
@Override
- protected void onJoin(int generation, String memberId, String protocol, ByteBuffer memberAssignment) {
+ protected void onJoinComplete(int generation, String memberId, String protocol, ByteBuffer memberAssignment) {
assignmentSnapshot = CopycatProtocol.deserializeAssignment(memberAssignment);
// At this point we always consider ourselves to be a member of the cluster, even if there was an assignment
// error (the leader couldn't make the assignment) or we are behind the config and cannot yet work on our assigned
@@ -118,7 +118,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
}
@Override
- protected Map<String, ByteBuffer> doSync(String leaderId, String protocol, Map<String, ByteBuffer> allMemberMetadata) {
+ protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, Map<String, ByteBuffer> allMemberMetadata) {
log.debug("Performing task assignment");
Map<String, CopycatProtocol.ConfigState> allConfigs = new HashMap<>();
@@ -227,7 +227,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
}
@Override
- protected void onLeave(int generation, String memberId) {
+ protected void onJoinPrepare(int generation, String memberId) {
log.debug("Revoking previous assignment {}", assignmentSnapshot);
if (assignmentSnapshot != null && !assignmentSnapshot.failed())
listener.onRevoked(assignmentSnapshot.leader(), assignmentSnapshot.connectors(), assignmentSnapshot.tasks());
http://git-wip-us.apache.org/repos/asf/kafka/blob/b251bebb/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
index 30c76a2..2278045 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/WorkerCoordinatorTest.java
@@ -309,7 +309,7 @@ public class WorkerCoordinatorTest {
}
@Test
- public void testLeaderDoSync1() throws Exception {
+ public void testLeaderPerformAssignment1() throws Exception {
// Since all the protocol responses are mocked, the other tests validate doSync runs, but don't validate its
// output. So we test it directly here.
@@ -324,7 +324,7 @@ public class WorkerCoordinatorTest {
// Mark everyone as in sync with configState1
configs.put("leader", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L)));
configs.put("member", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L)));
- Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "doSync", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs);
+ Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs);
// configState1 has 1 connector, 1 task
CopycatProtocol.Assignment leaderAssignment = CopycatProtocol.deserializeAssignment(result.get("leader"));
@@ -345,7 +345,7 @@ public class WorkerCoordinatorTest {
}
@Test
- public void testLeaderDoSync2() throws Exception {
+ public void testLeaderPerformAssignment2() throws Exception {
// Since all the protocol responses are mocked, the other tests validate doSync runs, but don't validate its
// output. So we test it directly here.
@@ -360,7 +360,7 @@ public class WorkerCoordinatorTest {
// Mark everyone as in sync with configState1
configs.put("leader", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L)));
configs.put("member", CopycatProtocol.serializeMetadata(new CopycatProtocol.ConfigState(1L)));
- Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "doSync", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs);
+ Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, configs);
// configState2 has 2 connector, 3 tasks and should trigger round robin assignment
CopycatProtocol.Assignment leaderAssignment = CopycatProtocol.deserializeAssignment(result.get("leader"));