Posted to commits@kafka.apache.org by rs...@apache.org on 2018/03/08 11:00:25 UTC

[kafka] branch trunk updated: MINOR: Skip sending fetches/offset lookups when awaiting the reconnect backoff (#4644)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 925d6a2  MINOR: Skip sending fetches/offset lookups when awaiting the reconnect backoff (#4644)
925d6a2 is described below

commit 925d6a2ef363ee540e65f7ec3b699dce05aa591c
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Mar 8 03:00:20 2018 -0800

    MINOR: Skip sending fetches/offset lookups when awaiting the reconnect backoff (#4644)
    
    Logging can get spammy during the reconnect blackout period because any requests we send to ConsumerNetworkClient will immediately be failed when poll() returns. This patch checks for connection failures prior to sending fetches and offset lookups and skips sending to any failed nodes. Test cases added for both.
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>
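
For context, every send site now applies the same guard, built from the two
ConsumerNetworkClient methods added in this patch. A minimal sketch of the
pattern, where sendToNode() is a hypothetical stand-in for the actual send
call at each site:

    // Sketch of the guard applied before fetches and offset lookups.
    // isUnavailable() / maybeThrowAuthFailure() are added by this commit;
    // sendToNode() is hypothetical.
    if (client.isUnavailable(node)) {
        // Surface a stored authentication error instead of skipping silently
        client.maybeThrowAuthFailure(node);
        // Otherwise just wait out the backoff; the disconnect has already
        // triggered a metadata update, so nothing else is needed here.
        log.trace("Skipping send to node {} until reconnect backoff expires", node);
    } else {
        sendToNode(node, request);
    }
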
---
 .../kafka/clients/ClusterConnectionStates.java     |  1 +
 .../org/apache/kafka/clients/NetworkClient.java    | 11 +++----
 .../consumer/internals/AbstractCoordinator.java    |  4 +--
 .../consumer/internals/ConsumerNetworkClient.java  | 31 +++++++++++++------
 .../kafka/clients/consumer/internals/Fetcher.java  | 29 ++++++++++++-----
 .../kafka/common/requests/FetchResponse.java       |  2 +-
 .../kafka/clients/ClusterConnectionStatesTest.java |  4 ++-
 .../java/org/apache/kafka/clients/MockClient.java  | 14 ++++++---
 .../clients/consumer/internals/FetcherTest.java    | 36 ++++++++++++++++++++++
 9 files changed, 100 insertions(+), 32 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 89e958d..7abcf12 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -144,6 +144,7 @@ final class ClusterConnectionStates {
     public void ready(String id) {
         NodeConnectionState nodeState = nodeState(id);
         nodeState.state = ConnectionState.READY;
+        nodeState.authenticationException = null;
         resetReconnectBackoff(nodeState);
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 4b291b5..3a9cbc4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -603,11 +603,13 @@ public class NetworkClient implements KafkaClient {
         nodesNeedingApiVersionsFetch.remove(nodeId);
         switch (disconnectState.state()) {
             case AUTHENTICATION_FAILED:
-                connectionStates.authenticationFailed(nodeId, now, disconnectState.exception());
-                log.error("Connection to node {} failed authentication due to: {}", nodeId, disconnectState.exception().getMessage());
+                AuthenticationException exception = disconnectState.exception();
+                connectionStates.authenticationFailed(nodeId, now, exception);
+                metadataUpdater.handleAuthenticationFailure(exception);
+                log.error("Connection to node {} failed authentication due to: {}", nodeId, exception.getMessage());
                 break;
             case AUTHENTICATE:
-                // This warning applies to older brokers which dont provide feedback on authentication failures
+                // This warning applies to older brokers which don't provide feedback on authentication failures
                 log.warn("Connection to node {} terminated during authentication. This may indicate " +
                         "that authentication failed due to invalid credentials.", nodeId);
                 break;
@@ -625,9 +627,6 @@ public class NetworkClient implements KafkaClient {
             else if (request.header.apiKey() == ApiKeys.METADATA)
                 metadataUpdater.handleDisconnection(request.destination);
         }
-        AuthenticationException authenticationException = connectionStates.authenticationException(nodeId);
-        if (authenticationException != null)
-            metadataUpdater.handleAuthenticationFailure(authenticationException);
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index c3edaa7..dd4bb70 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -228,7 +228,7 @@ public abstract class AbstractCoordinator implements Closeable {
                     client.awaitMetadataUpdate(remainingMs);
                 } else
                     throw future.exception();
-            } else if (coordinator != null && client.connectionFailed(coordinator)) {
+            } else if (coordinator != null && client.isUnavailable(coordinator)) {
                 // we found the coordinator, but the connection has failed, so mark
                 // it dead and backoff before retrying discovery
                 markCoordinatorUnknown();
@@ -637,7 +637,7 @@ public abstract class AbstractCoordinator implements Closeable {
      * @return the current coordinator or null if it is unknown
      */
     protected synchronized Node checkAndGetCoordinator() {
-        if (coordinator != null && client.connectionFailed(coordinator)) {
+        if (coordinator != null && client.isUnavailable(coordinator)) {
             markCoordinatorUnknown(true);
             return null;
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index fb393a5..e1598fa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.requests.AbstractRequest;
-import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
@@ -510,15 +509,29 @@ public class ConsumerNetworkClient implements Closeable {
         }
     }
 
+
+    /**
+     * Check if the node is disconnected and unavailable for immediate reconnection (i.e. if it is in
+     * the reconnect backoff window following the disconnect).
+     */
+    public boolean isUnavailable(Node node) {
+        lock.lock();
+        try {
+            return client.connectionFailed(node) && client.connectionDelay(node, time.milliseconds()) > 0;
+        } finally {
+            lock.unlock();
+        }
+    }
+
     /**
-     * Find whether a previous connection has failed. Note that the failure state will persist until either
-     * {@link #tryConnect(Node)} or {@link #send(Node, AbstractRequest.Builder)} has been called.
-     * @param node Node to connect to if possible
+     * Check for an authentication error on a given node and raise the exception if there is one.
      */
-    public boolean connectionFailed(Node node) {
+    public void maybeThrowAuthFailure(Node node) {
         lock.lock();
         try {
-            return client.connectionFailed(node);
+            AuthenticationException exception = client.authenticationException(node);
+            if (exception != null)
+                throw exception;
         } finally {
             lock.unlock();
         }
@@ -552,10 +565,8 @@ public class ConsumerNetworkClient implements Closeable {
             if (e != null) {
                 future.raise(e);
             } else if (response.wasDisconnected()) {
-                RequestHeader requestHeader = response.requestHeader();
-                int correlation = requestHeader.correlationId();
-                log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected",
-                        requestHeader.apiKey(), requestHeader, correlation, response.destination());
+                log.debug("Cancelled request with header {} due to node {} being disconnected",
+                        response.requestHeader(), response.destination());
                 future.raise(DisconnectException.INSTANCE);
             } else if (response.versionMismatch() != null) {
                 future.raise(response.versionMismatch());
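
A node counts as unavailable only while its reconnect backoff clock is still
running: connectionFailed() alone stays true until the next connect attempt or
send, so the connectionDelay() term is what bounds the window. A standalone
toy model of the check (not Kafka code; the timings are assumptions):

    // Toy model of the isUnavailable() check above (not Kafka code).
    public class BackoffWindowDemo {
        static final long FAILED_AT_MS = 0;   // time the connection last failed
        static final long BACKOFF_MS = 1000;  // stands in for reconnect.backoff.ms

        static boolean connectionFailed(long now) {
            return true; // stays true until the next connect attempt
        }

        static long connectionDelay(long now) {
            return Math.max(0, FAILED_AT_MS + BACKOFF_MS - now); // remaining backoff
        }

        static boolean isUnavailable(long now) {
            return connectionFailed(now) && connectionDelay(now) > 0;
        }

        public static void main(String[] args) {
            System.out.println(isUnavailable(200));  // true: 800 ms of backoff remain
            System.out.println(isUnavailable(1000)); // false: backoff expired
        }
    }
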
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index b46a3a2..6d8fb6c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -576,10 +576,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
 
         Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = groupListOffsetRequests(partitionResetTimestamps);
         for (Map.Entry<Node, Map<TopicPartition, Long>> entry : timestampsToSearchByNode.entrySet()) {
+            Node node = entry.getKey();
             final Map<TopicPartition, Long> resetTimestamps = entry.getValue();
             subscriptions.setResetPending(resetTimestamps.keySet(), time.milliseconds() + requestTimeoutMs);
 
-            RequestFuture<ListOffsetResult> future = sendListOffsetRequest(entry.getKey(), resetTimestamps, false);
+            RequestFuture<ListOffsetResult> future = sendListOffsetRequest(node, resetTimestamps, false);
             future.addListener(new RequestFutureListener<ListOffsetResult>() {
                 @Override
                 public void onSuccess(ListOffsetResult result) {
@@ -667,11 +668,19 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
             PartitionInfo info = metadata.fetch().partition(tp);
             if (info == null) {
                 metadata.add(tp.topic());
-                log.debug("Partition {} is unknown for fetching offset", tp);
+                log.debug("Leader for partition {} is unknown for fetching offset", tp);
                 metadata.requestUpdate();
             } else if (info.leader() == null) {
-                log.debug("Leader for partition {} unavailable for fetching offset", tp);
+                log.debug("Leader for partition {} is unavailable for fetching offset", tp);
                 metadata.requestUpdate();
+            } else if (client.isUnavailable(info.leader())) {
+                client.maybeThrowAuthFailure(info.leader());
+
+                // The connection has failed and we need to await the blackout period before we can
+                // try again. No need to request a metadata update since the disconnect will have
+                // done so already.
+                log.debug("Leader {} for partition {} is unavailable for fetching offset until reconnect backoff expires",
+                        info.leader(), tp);
             } else {
                 Node node = info.leader();
                 Map<TopicPartition, Long> topicData = timestampsToSearchByNode.get(node);
@@ -823,14 +832,21 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
      * that have no existing requests in flight.
      */
     private Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
-        // create the fetch info
         Cluster cluster = metadata.fetch();
         Map<Node, FetchSessionHandler.Builder> fetchable = new LinkedHashMap<>();
         for (TopicPartition partition : fetchablePartitions()) {
             Node node = cluster.leaderFor(partition);
             if (node == null) {
                 metadata.requestUpdate();
-            } else if (!this.client.hasPendingRequests(node)) {
+            } else if (client.isUnavailable(node)) {
+                client.maybeThrowAuthFailure(node);
+
+                // If we try to send during the reconnect blackout window, then the request is just
+                // going to be failed anyway before being sent, so skip the send for now
+                log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node);
+            } else if (client.hasPendingRequests(node)) {
+                log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node);
+            } else {
                 // if there is a leader and no in-flight requests, issue a new fetch
                 FetchSessionHandler.Builder builder = fetchable.get(node);
                 if (builder == null) {
@@ -846,10 +862,9 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                 long position = this.subscriptions.position(partition);
                 builder.add(partition, new FetchRequest.PartitionData(position, FetchRequest.INVALID_LOG_START_OFFSET,
                     this.fetchSize));
+
                 log.debug("Added {} fetch request for partition {} at offset {} to node {}", isolationLevel,
                     partition, position, node);
-            } else {
-                log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node);
             }
         }
         Map<Node, FetchSessionHandler.FetchRequestData> reqs = new LinkedHashMap<>();
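
The per-partition decision in prepareFetchRequests() now falls through four
cases. Condensed from the hunk above (builderFor() and partitionData are
hypothetical shorthand for the fetchable-map lookup and the PartitionData
construction):

    // Sketch: per-partition ladder in prepareFetchRequests() after this patch
    Node node = cluster.leaderFor(partition);
    if (node == null) {
        metadata.requestUpdate();            // 1. leader unknown: refresh metadata
    } else if (client.isUnavailable(node)) {
        client.maybeThrowAuthFailure(node);  // 2. in backoff: raise any auth error,
                                             //    otherwise skip until backoff expires
    } else if (client.hasPendingRequests(node)) {
        // 3. a fetch is already in flight to this node: skip
    } else {
        builderFor(node).add(partition, partitionData);  // 4. ready: fetch from node
    }
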
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index ed0f5a3..d0380f1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -152,7 +152,7 @@ public class FetchResponse extends AbstractResponse {
      */
     private static final Schema FETCH_RESPONSE_V6 = FETCH_RESPONSE_V5;
 
-    // FETCH_REESPONSE_V7 added incremental fetch responses and a top-level error code.
+    // FETCH_RESPONSE_V7 added incremental fetch responses and a top-level error code.
     public static final Field.Int32 SESSION_ID = new Field.Int32("session_id", "The fetch session ID");
 
     private static final Schema FETCH_RESPONSE_V7 = new Schema(
diff --git a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
index 3b246f4..03f4d02 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.kafka.common.errors.AuthenticationException;
@@ -141,9 +142,10 @@ public class ClusterConnectionStatesTest {
         time.sleep(connectionStates.connectionDelay(nodeId1, time.milliseconds()) + 1);
 
         assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds()));
+        connectionStates.ready(nodeId1);
+        assertNull(connectionStates.authenticationException(nodeId1));
     }
 
-
     @Test
     public void testRemoveNode() {
         connectionStates.connecting(nodeId1, time.milliseconds());
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index faf9240..60af9bc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -79,7 +79,7 @@ public class MockClient implements KafkaClient {
     private Node node = null;
     private final Set<String> ready = new HashSet<>();
     private final Map<Node, Long> blackedOut = new HashMap<>();
-    private final Map<Node, AuthenticationException> authenticationException = new HashMap<>();
+    private final Map<Node, AuthenticationException> authenticationErrors = new HashMap<>();
     // Use concurrent queue for requests so that requests may be queried from a different thread
     private final Queue<ClientRequest> requests = new ConcurrentLinkedDeque<>();
     // Use concurrent queue for responses so that responses may be updated during poll() from a different thread.
@@ -106,14 +106,18 @@ public class MockClient implements KafkaClient {
 
     @Override
     public boolean ready(Node node, long now) {
-        if (isBlackedOut(node) || authenticationException(node) != null)
+        if (isBlackedOut(node))
             return false;
+        authenticationErrors.remove(node);
         ready.add(node.idString());
         return true;
     }
 
     @Override
     public long connectionDelay(Node node, long now) {
+        Long blackoutExpiration = blackedOut.get(node);
+        if (blackoutExpiration != null)
+            return Math.max(0, blackoutExpiration - now);
         return 0;
     }
 
@@ -122,7 +126,7 @@ public class MockClient implements KafkaClient {
     }
 
     public void authenticationFailed(Node node, long duration) {
-        authenticationException.put(node, (AuthenticationException) Errors.SASL_AUTHENTICATION_FAILED.exception());
+        authenticationErrors.put(node, (AuthenticationException) Errors.SASL_AUTHENTICATION_FAILED.exception());
         disconnect(node.idString());
         blackout(node, duration);
     }
@@ -147,7 +151,7 @@ public class MockClient implements KafkaClient {
 
     @Override
     public AuthenticationException authenticationException(Node node) {
-        return authenticationException.get(node);
+        return authenticationErrors.get(node);
     }
 
     @Override
@@ -389,7 +393,7 @@ public class MockClient implements KafkaClient {
         responses.clear();
         futureResponses.clear();
         metadataUpdates.clear();
-        authenticationException.clear();
+        authenticationErrors.clear();
     }
 
     public boolean hasPendingMetadataUpdates() {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 27aba41..6de1186 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -199,6 +199,18 @@ public class FetcherTest {
     }
 
     @Test
+    public void testFetchSkipsBlackedOutNodes() {
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        client.blackout(node, 500);
+        assertEquals(0, fetcher.sendFetches());
+
+        time.sleep(500);
+        assertEquals(1, fetcher.sendFetches());
+    }
+
+    @Test
     public void testFetcherIgnoresControlRecords() {
         subscriptions.assignFromUser(singleton(tp0));
         subscriptions.seek(tp0, 0);
@@ -962,6 +974,30 @@ public class FetcherTest {
     }
 
     @Test
+    public void testResetOffsetsSkipsBlackedOutConnections() {
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);
+
+        // Check that we skip sending the ListOffset request when the node is blacked out
+        client.blackout(node, 500);
+        fetcher.resetOffsetsIfNeeded();
+        assertEquals(0, consumerClient.pendingRequestCount());
+        consumerClient.pollNoWakeup();
+        assertTrue(subscriptions.isOffsetResetNeeded(tp0));
+        assertEquals(OffsetResetStrategy.EARLIEST, subscriptions.resetStrategy(tp0));
+
+        time.sleep(500);
+        client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP),
+                listOffsetResponse(Errors.NONE, 1L, 5L));
+        fetcher.resetOffsetsIfNeeded();
+        consumerClient.pollNoWakeup();
+
+        assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+        assertTrue(subscriptions.isFetchable(tp0));
+        assertEquals(5, subscriptions.position(tp0).longValue());
+    }
+
+    @Test
     public void testUpdateFetchPositionResetToEarliestOffset() {
         subscriptions.assignFromUser(singleton(tp0));
         subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.