You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/08/12 23:35:13 UTC
kafka git commit: KAFKA-4034; Avoid unnecessary consumer coordinator lookup
Repository: kafka
Updated Branches:
refs/heads/trunk fc55f804e -> be36b3227
KAFKA-4034; Avoid unnecessary consumer coordinator lookup
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #1720 from hachikuji/KAFKA-4034
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/be36b322
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/be36b322
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/be36b322
Branch: refs/heads/trunk
Commit: be36b322749003581474e2c84a3ec9ba2aaec53c
Parents: fc55f80
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Aug 12 23:26:41 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Aug 12 23:26:41 2016 +0100
----------------------------------------------------------------------
.../kafka/clients/consumer/KafkaConsumer.java | 22 ++++++----
.../RetriableCommitFailedException.java | 4 ++
.../consumer/internals/AbstractCoordinator.java | 25 ++++++++++-
.../consumer/internals/ConsumerCoordinator.java | 38 +++++++++++++----
.../clients/consumer/internals/Fetcher.java | 13 +++++-
.../consumer/internals/SubscriptionState.java | 2 +-
.../clients/consumer/KafkaConsumerTest.java | 44 +++++++++++++++++++-
.../kafka/api/AuthorizerIntegrationTest.scala | 18 +++++++-
8 files changed, 147 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/be36b322/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index ff94dc8..522cfde 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -997,9 +997,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @return The fetched records (may be empty)
*/
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
- // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
- coordinator.ensureCoordinatorReady();
-
// ensure we have partitions assigned if we expect to
if (subscriptions.partitionsAutoAssigned())
coordinator.ensurePartitionAssignment();
@@ -1429,11 +1426,22 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* defined
*/
private void updateFetchPositions(Set<TopicPartition> partitions) {
- // refresh commits for all assigned partitions
- coordinator.refreshCommittedOffsetsIfNeeded();
+ // lookup any positions for partitions which are awaiting reset (which may be the
+ // case if the user called seekToBeginning or seekToEnd). We do this check first to
+ // avoid an unnecessary lookup of committed offsets (which typically occurs when
+ // the user is manually assigning partitions and managing their own offsets).
+ fetcher.resetOffsetsIfNeeded(partitions);
+
+ if (!subscriptions.hasAllFetchPositions()) {
+ // if we still don't have offsets for all partitions, then we should either seek
+ // to the last committed position or reset using the auto reset policy
- // then do any offset lookups in case some positions are not known
- fetcher.updateFetchPositions(partitions);
+ // first refresh commits for all assigned partitions
+ coordinator.refreshCommittedOffsetsIfNeeded();
+
+ // then do any offset lookups in case some positions are not known
+ fetcher.updateFetchPositions(partitions);
+ }
}
/*
http://git-wip-us.apache.org/repos/asf/kafka/blob/be36b322/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
index 459a8ac..1c1a2f5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
@@ -22,6 +22,10 @@ public class RetriableCommitFailedException extends RetriableException {
private static final long serialVersionUID = 1L;
+ public RetriableCommitFailedException(Throwable t) {
+ super("Offset commit failed with a retriable exception. You should retry committing offsets.", t);
+ }
+
public RetriableCommitFailedException(String message) {
super(message);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/be36b322/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 6bb4406..e957856 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -98,6 +98,8 @@ public abstract class AbstractCoordinator implements Closeable {
protected String protocol;
protected int generation;
+ private RequestFuture<Void> findCoordinatorFuture = null;
+
/**
* Initialize the coordination manager.
*/
@@ -175,7 +177,7 @@ public abstract class AbstractCoordinator implements Closeable {
*/
public void ensureCoordinatorReady() {
while (coordinatorUnknown()) {
- RequestFuture<Void> future = sendGroupCoordinatorRequest();
+ RequestFuture<Void> future = lookupCoordinator();
client.poll(future);
if (future.failed()) {
@@ -189,8 +191,25 @@ public abstract class AbstractCoordinator implements Closeable {
coordinatorDead();
time.sleep(retryBackoffMs);
}
+ }
+ }
+
+ protected RequestFuture<Void> lookupCoordinator() {
+ if (findCoordinatorFuture == null) {
+ findCoordinatorFuture = sendGroupCoordinatorRequest();
+ findCoordinatorFuture.addListener(new RequestFutureListener<Void>() {
+ @Override
+ public void onSuccess(Void value) {
+ findCoordinatorFuture = null;
+ }
+ @Override
+ public void onFailure(RuntimeException e) {
+ findCoordinatorFuture = null;
+ }
+ });
}
+ return findCoordinatorFuture;
}
/**
@@ -205,6 +224,10 @@ public abstract class AbstractCoordinator implements Closeable {
* Ensure that the group is active (i.e. joined and synced)
*/
public void ensureActiveGroup() {
+ // always ensure that the coordinator is ready because we may have been disconnected
+ // when sending heartbeats, which does not necessarily require us to rejoin the group.
+ ensureCoordinatorReady();
+
if (!needRejoin())
return;
http://git-wip-us.apache.org/repos/asf/kafka/blob/be36b322/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index b210746..81a40f1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -366,7 +366,36 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
- public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
+ public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
+ if (!coordinatorUnknown()) {
+ doCommitOffsetsAsync(offsets, callback);
+ } else {
+ // we don't know the current coordinator, so try to find it and then send the commit
+ // or fail (we don't want recursive retries which can cause offset commits to arrive
+ // out of order). Note that there may be multiple offset commits chained to the same
+ // coordinator lookup request. This is fine because the listeners will be invoked in
+ // the same order that they were added. Note also that AbstractCoordinator prevents
+ // multiple concurrent coordinator lookup requests.
+ lookupCoordinator().addListener(new RequestFutureListener<Void>() {
+ @Override
+ public void onSuccess(Void value) {
+ doCommitOffsetsAsync(offsets, callback);
+ }
+
+ @Override
+ public void onFailure(RuntimeException e) {
+ callback.onComplete(offsets, new RetriableCommitFailedException(e));
+ }
+ });
+ }
+
+ // ensure the commit has a chance to be transmitted (without blocking on its completion).
+ // Note that commits are treated as heartbeats by the coordinator, so there is no need to
+ // explicitly allow heartbeats through delayed task execution.
+ client.pollNoWakeup();
+ }
+
+ private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
this.subscriptions.needRefreshCommits();
RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
@@ -381,17 +410,12 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
@Override
public void onFailure(RuntimeException e) {
if (e instanceof RetriableException) {
- cb.onComplete(offsets, new RetriableCommitFailedException("Commit offsets failed with retriable exception. You should retry committing offsets.", e));
+ cb.onComplete(offsets, new RetriableCommitFailedException(e));
} else {
cb.onComplete(offsets, e);
}
}
});
-
- // ensure the commit has a chance to be transmitted (without blocking on its completion).
- // Note that commits are treated as heartbeats by the coordinator, so there is no need to
- // explicitly allow heartbeats through delayed task execution.
- client.pollNoWakeup();
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/be36b322/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index cf2ebc3..fec9b6e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -155,6 +155,18 @@ public class Fetcher<K, V> {
}
/**
+ * Lookup and set offsets for any partitions which are awaiting an explicit reset.
+ * @param partitions the partitions to reset
+ */
+ public void resetOffsetsIfNeeded(Set<TopicPartition> partitions) {
+ for (TopicPartition tp : partitions) {
+ // TODO: If there are several offsets to reset, we could submit offset requests in parallel
+ if (subscriptions.isAssigned(tp) && subscriptions.isOffsetResetNeeded(tp))
+ resetOffset(tp);
+ }
+ }
+
+ /**
* Update the fetch positions for the provided partitions.
* @param partitions the partitions to update positions for
* @throws NoOffsetForPartitionException If no offset is stored for a given partition and no reset policy is available
@@ -165,7 +177,6 @@ public class Fetcher<K, V> {
if (!subscriptions.isAssigned(tp) || subscriptions.isFetchable(tp))
continue;
- // TODO: If there are several offsets to reset, we could submit offset requests in parallel
if (subscriptions.isOffsetResetNeeded(tp)) {
resetOffset(tp);
} else if (subscriptions.committed(tp) == null) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/be36b322/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 38660e1..e9b2eb2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -49,7 +49,7 @@ public class SubscriptionState {
private enum SubscriptionType {
NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED
- };
+ }
/* the type of subscription */
private SubscriptionType subscriptionType;
http://git-wip-us.apache.org/repos/asf/kafka/blob/be36b322/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 9affa79..8b52664 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -43,6 +43,7 @@ import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.GroupCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
@@ -89,7 +90,7 @@ public class KafkaConsumerTest {
final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
try {
- KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
+ KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
} catch (KafkaException e) {
assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
@@ -455,6 +456,38 @@ public class KafkaConsumerTest {
}
@Test
+ public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() {
+ String topic = "topic";
+ final TopicPartition partition = new TopicPartition(topic, 0);
+ int sessionTimeoutMs = 3000;
+ int heartbeatIntervalMs = 2000;
+ int autoCommitIntervalMs = 1000;
+
+ Time time = new MockTime();
+ MockClient client = new MockClient(time);
+ Cluster cluster = TestUtils.singletonCluster(topic, 1);
+ Node node = cluster.nodes().get(0);
+ client.setNode(node);
+ Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+ metadata.update(cluster, time.milliseconds());
+ PartitionAssignor assignor = new RoundRobinAssignor();
+
+ final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+ sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
+ consumer.assign(Arrays.asList(partition));
+ consumer.seekToBeginning(Arrays.asList(partition));
+
+ // there shouldn't be any need to lookup the coordinator or fetch committed offsets.
+ // we just lookup the starting position and send the record fetch.
+ client.prepareResponse(listOffsetsResponse(Collections.singletonMap(partition, 50L), Errors.NONE.code()));
+ client.prepareResponse(fetchResponse(partition, 50L, 5));
+
+ ConsumerRecords<String, String> records = consumer.poll(0);
+ assertEquals(5, records.count());
+ assertEquals(55L, consumer.position(partition));
+ }
+
+ @Test
public void testCommitsFetchedDuringAssign() {
String topic = "topic";
final TopicPartition partition1 = new TopicPartition(topic, 0);
@@ -669,6 +702,15 @@ public class KafkaConsumerTest {
return new OffsetFetchResponse(partitionData).toStruct();
}
+ private Struct listOffsetsResponse(Map<TopicPartition, Long> offsets, short error) {
+ Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>();
+ for (Map.Entry<TopicPartition, Long> partitionOffset : offsets.entrySet()) {
+ partitionData.put(partitionOffset.getKey(), new ListOffsetResponse.PartitionData(error,
+ Collections.singletonList(partitionOffset.getValue())));
+ }
+ return new ListOffsetResponse(partitionData).toStruct();
+ }
+
private Struct fetchResponse(TopicPartition tp, long fetchOffset, int count) {
MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
for (int i = 0; i < count; i++)
http://git-wip-us.apache.org/repos/asf/kafka/blob/be36b322/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 332b681..817cdf7 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -340,13 +340,15 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
@Test
- def testConsumeWithNoGroupAccess(): Unit = {
+ def testSimpleConsumeWithOffsetLookupAndNoGroupAccess(): Unit = {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(1, tp)
removeAllAcls()
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
try {
+ // note this still depends on group access because we haven't set offsets explicitly, which means
+ // they will first be fetched from the consumer coordinator (which requires group access)
this.consumers.head.assign(List(tp).asJava)
consumeRecords(this.consumers.head)
Assert.fail("should have thrown exception")
@@ -356,6 +358,20 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
@Test
+ def testSimpleConsumeWithExplicitSeekAndNoGroupAccess(): Unit = {
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+ sendRecords(1, tp)
+ removeAllAcls()
+
+ addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+
+ // in this case, we do an explicit seek, so there should be no need to query the coordinator at all
+ this.consumers.head.assign(List(tp).asJava)
+ this.consumers.head.seekToBeginning(List(tp).asJava)
+ consumeRecords(this.consumers.head)
+ }
+
+ @Test
def testConsumeWithNoTopicAccess() {
addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
sendRecords(1, tp)