You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/05/25 10:23:46 UTC
kafka git commit: KAFKA-5263: Avoid tight polling loop in consumer with no ready nodes
Repository: kafka
Updated Branches:
refs/heads/trunk a10990f44 -> 64fc1a7ca
KAFKA-5263: Avoid tight polling loop in consumer with no ready nodes
For consumers with manual partition assignment, await metadata when there are no ready nodes to avoid busy polling.
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>
Closes #3124 from rajinisivaram/KAFKA-5263
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/64fc1a7c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/64fc1a7c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/64fc1a7c
Branch: refs/heads/trunk
Commit: 64fc1a7cae348fad10e84c5ebc457c2a391573ee
Parents: a10990f
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Thu May 25 11:23:18 2017 +0100
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Thu May 25 11:23:18 2017 +0100
----------------------------------------------------------------------
.../kafka/clients/ClusterConnectionStates.java | 12 ++++++
.../org/apache/kafka/clients/KafkaClient.java | 5 +++
.../org/apache/kafka/clients/NetworkClient.java | 5 +++
.../kafka/clients/consumer/KafkaConsumer.java | 2 +-
.../consumer/internals/ConsumerCoordinator.java | 42 ++++++++++++++------
.../internals/ConsumerNetworkClient.java | 10 +++--
.../org/apache/kafka/clients/MockClient.java | 5 +++
.../internals/ConsumerCoordinatorTest.java | 32 +++++++--------
8 files changed, 79 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/64fc1a7c/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 32a222b..4d4bedd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -145,6 +145,18 @@ final class ClusterConnectionStates {
}
/**
+ * Return true if there is at least one node with connection in ready state and false otherwise.
+ */
+ public boolean hasReadyNodes() {
+ for (Map.Entry<String, NodeConnectionState> entry : nodeState.entrySet()) {
+ NodeConnectionState state = entry.getValue();
+ if (state != null && state.state == ConnectionState.READY)
+ return true;
+ }
+ return false;
+ }
+
+ /**
* Return true if the connection has been disconnected
* @param id The id of the node to check
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/64fc1a7c/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 9d63d43..d563fa0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -125,6 +125,11 @@ public interface KafkaClient extends Closeable {
boolean hasInFlightRequests(String nodeId);
/**
+ * Return true if there is at least one node with connection in ready state and false otherwise.
+ */
+ boolean hasReadyNodes();
+
+ /**
* Wake up the client if it is currently blocked waiting for I/O
*/
void wakeup();
http://git-wip-us.apache.org/repos/asf/kafka/blob/64fc1a7c/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 8708218..93fbb85 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -417,6 +417,11 @@ public class NetworkClient implements KafkaClient {
return this.inFlightRequests.isEmpty(node);
}
+ @Override
+ public boolean hasReadyNodes() {
+ return connectionStates.hasReadyNodes();
+ }
+
/**
* Interrupt the client if it is blocked waiting on I/O.
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/64fc1a7c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3c6d409..6489792 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1043,7 +1043,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
client.maybeTriggerWakeup();
- coordinator.poll(time.milliseconds());
+ coordinator.poll(time.milliseconds(), timeout);
// fetch positions if we have partitions we're subscribed to that we
// don't know the offset for
http://git-wip-us.apache.org/repos/asf/kafka/blob/64fc1a7c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index ca2108d..b35a571 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -278,23 +278,39 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
*
* @param now current time in milliseconds
*/
- public void poll(long now) {
+ public void poll(long now, long remainingMs) {
invokeCompletedOffsetCommitCallbacks();
- if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {
- ensureCoordinatorReady();
- now = time.milliseconds();
- }
+ if (subscriptions.partitionsAutoAssigned()) {
+ if (coordinatorUnknown()) {
+ ensureCoordinatorReady();
+ now = time.milliseconds();
+ }
- if (needRejoin()) {
- // due to a race condition between the initial metadata fetch and the initial rebalance,
- // we need to ensure that the metadata is fresh before joining initially. This ensures
- // that we have matched the pattern against the cluster's topics at least once before joining.
- if (subscriptions.hasPatternSubscription())
- client.ensureFreshMetadata();
+ if (needRejoin()) {
+ // due to a race condition between the initial metadata fetch and the initial rebalance,
+ // we need to ensure that the metadata is fresh before joining initially. This ensures
+ // that we have matched the pattern against the cluster's topics at least once before joining.
+ if (subscriptions.hasPatternSubscription())
+ client.ensureFreshMetadata();
- ensureActiveGroup();
- now = time.milliseconds();
+ ensureActiveGroup();
+ now = time.milliseconds();
+ }
+ } else {
+ // For manually assigned partitions, if there are no ready nodes, await metadata.
+ // If connections to all nodes fail, wakeups triggered while attempting to send fetch
+ // requests result in polls returning immediately, causing a tight loop of polls. Without
+ // the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
+ // awaitMetadataUpdate() initiates new connections with configured backoff and avoids the busy loop.
+ // When group management is used, metadata wait is already performed for this scenario as
+ // coordinator is unknown, hence this check is not required.
+ if (metadata.updateRequested() && !client.hasReadyNodes()) {
+ boolean metadataUpdated = client.awaitMetadataUpdate(remainingMs);
+ if (!metadataUpdated && !client.hasReadyNodes())
+ return;
+ now = time.milliseconds();
+ }
}
pollHeartbeat(now);
http://git-wip-us.apache.org/repos/asf/kafka/blob/64fc1a7c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 92d049a..84e9a81 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -107,10 +107,12 @@ public class ConsumerNetworkClient implements Closeable {
return completionHandler.future;
}
- public Node leastLoadedNode() {
- synchronized (this) {
- return client.leastLoadedNode(time.milliseconds());
- }
+ public synchronized Node leastLoadedNode() {
+ return client.leastLoadedNode(time.milliseconds());
+ }
+
+ public synchronized boolean hasReadyNodes() {
+ return client.hasReadyNodes();
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/64fc1a7c/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 8fff3cc..9ca95e3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -318,6 +318,11 @@ public class MockClient implements KafkaClient {
}
@Override
+ public boolean hasReadyNodes() {
+ return !ready.isEmpty();
+ }
+
+ @Override
public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs,
boolean expectResponse) {
return newClientRequest(nodeId, requestBuilder, createdTimeMs, expectResponse, null);
http://git-wip-us.apache.org/repos/asf/kafka/blob/64fc1a7c/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 8b582ca..770d4f7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -170,7 +170,7 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(joinGroupLeaderResponse(0, "memberId", Collections.<String, List<String>>emptyMap(),
Errors.GROUP_AUTHORIZATION_FAILED));
- coordinator.poll(time.milliseconds());
+ coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
}
@Test
@@ -299,7 +299,7 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(joinGroupLeaderResponse(0, consumerId, Collections.<String, List<String>>emptyMap(),
Errors.INVALID_GROUP_ID));
- coordinator.poll(time.milliseconds());
+ coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
}
@Test
@@ -329,7 +329,7 @@ public class ConsumerCoordinatorTest {
sync.groupAssignment().containsKey(consumerId);
}
}, syncGroupResponse(singletonList(t1p), Errors.NONE));
- coordinator.poll(time.milliseconds());
+ coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
assertFalse(coordinator.needRejoin());
assertEquals(singleton(t1p), subscriptions.assignedPartitions());
@@ -371,7 +371,7 @@ public class ConsumerCoordinatorTest {
// expect client to force updating the metadata, if yes gives it both topics
client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
- coordinator.poll(time.milliseconds());
+ coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
assertFalse(coordinator.needRejoin());
assertEquals(2, subscriptions.assignedPartitions().size());
@@ -434,7 +434,7 @@ public class ConsumerCoordinatorTest {
}, joinGroupLeaderResponse(2, consumerId, updatedSubscriptions, Errors.NONE));
client.prepareResponse(syncGroupResponse(newAssignment, Errors.NONE));
- coordinator.poll(time.milliseconds());
+ coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
assertFalse(coordinator.needRejoin());
assertEquals(updatedSubscriptionSet, subscriptions.subscription());
@@ -466,14 +466,14 @@ public class ConsumerCoordinatorTest {
consumerClient.wakeup();
try {
- coordinator.poll(time.milliseconds());
+ coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
} catch (WakeupException e) {
// ignore
}
// now complete the second half
client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
- coordinator.poll(time.milliseconds());
+ coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
assertFalse(coordinator.needRejoin());
assertEquals(singleton(t1p), subscriptions.assignedPartitions());
@@ -726,7 +726,7 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
- coordinator.poll(time.milliseconds());
+ coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
assertFalse(coordinator.needRejoin());
@@ -783,7 +783,7 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE));
client.prepareResponse(syncGroupResponse(Arrays.asList(tp1, tp2), Errors.NONE));
- coordinator.poll(time.milliseconds());
+ coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
assertFalse(coordinator.needRejoin());
assertEquals(new HashSet<>(Arrays.asList(tp1, tp2)), subscriptions.assignedPartitions());
@@ -830,7 +830,7 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.NONE));
- coordinator.poll(time.milliseconds());
+ coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
if (!assign) {
assertFalse(coordinator.needRejoin());
assertEquals(Collections.<TopicPartition>emptySet(), rebalanceListener.assigned);
@@ -841,7 +841,7 @@ public class ConsumerCoordinatorTest {
client.poll(0, time.milliseconds());
client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE));
client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
- coordinator.poll(time.milliseconds());
+ coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
assertFalse("Metadata refresh requested unnecessarily", metadata.updateRequested());
if (!assign) {
@@ -970,7 +970,7 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
time.sleep(autoCommitIntervalMs);
- coordinator.poll(time.milliseconds());
+ coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
assertEquals(100L, subscriptions.committed(t1p).offset());
}
@@ -999,7 +999,7 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
time.sleep(autoCommitIntervalMs);
- coordinator.poll(time.milliseconds());
+ coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
assertEquals(100L, subscriptions.committed(t1p).offset());
}
@@ -1017,7 +1017,7 @@ public class ConsumerCoordinatorTest {
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
time.sleep(autoCommitIntervalMs);
- coordinator.poll(time.milliseconds());
+ coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
assertEquals(100L, subscriptions.committed(t1p).offset());
}
@@ -1044,7 +1044,7 @@ public class ConsumerCoordinatorTest {
// sleep only for the retry backoff
time.sleep(retryBackoffMs);
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
- coordinator.poll(time.milliseconds());
+ coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
assertEquals(100L, subscriptions.committed(t1p).offset());
}
@@ -1508,7 +1508,7 @@ public class ConsumerCoordinatorTest {
subscriptions.assignFromUser(singleton(t1p));
subscriptions.seek(t1p, 100);
- coordinator.poll(time.milliseconds());
+ coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
return coordinator;
}