You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/12 01:09:34 UTC
[18/50] [abbrv] kafka git commit: KAFKA-3486: fix autocommit when partitions assigned manually
KAFKA-3486: fix autocommit when partitions assigned manually
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1169 from hachikuji/KAFKA-3486
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dd5480a4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dd5480a4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dd5480a4
Branch: refs/heads/0.10.0
Commit: dd5480a47eb0f45214c179b7f14ffaf493164222
Parents: bd5325d
Author: Jason Gustafson <ja...@confluent.io>
Authored: Sat Apr 2 23:02:19 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Sat Apr 2 23:02:19 2016 -0700
----------------------------------------------------------------------
.../consumer/internals/ConsumerCoordinator.java | 54 +++-----
.../internals/ConsumerCoordinatorTest.java | 122 +++++++++++++++++--
2 files changed, 134 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/dd5480a4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index e582ce3..a364987 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -110,7 +110,13 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
addMetadataListener();
- this.autoCommitTask = autoCommitEnabled ? new AutoCommitTask(autoCommitIntervalMs) : null;
+ if (autoCommitEnabled) {
+ this.autoCommitTask = new AutoCommitTask(autoCommitIntervalMs);
+ this.autoCommitTask.reschedule();
+ } else {
+ this.autoCommitTask = null;
+ }
+
this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
this.interceptors = interceptors;
this.excludeInternalTopics = excludeInternalTopics;
@@ -187,9 +193,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
// give the assignor a chance to update internal state based on the received assignment
assignor.onAssignment(assignment);
- // restart the autocommit task if needed
+ // reschedule the auto commit starting from now
if (autoCommitEnabled)
- autoCommitTask.enable();
+ autoCommitTask.reschedule();
// execute the user's callback after rebalance
ConsumerRebalanceListener listener = subscriptions.listener();
@@ -384,52 +390,36 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
private class AutoCommitTask implements DelayedTask {
private final long interval;
- private boolean enabled = false;
- private boolean requestInFlight = false;
public AutoCommitTask(long interval) {
this.interval = interval;
}
- public void enable() {
- if (!enabled) {
- // there shouldn't be any instances scheduled, but call unschedule anyway to ensure
- // that this task is only ever scheduled once
- client.unschedule(this);
- this.enabled = true;
-
- if (!requestInFlight) {
- long now = time.milliseconds();
- client.schedule(this, interval + now);
- }
- }
- }
-
- public void disable() {
- this.enabled = false;
- client.unschedule(this);
+ private void reschedule() {
+ client.schedule(this, time.milliseconds() + interval);
}
private void reschedule(long at) {
- if (enabled)
- client.schedule(this, at);
+ client.schedule(this, at);
}
public void run(final long now) {
- if (!enabled)
- return;
-
if (coordinatorUnknown()) {
log.debug("Cannot auto-commit offsets for group {} since the coordinator is unknown", groupId);
- client.schedule(this, now + retryBackoffMs);
+ reschedule(now + retryBackoffMs);
+ return;
+ }
+
+ if (needRejoin()) {
+ // skip the commit when we're rejoining since we'll commit offsets synchronously
+ // before the revocation callback is invoked
+ reschedule(now + interval);
return;
}
- requestInFlight = true;
commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
- requestInFlight = false;
if (exception == null) {
reschedule(now + interval);
} else if (exception instanceof SendFailedException) {
@@ -446,10 +436,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
private void maybeAutoCommitOffsetsSync() {
if (autoCommitEnabled) {
- // disable periodic commits prior to committing synchronously. note that they will
- // be re-enabled after a rebalance completes
- autoCommitTask.disable();
-
try {
commitOffsetsSync(subscriptions.allConsumed());
} catch (WakeupException e) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/dd5480a4/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 8844adc..623e5ef 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -77,11 +77,11 @@ public class ConsumerCoordinatorTest {
private String topicName = "test";
private String groupId = "test-group";
private TopicPartition tp = new TopicPartition(topicName, 0);
- private int sessionTimeoutMs = 10;
- private int heartbeatIntervalMs = 2;
+ private int sessionTimeoutMs = 10000;
+ private int heartbeatIntervalMs = 5000;
private long retryBackoffMs = 100;
private boolean autoCommitEnabled = false;
- private long autoCommitIntervalMs = 5000;
+ private long autoCommitIntervalMs = 2000;
private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor();
private List<PartitionAssignor> assignors = Arrays.<PartitionAssignor>asList(partitionAssignor);
private MockTime time;
@@ -110,7 +110,7 @@ public class ConsumerCoordinatorTest {
this.partitionAssignor.clear();
client.setNode(node);
- this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT);
+ this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, autoCommitEnabled);
}
@After
@@ -546,7 +546,7 @@ public class ConsumerCoordinatorTest {
@Test
public void testIncludeInternalTopicsConfigOption() {
- coordinator = buildCoordinator(new Metrics(), assignors, false);
+ coordinator = buildCoordinator(new Metrics(), assignors, false, false);
subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
metadata.update(TestUtils.singletonCluster(TopicConstants.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds());
@@ -633,6 +633,107 @@ public class ConsumerCoordinatorTest {
}
@Test
+ public void testAutoCommitDynamicAssignment() {
+ final String consumerId = "consumer";
+
+ ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
+ ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true);
+
+ subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+ subscriptions.needReassignment();
+
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
+ client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+ coordinator.ensurePartitionAssignment();
+
+ subscriptions.seek(tp, 100);
+
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+ time.sleep(autoCommitIntervalMs);
+ consumerClient.poll(0);
+
+ assertEquals(100L, subscriptions.committed(tp).offset());
+ }
+
+ @Test
+ public void testAutoCommitDynamicAssignmentRebalance() {
+ final String consumerId = "consumer";
+
+ ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
+ ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true);
+
+ subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+ subscriptions.needReassignment();
+
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ // haven't joined, so should not cause a commit
+ time.sleep(autoCommitIntervalMs);
+ consumerClient.poll(0);
+
+ client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
+ client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+ coordinator.ensurePartitionAssignment();
+
+ subscriptions.seek(tp, 100);
+
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+ time.sleep(autoCommitIntervalMs);
+ consumerClient.poll(0);
+
+ assertEquals(100L, subscriptions.committed(tp).offset());
+ }
+
+ @Test
+ public void testAutoCommitManualAssignment() {
+ ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
+ ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true);
+
+ subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.seek(tp, 100);
+
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+ time.sleep(autoCommitIntervalMs);
+ consumerClient.poll(0);
+
+ assertEquals(100L, subscriptions.committed(tp).offset());
+ }
+
+ @Test
+ public void testAutoCommitManualAssignmentCoordinatorUnknown() {
+ ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
+ ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true);
+
+ subscriptions.assignFromUser(Arrays.asList(tp));
+ subscriptions.seek(tp, 100);
+
+ // no commit initially since coordinator is unknown
+ consumerClient.poll(0);
+ time.sleep(autoCommitIntervalMs);
+ consumerClient.poll(0);
+
+ assertNull(subscriptions.committed(tp));
+
+ // now find the coordinator
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ // sleep only for the retry backoff
+ time.sleep(retryBackoffMs);
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+ consumerClient.poll(0);
+
+ assertEquals(100L, subscriptions.committed(tp).offset());
+ }
+
+ @Test
public void testCommitOffsetMetadata() {
subscriptions.assignFromUser(Arrays.asList(tp));
@@ -896,7 +997,8 @@ public class ConsumerCoordinatorTest {
RangeAssignor range = new RangeAssignor();
try (Metrics metrics = new Metrics(time)) {
- ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(roundRobin, range), ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT);
+ ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(roundRobin, range),
+ ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, false);
List<ProtocolMetadata> metadata = coordinator.metadata();
assertEquals(2, metadata.size());
assertEquals(roundRobin.name(), metadata.get(0).name());
@@ -904,7 +1006,8 @@ public class ConsumerCoordinatorTest {
}
try (Metrics metrics = new Metrics(time)) {
- ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(range, roundRobin), ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT);
+ ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(range, roundRobin),
+ ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, false);
List<ProtocolMetadata> metadata = coordinator.metadata();
assertEquals(2, metadata.size());
assertEquals(range.name(), metadata.get(0).name());
@@ -912,7 +1015,10 @@ public class ConsumerCoordinatorTest {
}
}
- private ConsumerCoordinator buildCoordinator(Metrics metrics, List<PartitionAssignor> assignors, boolean excludeInternalTopics) {
+ private ConsumerCoordinator buildCoordinator(Metrics metrics,
+ List<PartitionAssignor> assignors,
+ boolean excludeInternalTopics,
+ boolean autoCommitEnabled) {
return new ConsumerCoordinator(
consumerClient,
groupId,