You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2020/01/02 23:18:29 UTC
[kafka] branch 2.4 updated: KAFKA-8933;
Fix NPE in DefaultMetadataUpdater after authentication failure
(#7682)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 455fbf3 KAFKA-8933; Fix NPE in DefaultMetadataUpdater after authentication failure (#7682)
455fbf3 is described below
commit 455fbf3928efd3363992f6acd18904c489e4f0c8
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Dec 4 13:12:17 2019 -0800
KAFKA-8933; Fix NPE in DefaultMetadataUpdater after authentication failure (#7682)
This patch fixes an NPE in `DefaultMetadataUpdater` due to an inconsistency in event expectations. Whenever there is an authentication failure, we were treating it as a failed update even if it was from a separate connection from an inflight metadata request. This patch fixes the problem by making the `MetadataUpdater` api clearer in terms of the events that are handled.
Reviewers: Stanislav Kozlovski <st...@outlook.com>, Rajini Sivaram <ra...@googlemail.com>
---
.../kafka/clients/ManualMetadataUpdater.java | 23 ++---
.../java/org/apache/kafka/clients/Metadata.java | 17 +++-
.../org/apache/kafka/clients/MetadataUpdater.java | 29 +++---
.../org/apache/kafka/clients/NetworkClient.java | 112 ++++++++++-----------
.../admin/internals/AdminMetadataManager.java | 17 ++--
.../kafka/clients/consumer/KafkaConsumer.java | 2 +-
.../kafka/clients/producer/KafkaProducer.java | 2 +-
.../producer/internals/ProducerMetadata.java | 7 +-
.../org/apache/kafka/clients/MetadataTest.java | 19 +++-
.../apache/kafka/clients/NetworkClientTest.java | 77 +++++++++++++-
.../internals/ConsumerNetworkClientTest.java | 4 +-
.../clients/consumer/internals/FetcherTest.java | 2 +-
.../producer/internals/ProducerMetadataTest.java | 8 +-
.../ClientAuthenticationFailureTest.java | 40 +++-----
.../java/org/apache/kafka/test/MockSelector.java | 20 ++--
.../runtime/distributed/WorkerGroupMember.java | 2 +-
.../kafka/admin/BrokerApiVersionsCommand.scala | 2 +-
17 files changed, 219 insertions(+), 164 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
index 7fb0224..c1c1fba 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
@@ -18,13 +18,13 @@ package org.apache.kafka.clients;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
/**
* A simple implementation of `MetadataUpdater` that returns the cluster nodes set via the constructor or via
@@ -36,9 +36,6 @@ import java.util.List;
* This class is not thread-safe!
*/
public class ManualMetadataUpdater implements MetadataUpdater {
-
- private static final Logger log = LoggerFactory.getLogger(ManualMetadataUpdater.class);
-
private List<Node> nodes;
public ManualMetadataUpdater() {
@@ -69,24 +66,18 @@ public class ManualMetadataUpdater implements MetadataUpdater {
}
@Override
- public void handleDisconnection(String destination) {
- // Do nothing
- }
-
- @Override
- public void handleFatalException(KafkaException exception) {
- // We don't fail the broker on failures, but there should be sufficient information in the logs indicating the reason
- // for failure.
- log.debug("An error occurred in broker-to-broker communication.", exception);
+ public void handleServerDisconnect(long now, String nodeId, Optional<AuthenticationException> maybeAuthException) {
+ // We don't fail the broker on failures. There should be sufficient information from
+ // the NetworkClient logs to indicate the reason for the failure.
}
@Override
- public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
+ public void handleFailedRequest(long now, Optional<KafkaException> maybeFatalException) {
// Do nothing
}
@Override
- public void requestUpdate() {
+ public void handleSuccessfulResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
// Do nothing
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index a3ecdf8..b6fdc06 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -209,10 +209,8 @@ public class Metadata implements Closeable {
}
}
- public synchronized void bootstrap(List<InetSocketAddress> addresses, long now) {
+ public synchronized void bootstrap(List<InetSocketAddress> addresses) {
this.needUpdate = true;
- this.lastRefreshMs = now;
- this.lastSuccessfulRefreshMs = now;
this.updateVersion += 1;
this.cache = MetadataCache.bootstrap(addresses);
}
@@ -441,9 +439,18 @@ public class Metadata implements Closeable {
* Record an attempt to update the metadata that failed. We need to keep track of this
* to avoid retrying immediately.
*/
- public synchronized void failedUpdate(long now, KafkaException fatalException) {
+ public synchronized void failedUpdate(long now) {
this.lastRefreshMs = now;
- this.fatalException = fatalException;
+ }
+
+ /**
+ * Propagate a fatal error which affects the ability to fetch metadata for the cluster.
+ * Two examples are authentication and unsupported version exceptions.
+ *
+ * @param exception The fatal exception
+ */
+ public synchronized void fatalError(KafkaException exception) {
+ this.fatalException = exception;
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
index e2261d5..77f3efa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
@@ -18,11 +18,14 @@ package org.apache.kafka.clients;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
import java.io.Closeable;
import java.util.List;
+import java.util.Optional;
/**
* The interface used by `NetworkClient` to request cluster metadata info to be updated and to retrieve the cluster nodes
@@ -46,7 +49,7 @@ public interface MetadataUpdater extends Closeable {
* Starts a cluster metadata update if needed and possible. Returns the time until the metadata update (which would
* be 0 if an update has been started as a result of this call).
*
- * If the implementation relies on `NetworkClient` to send requests, `handleCompletedMetadataResponse` will be
+ * If the implementation relies on `NetworkClient` to send requests, `handleSuccessfulResponse` will be
* invoked after the metadata response is received.
*
* The semantics of `needed` and `possible` are implementation-dependent and may take into account a number of
@@ -55,20 +58,24 @@ public interface MetadataUpdater extends Closeable {
long maybeUpdate(long now);
/**
- * Handle disconnections for metadata requests.
+ * Handle a server disconnect.
*
* This provides a mechanism for the `MetadataUpdater` implementation to use the NetworkClient instance for its own
* requests with special handling for disconnections of such requests.
- * @param destination
+ *
+ * @param now Current time in milliseconds
+ * @param nodeId The id of the node that disconnected
+ * @param maybeAuthException Optional authentication error
*/
- void handleDisconnection(String destination);
+ void handleServerDisconnect(long now, String nodeId, Optional<AuthenticationException> maybeAuthException);
/**
- * Handle failure. Propagate the exception if awaiting metadata.
+ * Handle a metadata request failure.
*
- * @param fatalException exception corresponding to the failure
+ * @param now Current time in milliseconds
+ * @param maybeFatalException Optional fatal error (e.g. {@link UnsupportedVersionException})
*/
- void handleFatalException(KafkaException fatalException);
+ void handleFailedRequest(long now, Optional<KafkaException> maybeFatalException);
/**
* Handle responses for metadata requests.
@@ -76,13 +83,7 @@ public interface MetadataUpdater extends Closeable {
* This provides a mechanism for the `MetadataUpdater` implementation to use the NetworkClient instance for its own
* requests with special handling for completed receives of such requests.
*/
- void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse metadataResponse);
-
- /**
- * Schedules an update of the current cluster metadata info. A subsequent call to `maybeUpdate` would trigger the
- * start of the update if possible (see `maybeUpdate` for more information).
- */
- void requestUpdate();
+ void handleSuccessfulResponse(RequestHeader requestHeader, long now, MetadataResponse metadataResponse);
/**
* Close this updater.
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index d782df8..3431b83 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -30,8 +30,8 @@ import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.CommonFields;
+import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
@@ -51,11 +51,13 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -308,24 +310,29 @@ public class NetworkClient implements KafkaClient {
return;
selector.close(nodeId);
- List<ApiKeys> requestTypes = new ArrayList<>();
long now = time.milliseconds();
- for (InFlightRequest request : inFlightRequests.clearAll(nodeId)) {
- if (request.isInternalRequest) {
- if (request.header.apiKey() == ApiKeys.METADATA) {
- metadataUpdater.handleDisconnection(request.destination);
- }
- } else {
- requestTypes.add(request.header.apiKey());
- abortedSends.add(new ClientResponse(request.header,
- request.callback, request.destination, request.createdTimeMs, now,
- true, null, null, null));
- }
- }
+
+ cancelInFlightRequests(nodeId, now, abortedSends);
+
connectionStates.disconnected(nodeId, now);
- if (log.isDebugEnabled()) {
- log.debug("Manually disconnected from {}. Removed requests: {}.", nodeId,
- Utils.join(requestTypes, ", "));
+
+ if (log.isTraceEnabled()) {
+ log.trace("Manually disconnected from {}. Aborted in-flight requests: {}.", nodeId, inFlightRequests);
+ }
+ }
+
+ private void cancelInFlightRequests(String nodeId, long now, Collection<ClientResponse> responses) {
+ Iterable<InFlightRequest> inFlightRequests = this.inFlightRequests.clearAll(nodeId);
+ for (InFlightRequest request : inFlightRequests) {
+ log.trace("Cancelled request {} {} with correlation id {} due to node {} being disconnected",
+ request.header.apiKey(), request.request, request.header.correlationId(), nodeId);
+
+ if (!request.isInternalRequest) {
+ if (responses != null)
+ responses.add(request.disconnected(now, null));
+ } else if (request.header.apiKey() == ApiKeys.METADATA) {
+ metadataUpdater.handleFailedRequest(now, Optional.empty());
+ }
}
}
@@ -339,9 +346,8 @@ public class NetworkClient implements KafkaClient {
@Override
public void close(String nodeId) {
selector.close(nodeId);
- for (InFlightRequest request : inFlightRequests.clearAll(nodeId))
- if (request.isInternalRequest && request.header.apiKey() == ApiKeys.METADATA)
- metadataUpdater.handleDisconnection(request.destination);
+ long now = time.milliseconds();
+ cancelInFlightRequests(nodeId, now, null);
connectionStates.remove(nodeId);
}
@@ -481,10 +487,11 @@ public class NetworkClient implements KafkaClient {
ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()),
clientRequest.callback(), clientRequest.destination(), now, now,
false, unsupportedVersionException, null, null);
- abortedSends.add(clientResponse);
- if (isInternalRequest && clientRequest.apiKey() == ApiKeys.METADATA)
- metadataUpdater.handleFatalException(unsupportedVersionException);
+ if (!isInternalRequest)
+ abortedSends.add(clientResponse);
+ else if (clientRequest.apiKey() == ApiKeys.METADATA)
+ metadataUpdater.handleFailedRequest(now, Optional.of(unsupportedVersionException));
}
}
@@ -735,7 +742,6 @@ public class NetworkClient implements KafkaClient {
case AUTHENTICATION_FAILED:
AuthenticationException exception = disconnectState.exception();
connectionStates.authenticationFailed(nodeId, now, exception);
- metadataUpdater.handleFatalException(exception);
log.error("Connection to node {} ({}) failed authentication due to: {}", nodeId,
disconnectState.remoteAddress(), exception.getMessage());
break;
@@ -752,14 +758,9 @@ public class NetworkClient implements KafkaClient {
default:
break; // Disconnections in other states are logged at debug level in Selector
}
- for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) {
- log.trace("Cancelled request {} {} with correlation id {} due to node {} being disconnected",
- request.header.apiKey(), request.request, request.header.correlationId(), nodeId);
- if (!request.isInternalRequest)
- responses.add(request.disconnected(now, disconnectState.exception()));
- else if (request.header.apiKey() == ApiKeys.METADATA)
- metadataUpdater.handleDisconnection(request.destination);
- }
+
+ cancelInFlightRequests(nodeId, now, responses);
+ metadataUpdater.handleServerDisconnect(now, nodeId, Optional.ofNullable(disconnectState.exception()));
}
/**
@@ -777,10 +778,6 @@ public class NetworkClient implements KafkaClient {
log.debug("Disconnecting from node {} due to request timeout.", nodeId);
processDisconnection(responses, nodeId, now, ChannelState.LOCAL_CLOSE);
}
-
- // we disconnected, so we should probably refresh our metadata
- if (!nodeIds.isEmpty())
- metadataUpdater.requestUpdate();
}
private void handleAbortedSends(List<ClientResponse> responses) {
@@ -844,7 +841,7 @@ public class NetworkClient implements KafkaClient {
parseResponse(req.header.apiKey(), responseStruct, req.header.apiVersion());
maybeThrottle(body, req.header.apiVersion(), req.destination, now);
if (req.isInternalRequest && body instanceof MetadataResponse)
- metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
+ metadataUpdater.handleSuccessfulResponse(req.header, now, (MetadataResponse) body);
else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
else
@@ -894,9 +891,6 @@ public class NetworkClient implements KafkaClient {
log.debug("Node {} disconnected.", node);
processDisconnection(responses, node, now, entry.getValue());
}
- // we got a disconnect so we should probably refresh our metadata and see if that broker is dead
- if (this.selector.disconnected().size() > 0)
- metadataUpdater.requestUpdate();
}
/**
@@ -960,10 +954,10 @@ public class NetworkClient implements KafkaClient {
this.socketReceiveBuffer);
} catch (IOException e) {
log.warn("Error connecting to node {}", node, e);
- /* attempt failed, we'll try again after the backoff */
+ // Attempt failed, we'll try again after the backoff
connectionStates.disconnected(nodeConnectionId, now);
- /* maybe the problem is our metadata, update it */
- metadataUpdater.requestUpdate();
+ // Notify metadata updater of the connection failure
+ metadataUpdater.handleServerDisconnect(now, nodeConnectionId, Optional.empty());
}
}
@@ -1001,7 +995,6 @@ public class NetworkClient implements KafkaClient {
long waitForMetadataFetch = hasFetchInProgress() ? defaultRequestTimeoutMs : 0;
long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
-
if (metadataTimeout > 0) {
return metadataTimeout;
}
@@ -1018,31 +1011,39 @@ public class NetworkClient implements KafkaClient {
}
@Override
- public void handleDisconnection(String destination) {
+ public void handleServerDisconnect(long now, String destinationId, Optional<AuthenticationException> maybeFatalException) {
Cluster cluster = metadata.fetch();
// 'processDisconnection' generates warnings for misconfigured bootstrap server configuration
// resulting in 'Connection Refused' and misconfigured security resulting in authentication failures.
// The warning below handles the case where a connection to a broker was established, but was disconnected
// before metadata could be obtained.
if (cluster.isBootstrapConfigured()) {
- int nodeId = Integer.parseInt(destination);
+ int nodeId = Integer.parseInt(destinationId);
Node node = cluster.nodeById(nodeId);
if (node != null)
log.warn("Bootstrap broker {} disconnected", node);
}
- inProgressRequestVersion = null;
+ // If we have a disconnect while an update is due, we treat it as a failed update
+ // so that we can backoff properly
+ if (isUpdateDue(now))
+ handleFailedRequest(now, Optional.empty());
+
+ maybeFatalException.ifPresent(metadata::fatalError);
+
+ // The disconnect may be the result of stale metadata, so request an update
+ metadata.requestUpdate();
}
@Override
- public void handleFatalException(KafkaException fatalException) {
- if (metadata.updateRequested())
- metadata.failedUpdate(time.milliseconds(), fatalException);
+ public void handleFailedRequest(long now, Optional<KafkaException> maybeFatalException) {
+ maybeFatalException.ifPresent(metadata::fatalError);
+ metadata.failedUpdate(now);
inProgressRequestVersion = null;
}
@Override
- public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
+ public void handleSuccessfulResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
// If any partition has leader with missing listeners, log up to ten of these partitions
// for diagnosing broker configuration issues.
// This could be a transient issue if listeners were added dynamically to brokers.
@@ -1066,7 +1067,7 @@ public class NetworkClient implements KafkaClient {
// created which means we will get errors and no nodes until it exists
if (response.brokers().isEmpty()) {
log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
- this.metadata.failedUpdate(now, null);
+ this.metadata.failedUpdate(now);
} else {
this.metadata.update(inProgressRequestVersion, response, now);
}
@@ -1075,11 +1076,6 @@ public class NetworkClient implements KafkaClient {
}
@Override
- public void requestUpdate() {
- this.metadata.requestUpdate();
- }
-
- @Override
public void close() {
this.metadata.close();
}
@@ -1104,10 +1100,10 @@ public class NetworkClient implements KafkaClient {
if (canSendRequest(nodeConnectionId, now)) {
Metadata.MetadataRequestAndVersion requestAndVersion = metadata.newMetadataRequestAndVersion();
- this.inProgressRequestVersion = requestAndVersion.requestVersion;
MetadataRequest.Builder metadataRequest = requestAndVersion.requestBuilder;
log.debug("Sending metadata request {} to node {}", metadataRequest, node);
sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
+ this.inProgressRequestVersion = requestAndVersion.requestVersion;
return defaultRequestTimeoutMs;
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
index b7080ac..6e83452 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
/**
* Manages the metadata for KafkaAdminClient.
@@ -100,23 +101,19 @@ public class AdminMetadataManager {
}
@Override
- public void handleDisconnection(String destination) {
- // Do nothing
- }
-
- @Override
- public void handleFatalException(KafkaException e) {
- updateFailed(e);
+ public void handleServerDisconnect(long now, String destinationId, Optional<AuthenticationException> maybeFatalException) {
+ maybeFatalException.ifPresent(AdminMetadataManager.this::updateFailed);
+ AdminMetadataManager.this.requestUpdate();
}
@Override
- public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse metadataResponse) {
+ public void handleFailedRequest(long now, Optional<KafkaException> maybeFatalException) {
// Do nothing
}
@Override
- public void requestUpdate() {
- AdminMetadataManager.this.requestUpdate();
+ public void handleSuccessfulResponse(RequestHeader requestHeader, long now, MetadataResponse metadataResponse) {
+ // Do nothing
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 9d3cf6b..a431ab3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -734,7 +734,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
subscriptions, logContext, clusterResourceListeners);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));
- this.metadata.bootstrap(addresses, time.milliseconds());
+ this.metadata.bootstrap(addresses);
String metricGrpPrefix = "consumer";
FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry(Collections.singleton(CLIENT_ID_METRIC_TAG), metricGrpPrefix);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 4b7f3d2..51e1a8b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -415,7 +415,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
logContext,
clusterResourceListeners,
Time.SYSTEM);
- this.metadata.bootstrap(addresses, time.milliseconds());
+ this.metadata.bootstrap(addresses);
}
this.errors = this.metrics.sensor("errors");
this.sender = newSender(logContext, kafkaClient, this.metadata);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java
index ef53af4..da18189 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java
@@ -111,10 +111,9 @@ public class ProducerMetadata extends Metadata {
}
@Override
- public synchronized void failedUpdate(long now, KafkaException fatalException) {
- super.failedUpdate(now, fatalException);
- if (fatalException != null)
- notifyAll();
+ public synchronized void fatalError(KafkaException fatalException) {
+ super.fatalError(fatalException);
+ notifyAll();
}
/**
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 7bda534..f690dc6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -118,6 +118,18 @@ public class MetadataTest {
}
@Test
+ public void testUpdateMetadataAllowedImmediatelyAfterBootstrap() {
+ MockTime time = new MockTime();
+
+ Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, new LogContext(),
+ new ClusterResourceListeners());
+ metadata.bootstrap(Collections.singletonList(new InetSocketAddress("localhost", 9002)));
+
+ assertEquals(0, metadata.timeToAllowUpdate(time.milliseconds()));
+ assertEquals(0, metadata.timeToNextUpdate(time.milliseconds()));
+ }
+
+ @Test
public void testTimeToNextUpdate() {
checkTimeToNextUpdate(100, 1000);
checkTimeToNextUpdate(1000, 100);
@@ -131,7 +143,7 @@ public class MetadataTest {
long now = 10000;
// lastRefreshMs updated to now.
- metadata.failedUpdate(now, null);
+ metadata.failedUpdate(now);
// Backing off. Remaining time until next try should be returned.
assertEquals(refreshBackoffMs, metadata.timeToNextUpdate(now));
@@ -259,7 +271,7 @@ public class MetadataTest {
metadata.update(emptyMetadataResponse(), time);
assertEquals(100, metadata.timeToNextUpdate(1000));
- metadata.failedUpdate(1100, null);
+ metadata.failedUpdate(1100);
assertEquals(100, metadata.timeToNextUpdate(1100));
assertEquals(100, metadata.lastSuccessfulUpdate());
@@ -270,14 +282,13 @@ public class MetadataTest {
@Test
public void testClusterListenerGetsNotifiedOfUpdate() {
- long time = 0;
MockClusterResourceListener mockClusterListener = new MockClusterResourceListener();
ClusterResourceListeners listeners = new ClusterResourceListeners();
listeners.maybeAdd(mockClusterListener);
metadata = new Metadata(refreshBackoffMs, metadataExpireMs, new LogContext(), listeners);
String hostName = "www.example.com";
- metadata.bootstrap(Collections.singletonList(new InetSocketAddress(hostName, 9002)), time);
+ metadata.bootstrap(Collections.singletonList(new InetSocketAddress(hostName, 9002)));
assertFalse("ClusterResourceListener should not called when metadata is updated with bootstrap Cluster",
MockClusterResourceListener.IS_ON_UPDATE_CALLED.get());
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index e198f6c..a4145d1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -16,10 +16,13 @@
*/
package org.apache.kafka.clients;
+import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKeyCollection;
@@ -31,6 +34,7 @@ import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseHeader;
@@ -48,11 +52,13 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import static org.apache.kafka.common.protocol.ApiKeys.PRODUCE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -83,6 +89,12 @@ public class NetworkClientTest {
ClientDnsLookup.DEFAULT, time, true, new ApiVersions(), new LogContext());
}
+ private NetworkClient createNetworkClientWithNoVersionDiscovery(Metadata metadata) {
+ return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
+ reconnectBackoffMsTest, 0, 64 * 1024, 64 * 1024,
+ defaultRequestTimeoutMs, ClientDnsLookup.DEFAULT, time, false, new ApiVersions(), new LogContext());
+ }
+
private NetworkClient createNetworkClientWithNoVersionDiscovery() {
return new NetworkClient(selector, metadataUpdater, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest,
@@ -532,16 +544,19 @@ public class NetworkClientTest {
}
private int sendEmptyProduceRequest() {
+ return sendEmptyProduceRequest(node.idString());
+ }
+
+ private int sendEmptyProduceRequest(String nodeId) {
ProduceRequest.Builder builder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000,
Collections.emptyMap());
TestCallbackHandler handler = new TestCallbackHandler();
- ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true,
+ ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true,
defaultRequestTimeoutMs, handler);
client.send(request, time.milliseconds());
return request.correlationId();
}
-
private void sendResponse(ResponseHeader respHeader, Struct response) {
Struct responseHeaderStruct = respHeader.toStruct();
int size = responseHeaderStruct.sizeOf() + response.sizeOf();
@@ -588,6 +603,49 @@ public class NetworkClientTest {
}
@Test
+ public void testAuthenticationFailureWithInFlightMetadataRequest() {
+ int refreshBackoffMs = 50;
+
+ MetadataResponse metadataResponse = TestUtils.metadataUpdateWith(2, Collections.emptyMap());
+ Metadata metadata = new Metadata(refreshBackoffMs, 5000, new LogContext(), new ClusterResourceListeners());
+ metadata.update(metadataResponse, time.milliseconds());
+
+ Cluster cluster = metadata.fetch();
+ Node node1 = cluster.nodes().get(0);
+ Node node2 = cluster.nodes().get(1);
+
+ NetworkClient client = createNetworkClientWithNoVersionDiscovery(metadata);
+
+ awaitReady(client, node1);
+
+ metadata.requestUpdate();
+ time.sleep(refreshBackoffMs);
+
+ client.poll(0, time.milliseconds());
+
+ Optional<Node> nodeWithPendingMetadataOpt = cluster.nodes().stream()
+ .filter(node -> client.hasInFlightRequests(node.idString()))
+ .findFirst();
+ assertEquals(Optional.of(node1), nodeWithPendingMetadataOpt);
+
+ assertFalse(client.ready(node2, time.milliseconds()));
+ selector.serverAuthenticationFailed(node2.idString());
+ client.poll(0, time.milliseconds());
+ assertNotNull(client.authenticationException(node2));
+
+ ByteBuffer requestBuffer = selector.completedSendBuffers().get(0).buffer();
+ RequestHeader header = parseHeader(requestBuffer);
+ assertEquals(ApiKeys.METADATA, header.apiKey());
+
+ ByteBuffer responseBuffer = metadataResponse.serialize(ApiKeys.METADATA, header.apiVersion(), header.correlationId());
+ selector.delayedReceive(new DelayedReceive(node1.idString(), new NetworkReceive(node1.idString(), responseBuffer)));
+
+ int initialUpdateVersion = metadata.updateVersion();
+ client.poll(0, time.milliseconds());
+ assertEquals(initialUpdateVersion + 1, metadata.updateVersion());
+ }
+
+ @Test
public void testLeastLoadedNodeConsidersThrottledConnections() {
client.ready(node, time.milliseconds());
awaitReady(client, node);
@@ -840,9 +898,18 @@ public class NetworkClientTest {
}
@Override
- public void handleFatalException(KafkaException exception) {
- failure = exception;
- super.handleFatalException(exception);
+ public void handleServerDisconnect(long now, String destinationId, Optional<AuthenticationException> maybeAuthException) {
+ maybeAuthException.ifPresent(exception -> {
+ failure = exception;
+ });
+ super.handleServerDisconnect(now, destinationId, maybeAuthException);
+ }
+
+ @Override
+ public void handleFailedRequest(long now, Optional<KafkaException> maybeFatalException) {
+ maybeFatalException.ifPresent(exception -> {
+ failure = exception;
+ });
}
public KafkaException getAndClearFailure() {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 90b0fc5..c50bce4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -235,7 +235,7 @@ public class ConsumerNetworkClientTest {
@Test
public void testAuthenticationExceptionPropagatedFromMetadata() {
- metadata.failedUpdate(time.milliseconds(), new AuthenticationException("Authentication failed"));
+ metadata.fatalError(new AuthenticationException("Authentication failed"));
try {
consumerClient.poll(time.timer(Duration.ZERO));
fail("Expected authentication error thrown");
@@ -264,7 +264,7 @@ public class ConsumerNetworkClientTest {
@Test
public void testMetadataFailurePropagated() {
KafkaException metadataException = new KafkaException();
- metadata.failedUpdate(time.milliseconds(), metadataException);
+ metadata.fatalError(metadataException);
try {
consumerClient.poll(time.timer(Duration.ZERO));
fail("Expected poll to throw exception");
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index fd1be37..1aa7814 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -3338,7 +3338,7 @@ public class FetcherTest {
TopicPartition t2p0 = new TopicPartition(topicName2, 0);
// Expect a metadata refresh.
metadata.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"),
- ClientDnsLookup.DEFAULT), time.milliseconds());
+ ClientDnsLookup.DEFAULT));
Map<String, Integer> partitionNumByTopic = new HashMap<>();
partitionNumByTopic.put(topicName, 2);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerMetadataTest.java
index 3e29d61..58fbd44 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerMetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerMetadataTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -37,9 +36,9 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.junit.Assert.assertThrows;
public class ProducerMetadataTest {
@@ -178,9 +177,8 @@ public class ProducerMetadataTest {
}
@Test
- public void testMetadataWaitAbortedOnFatalException() throws Exception {
- Time time = new MockTime();
- metadata.failedUpdate(time.milliseconds(), new AuthenticationException("Fatal exception from test"));
+ public void testMetadataWaitAbortedOnFatalException() {
+ metadata.fatalError(new AuthenticationException("Fatal exception from test"));
assertThrows(AuthenticationException.class, () -> metadata.awaitUpdate(0, 1000));
}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
index 5da9f04..0a93c6a 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
@@ -18,12 +18,14 @@ package org.apache.kafka.common.security.authenticator;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.errors.SaslAuthenticationException;
@@ -35,6 +37,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -44,9 +47,9 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.Future;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertThrows;
public class ClientAuthenticationFailureTest {
private static MockTime time = new MockTime(50);
@@ -88,13 +91,10 @@ public class ClientAuthenticationFailureTest {
StringDeserializer deserializer = new StringDeserializer();
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, deserializer, deserializer)) {
- consumer.subscribe(Arrays.asList(topic));
- consumer.poll(Duration.ofSeconds(10));
- fail("Expected an authentication error!");
- } catch (SaslAuthenticationException e) {
- // OK
- } catch (Exception e) {
- throw new AssertionError("Expected only an authentication error, but another error occurred.", e);
+ assertThrows(SaslAuthenticationException.class, () -> {
+ consumer.subscribe(Collections.singleton(topic));
+ consumer.poll(Duration.ofSeconds(10));
+ });
}
}
@@ -106,11 +106,8 @@ public class ClientAuthenticationFailureTest {
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, serializer, serializer)) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message");
- producer.send(record).get();
- fail("Expected an authentication error!");
- } catch (Exception e) {
- assertTrue("Expected SaslAuthenticationException, got " + e.getCause().getClass(),
- e.getCause() instanceof SaslAuthenticationException);
+ Future<RecordMetadata> future = producer.send(record);
+ TestUtils.assertFutureThrows(future, SaslAuthenticationException.class);
}
}
@@ -119,12 +116,8 @@ public class ClientAuthenticationFailureTest {
Map<String, Object> props = new HashMap<>(saslClientConfigs);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port());
try (Admin client = Admin.create(props)) {
- DescribeTopicsResult result = client.describeTopics(Collections.singleton("test"));
- result.all().get();
- fail("Expected an authentication error!");
- } catch (Exception e) {
- assertTrue("Expected SaslAuthenticationException, got " + e.getCause().getClass(),
- e.getCause() instanceof SaslAuthenticationException);
+ KafkaFuture<Map<String, TopicDescription>> future = client.describeTopics(Collections.singleton("test")).all();
+ TestUtils.assertFutureThrows(future, SaslAuthenticationException.class);
}
}
@@ -137,10 +130,7 @@ public class ClientAuthenticationFailureTest {
StringSerializer serializer = new StringSerializer();
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, serializer, serializer)) {
- producer.initTransactions();
- fail("Expected an authentication error!");
- } catch (SaslAuthenticationException e) {
- // expected exception
+ assertThrows(SaslAuthenticationException.class, producer::initTransactions);
}
}
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
index 9f3c432..90a83b0 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
@@ -16,9 +16,9 @@
*/
package org.apache.kafka.test;
+import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.NetworkReceive;
-import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.requests.ByteBufferChannel;
@@ -88,13 +88,15 @@ public class MockSelector implements Selectable {
close(id);
}
+ public void serverAuthenticationFailed(String id) {
+ ChannelState authFailed = new ChannelState(ChannelState.State.AUTHENTICATION_FAILED,
+ new AuthenticationException("Authentication failed"), null);
+ this.disconnected.put(id, authFailed);
+ close(id);
+ }
+
private void removeSendsForNode(String id, Collection<Send> sends) {
- Iterator<Send> iter = sends.iterator();
- while (iter.hasNext()) {
- Send send = iter.next();
- if (id.equals(send.destination()))
- iter.remove();
- }
+ sends.removeIf(send -> id.equals(send.destination()));
}
public void clear() {
@@ -153,10 +155,6 @@ public class MockSelector implements Selectable {
return completedSends;
}
- public void completeSend(NetworkSend send) {
- this.completedSends.add(send);
- }
-
public List<ByteBufferChannel> completedSendBuffers() {
return completedSendBuffers;
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index cc052c2..13a52d4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -96,7 +96,7 @@ public class WorkerGroupMember {
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG));
- this.metadata.bootstrap(addresses, time.milliseconds());
+ this.metadata.bootstrap(addresses);
String metricGrpPrefix = "connect";
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time);
NetworkClient netClient = new NetworkClient(
diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
index b97d858..92cdb9e 100644
--- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
@@ -278,7 +278,7 @@ object BrokerApiVersionsCommand {
val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
val clientDnsLookup = config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)
val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls, clientDnsLookup)
- metadata.bootstrap(brokerAddresses, time.milliseconds())
+ metadata.bootstrap(brokerAddresses)
val selector = new Selector(
DefaultConnectionMaxIdleMs,