You are viewing a plain-text version of this content. The link to its canonical version was a hyperlink that did not survive conversion to plain text.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/05/25 10:23:46 UTC

kafka git commit: KAFKA-5263: Avoid tight polling loop in consumer with no ready nodes

Repository: kafka
Updated Branches:
  refs/heads/trunk a10990f44 -> 64fc1a7ca


KAFKA-5263: Avoid tight polling loop in consumer with no ready nodes

For consumers with manual partition assignment, await metadata when there are no ready nodes to avoid busy polling.

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>

Closes #3124 from rajinisivaram/KAFKA-5263


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/64fc1a7c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/64fc1a7c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/64fc1a7c

Branch: refs/heads/trunk
Commit: 64fc1a7cae348fad10e84c5ebc457c2a391573ee
Parents: a10990f
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Thu May 25 11:23:18 2017 +0100
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Thu May 25 11:23:18 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/ClusterConnectionStates.java  | 12 ++++++
 .../org/apache/kafka/clients/KafkaClient.java   |  5 +++
 .../org/apache/kafka/clients/NetworkClient.java |  5 +++
 .../kafka/clients/consumer/KafkaConsumer.java   |  2 +-
 .../consumer/internals/ConsumerCoordinator.java | 42 ++++++++++++++------
 .../internals/ConsumerNetworkClient.java        | 10 +++--
 .../org/apache/kafka/clients/MockClient.java    |  5 +++
 .../internals/ConsumerCoordinatorTest.java      | 32 +++++++--------
 8 files changed, 79 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/64fc1a7c/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 32a222b..4d4bedd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -145,6 +145,18 @@ final class ClusterConnectionStates {
     }
 
     /**
+     * Return true if there is at least one node with connection in ready state and false otherwise.
+     */
+    public boolean hasReadyNodes() {
+        for (Map.Entry<String, NodeConnectionState> entry : nodeState.entrySet()) {
+            NodeConnectionState state = entry.getValue();
+            if (state != null && state.state == ConnectionState.READY)
+                return true;
+        }
+        return false;
+    }
+
+    /**
      * Return true if the connection has been disconnected
      * @param id The id of the node to check
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/64fc1a7c/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 9d63d43..d563fa0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -125,6 +125,11 @@ public interface KafkaClient extends Closeable {
     boolean hasInFlightRequests(String nodeId);
 
     /**
+     * Return true if there is at least one node with connection in ready state and false otherwise.
+     */
+    boolean hasReadyNodes();
+
+    /**
      * Wake up the client if it is currently blocked waiting for I/O
      */
     void wakeup();

http://git-wip-us.apache.org/repos/asf/kafka/blob/64fc1a7c/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 8708218..93fbb85 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -417,6 +417,11 @@ public class NetworkClient implements KafkaClient {
         return this.inFlightRequests.isEmpty(node);
     }
 
+    @Override
+    public boolean hasReadyNodes() {
+        return connectionStates.hasReadyNodes();
+    }
+
     /**
      * Interrupt the client if it is blocked waiting on I/O.
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/64fc1a7c/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3c6d409..6489792 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1043,7 +1043,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
         client.maybeTriggerWakeup();
-        coordinator.poll(time.milliseconds());
+        coordinator.poll(time.milliseconds(), timeout);
 
         // fetch positions if we have partitions we're subscribed to that we
         // don't know the offset for

http://git-wip-us.apache.org/repos/asf/kafka/blob/64fc1a7c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index ca2108d..b35a571 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -278,23 +278,39 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
      *
      * @param now current time in milliseconds
      */
