You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/10/11 11:36:18 UTC
[kafka] branch 2.1 updated: KAFKA-7475 - capture remote address on
connection authentication errors, and log it (#5729)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new e31b844 KAFKA-7475 - capture remote address on connection authentication errors, and log it (#5729)
e31b844 is described below
commit e31b8441c90a2a3c6978f59631abc6dc7cad5756
Author: Radai Rosenblatt <ra...@gmail.com>
AuthorDate: Thu Oct 11 04:20:19 2018 -0700
KAFKA-7475 - capture remote address on connection authentication errors, and log it (#5729)
Reviewers: Manikumar Reddy <ma...@gmail.com>, Rajini Sivaram <ra...@googlemail.com>
---
.../org/apache/kafka/clients/NetworkClient.java | 9 ++++----
.../apache/kafka/common/network/ChannelState.java | 16 +++++++++++--
.../apache/kafka/common/network/KafkaChannel.java | 27 +++++++++++++++++++---
.../authenticator/SaslAuthenticatorTest.java | 2 +-
4 files changed, 44 insertions(+), 10 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index b2098bf..8ec51ed 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -649,6 +649,7 @@ public class NetworkClient implements KafkaClient {
* @param responses The list of responses to update
* @param nodeId Id of the node to be disconnected
* @param now The current time
+ * @param disconnectState The state of the disconnected channel
*/
private void processDisconnection(List<ClientResponse> responses,
String nodeId,
@@ -662,15 +663,15 @@ public class NetworkClient implements KafkaClient {
AuthenticationException exception = disconnectState.exception();
connectionStates.authenticationFailed(nodeId, now, exception);
metadataUpdater.handleAuthenticationFailure(exception);
- log.error("Connection to node {} failed authentication due to: {}", nodeId, exception.getMessage());
+ log.error("Connection to node {} ({}) failed authentication due to: {}", nodeId, disconnectState.remoteAddress(), exception.getMessage());
break;
case AUTHENTICATE:
// This warning applies to older brokers which don't provide feedback on authentication failures
- log.warn("Connection to node {} terminated during authentication. This may indicate " +
- "that authentication failed due to invalid credentials.", nodeId);
+ log.warn("Connection to node {} ({}) terminated during authentication. This may indicate " +
+ "that authentication failed due to invalid credentials.", nodeId, disconnectState.remoteAddress());
break;
case NOT_CONNECTED:
- log.warn("Connection to node {} could not be established. Broker may not be available.", nodeId);
+ log.warn("Connection to node {} ({}) could not be established. Broker may not be available.", nodeId, disconnectState.remoteAddress());
break;
default:
break; // Disconnections in other states are logged at debug level in Selector
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java
index 2d584bd..5f6dfb9 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java
@@ -75,12 +75,20 @@ public class ChannelState {
private final State state;
private final AuthenticationException exception;
+ private final String remoteAddress;
+
public ChannelState(State state) {
- this(state, null);
+ this(state, null, null);
+ }
+
+ public ChannelState(State state, String remoteAddress) {
+ this(state, null, remoteAddress);
}
- public ChannelState(State state, AuthenticationException exception) {
+
+ public ChannelState(State state, AuthenticationException exception, String remoteAddress) {
this.state = state;
this.exception = exception;
+ this.remoteAddress = remoteAddress;
}
public State state() {
@@ -90,4 +98,8 @@ public class ChannelState {
public AuthenticationException exception() {
return exception;
}
+
+ public String remoteAddress() {
+ return remoteAddress;
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index 2895128..6a12ef2 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.network;
+import java.net.SocketAddress;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
@@ -25,6 +26,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
import java.util.Objects;
public class KafkaChannel {
@@ -89,6 +91,7 @@ public class KafkaChannel {
private boolean disconnected;
private ChannelMuteState muteState;
private ChannelState state;
+ private SocketAddress remoteAddress;
public KafkaChannel(String id, TransportLayer transportLayer, Authenticator authenticator, int maxReceiveSize, MemoryPool memoryPool) {
this.id = id;
@@ -131,7 +134,8 @@ public class KafkaChannel {
} catch (AuthenticationException e) {
// Clients are notified of authentication exceptions to enable operations to be terminated
// without retries. Other errors are handled as network exceptions in Selector.
- state = new ChannelState(ChannelState.State.AUTHENTICATION_FAILED, e);
+ String remoteDesc = remoteAddress != null ? remoteAddress.toString() : null;
+ state = new ChannelState(ChannelState.State.AUTHENTICATION_FAILED, e, remoteDesc);
if (authenticating) {
delayCloseOnAuthenticationFailure();
throw new DelayedResponseAuthenticationException(e);
@@ -144,6 +148,10 @@ public class KafkaChannel {
public void disconnect() {
disconnected = true;
+ if (state == ChannelState.NOT_CONNECTED && remoteAddress != null) {
+ //if we captured the remote address we can provide more information
+ state = new ChannelState(ChannelState.State.NOT_CONNECTED, remoteAddress.toString());
+ }
transportLayer.disconnect();
}
@@ -156,9 +164,22 @@ public class KafkaChannel {
}
public boolean finishConnect() throws IOException {
+ //we need to grab remoteAddr before finishConnect() is called otherwise
+ //it becomes inaccessible if the connection was refused.
+ SocketChannel socketChannel = transportLayer.socketChannel();
+ if (socketChannel != null) {
+ remoteAddress = socketChannel.getRemoteAddress();
+ }
boolean connected = transportLayer.finishConnect();
- if (connected)
- state = ready() ? ChannelState.READY : ChannelState.AUTHENTICATE;
+ if (connected) {
+ if (ready()) {
+ state = ChannelState.READY;
+ } else if (remoteAddress != null) {
+ state = new ChannelState(ChannelState.State.AUTHENTICATE, remoteAddress.toString());
+ } else {
+ state = ChannelState.AUTHENTICATE;
+ }
+ }
return connected;
}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 297cba5..c090b6b 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -1282,7 +1282,7 @@ public class SaslAuthenticatorTest {
// Without SASL_AUTHENTICATE headers, disconnect state is ChannelState.AUTHENTICATE which is
// a hint that channel was closed during authentication, unlike ChannelState.AUTHENTICATE_FAILED
// which is an actual authentication failure reported by the broker.
- NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state());
+ NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATE);
}
private void createServer(SecurityProtocol securityProtocol, String saslMechanism,