-    public void poll(long now) {
+    public void poll(long now, long remainingMs) {
         invokeCompletedOffsetCommitCallbacks();
 
-        if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {
-            ensureCoordinatorReady();
-            now = time.milliseconds();
-        }
+        if (subscriptions.partitionsAutoAssigned()) {
+            if (coordinatorUnknown()) {
+                ensureCoordinatorReady();
+                now = time.milliseconds();
+            }
 
-        if (needRejoin()) {
-            // due to a race condition between the initial metadata fetch and the initial rebalance,
-            // we need to ensure that the metadata is fresh before joining initially. This ensures
-            // that we have matched the pattern against the cluster's topics at least once before joining.
-            if (subscriptions.hasPatternSubscription())
-                client.ensureFreshMetadata();
+            if (needRejoin()) {
+                // due to a race condition between the initial metadata fetch and the initial rebalance,
+                // we need to ensure that the metadata is fresh before joining initially. This ensures
+                // that we have matched the pattern against the cluster's topics at least once before joining.
+                if (subscriptions.hasPatternSubscription())
+                    client.ensureFreshMetadata();
 
-            ensureActiveGroup();
-            now = time.milliseconds();
+                ensureActiveGroup();
+                now = time.milliseconds();
+            }
+        } else {
+            // For manually assigned partitions, if there are no ready nodes, await metadata.
+            // If connections to all nodes fail, wakeups triggered while attempting to send fetch
+            // requests result in polls returning immediately, causing a tight loop of polls. Without
+            // the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
+            // awaitMetadataUpdate() initiates new connections with configured backoff and avoids the busy loop.
+            // When group management is used, metadata wait is already performed for this scenario as
+            // coordinator is unknown, hence this check is not required.
+            if (metadata.updateRequested() && !client.hasReadyNodes()) {
+                boolean metadataUpdated = client.awaitMetadataUpdate(remainingMs);
+                if (!metadataUpdated && !client.hasReadyNodes())
+                    return;
+                now = time.milliseconds();
+            }
         }
 
         pollHeartbeat(now);

http://git-wip-us.apache.org/repos/asf/kafka/blob/64fc1a7c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 92d049a..84e9a81 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -107,10 +107,12 @@ public class ConsumerNetworkClient implements Closeable {
         return completionHandler.future;
     }
 
-    public Node leastLoadedNode() {
-        synchronized (this) {
-            return client.leastLoadedNode(time.milliseconds());
-        }
+    public synchronized Node leastLoadedNode() {
+        return client.leastLoadedNode(time.milliseconds());
+    }
+
+    public synchronized boolean hasReadyNodes() {
+        return client.hasReadyNodes();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/64fc1a7c/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 8fff3cc..9ca95e3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -318,6 +318,11 @@ public class MockClient implements KafkaClient {
     }
 
     @Override
+    public boolean hasReadyNodes() {
+        return !ready.isEmpty();
+    }
+
+    @Override
     public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs,
                                           boolean expectResponse) {
         return newClientRequest(nodeId, requestBuilder, createdTimeMs, expectResponse, null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/64fc1a7c/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 8b582ca..770d4f7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -170,7 +170,7 @@ public class ConsumerCoordinatorTest {
 
         client.prepareResponse(joinGroupLeaderResponse(0, "memberId", Collections.<String, List<String>>emptyMap(),
                 Errors.GROUP_AUTHORIZATION_FAILED));
-        coordinator.poll(time.milliseconds());
+        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
     }
 
     @Test
@@ -299,7 +299,7 @@ public class ConsumerCoordinatorTest {
 
         client.prepareResponse(joinGroupLeaderResponse(0, consumerId, Collections.<String, List<String>>emptyMap(),
                 Errors.INVALID_GROUP_ID));
-        coordinator.poll(time.milliseconds());
+        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
     }
 
     @Test
@@ -329,7 +329,7 @@ public class ConsumerCoordinatorTest {
                         sync.groupAssignment().containsKey(consumerId);
             }
         }, syncGroupResponse(singletonList(t1p), Errors.NONE));
-        coordinator.poll(time.milliseconds());
+        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
 
         assertFalse(coordinator.needRejoin());
         assertEquals(singleton(t1p), subscriptions.assignedPartitions());
@@ -371,7 +371,7 @@ public class ConsumerCoordinatorTest {
         // expect client to force updating the metadata, if yes gives it both topics
         client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
 
-        coordinator.poll(time.milliseconds());
+        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
 
         assertFalse(coordinator.needRejoin());
         assertEquals(2, subscriptions.assignedPartitions().size());
@@ -434,7 +434,7 @@ public class ConsumerCoordinatorTest {
         }, joinGroupLeaderResponse(2, consumerId, updatedSubscriptions, Errors.NONE));
         client.prepareResponse(syncGroupResponse(newAssignment, Errors.NONE));
 
-        coordinator.poll(time.milliseconds());
+        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
 
         assertFalse(coordinator.needRejoin());
         assertEquals(updatedSubscriptionSet, subscriptions.subscription());
@@ -466,14 +466,14 @@ public class ConsumerCoordinatorTest {
         consumerClient.wakeup();
 
         try {
-            coordinator.poll(time.milliseconds());
+            coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
         } catch (WakeupException e) {
             // ignore
         }
 
         // now complete the second half
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
-        coordinator.poll(time.milliseconds());
+        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
 
         assertFalse(coordinator.needRejoin());
         assertEquals(singleton(t1p), subscriptions.assignedPartitions());
@@ -726,7 +726,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
 
-        coordinator.poll(time.milliseconds());
+        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
 
         assertFalse(coordinator.needRejoin());
 
@@ -783,7 +783,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE));
         client.prepareResponse(syncGroupResponse(Arrays.asList(tp1, tp2), Errors.NONE));
 
-        coordinator.poll(time.milliseconds());
+        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
 
         assertFalse(coordinator.needRejoin());
         assertEquals(new HashSet<>(Arrays.asList(tp1, tp2)), subscriptions.assignedPartitions());
@@ -830,7 +830,7 @@ public class ConsumerCoordinatorTest {
 
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
         client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.NONE));
-        coordinator.poll(time.milliseconds());
+        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
         if (!assign) {
             assertFalse(coordinator.needRejoin());
             assertEquals(Collections.<TopicPartition>emptySet(), rebalanceListener.assigned);
@@ -841,7 +841,7 @@ public class ConsumerCoordinatorTest {
         client.poll(0, time.milliseconds());
         client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE));
         client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
-        coordinator.poll(time.milliseconds());
+        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
 
         assertFalse("Metadata refresh requested unnecessarily", metadata.updateRequested());
         if (!assign) {
@@ -970,7 +970,7 @@ public class ConsumerCoordinatorTest {
 
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
         time.sleep(autoCommitIntervalMs);
-        coordinator.poll(time.milliseconds());
+        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
 
         assertEquals(100L, subscriptions.committed(t1p).offset());
     }
@@ -999,7 +999,7 @@ public class ConsumerCoordinatorTest {
 
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
         time.sleep(autoCommitIntervalMs);
-        coordinator.poll(time.milliseconds());
+        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
 
         assertEquals(100L, subscriptions.committed(t1p).offset());
     }
@@ -1017,7 +1017,7 @@ public class ConsumerCoordinatorTest {
 
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
         time.sleep(autoCommitIntervalMs);
-        coordinator.poll(time.milliseconds());
+        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
 
         assertEquals(100L, subscriptions.committed(t1p).offset());
     }
@@ -1044,7 +1044,7 @@ public class ConsumerCoordinatorTest {
         // sleep only for the retry backoff
         time.sleep(retryBackoffMs);
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
-        coordinator.poll(time.milliseconds());
+        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
 
         assertEquals(100L, subscriptions.committed(t1p).offset());
     }
@@ -1508,7 +1508,7 @@ public class ConsumerCoordinatorTest {
             subscriptions.assignFromUser(singleton(t1p));
 
         subscriptions.seek(t1p, 100);
-        coordinator.poll(time.milliseconds());
+        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
 
         return coordinator;
     }