You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/10/11 11:36:18 UTC

[kafka] branch 2.1 updated: KAFKA-7475 - capture remote address on connection authetication errors, and log it (#5729)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new e31b844  KAFKA-7475 - capture remote address on connection authetication errors, and log it (#5729)
e31b844 is described below

commit e31b8441c90a2a3c6978f59631abc6dc7cad5756
Author: Radai Rosenblatt <ra...@gmail.com>
AuthorDate: Thu Oct 11 04:20:19 2018 -0700

    KAFKA-7475 - capture remote address on connection authetication errors, and log it (#5729)
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>, Rajini Sivaram <ra...@googlemail.com>
---
 .../org/apache/kafka/clients/NetworkClient.java    |  9 ++++----
 .../apache/kafka/common/network/ChannelState.java  | 16 +++++++++++--
 .../apache/kafka/common/network/KafkaChannel.java  | 27 +++++++++++++++++++---
 .../authenticator/SaslAuthenticatorTest.java       |  2 +-
 4 files changed, 44 insertions(+), 10 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index b2098bf..8ec51ed 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -649,6 +649,7 @@ public class NetworkClient implements KafkaClient {
      * @param responses The list of responses to update
      * @param nodeId Id of the node to be disconnected
      * @param now The current time
+     * @param disconnectState The state of the disconnected channel           
      */
     private void processDisconnection(List<ClientResponse> responses,
                                       String nodeId,
@@ -662,15 +663,15 @@ public class NetworkClient implements KafkaClient {
                 AuthenticationException exception = disconnectState.exception();
                 connectionStates.authenticationFailed(nodeId, now, exception);
                 metadataUpdater.handleAuthenticationFailure(exception);
-                log.error("Connection to node {} failed authentication due to: {}", nodeId, exception.getMessage());
+                log.error("Connection to node {} ({}) failed authentication due to: {}", nodeId, disconnectState.remoteAddress(), exception.getMessage());
                 break;
             case AUTHENTICATE:
                 // This warning applies to older brokers which don't provide feedback on authentication failures
-                log.warn("Connection to node {} terminated during authentication. This may indicate " +
-                        "that authentication failed due to invalid credentials.", nodeId);
+                log.warn("Connection to node {} ({}) terminated during authentication. This may indicate " +
+                        "that authentication failed due to invalid credentials.", nodeId, disconnectState.remoteAddress());
                 break;
             case NOT_CONNECTED:
-                log.warn("Connection to node {} could not be established. Broker may not be available.", nodeId);
+                log.warn("Connection to node {} ({}) could not be established. Broker may not be available.", nodeId, disconnectState.remoteAddress());
                 break;
             default:
                 break; // Disconnections in other states are logged at debug level in Selector
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java
index 2d584bd..5f6dfb9 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java
@@ -75,12 +75,20 @@ public class ChannelState {
 
     private final State state;
     private final AuthenticationException exception;
+    private final String remoteAddress;
+
     public ChannelState(State state) {
-        this(state, null);
+        this(state, null, null);
+    }
+
+    public ChannelState(State state, String remoteAddress) {
+        this(state, null, remoteAddress);
     }
-    public ChannelState(State state, AuthenticationException exception) {
+    
+    public ChannelState(State state, AuthenticationException exception, String remoteAddress) {
         this.state = state;
         this.exception = exception;
+        this.remoteAddress = remoteAddress;
     }
 
     public State state() {
@@ -90,4 +98,8 @@ public class ChannelState {
     public AuthenticationException exception() {
         return exception;
     }
+
+    public String remoteAddress() {
+        return remoteAddress;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index 2895128..6a12ef2 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.network;
 
+import java.net.SocketAddress;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
@@ -25,6 +26,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.Socket;
 import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
 import java.util.Objects;
 
 public class KafkaChannel {
@@ -89,6 +91,7 @@ public class KafkaChannel {
     private boolean disconnected;
     private ChannelMuteState muteState;
     private ChannelState state;
+    private SocketAddress remoteAddress;
 
     public KafkaChannel(String id, TransportLayer transportLayer, Authenticator authenticator, int maxReceiveSize, MemoryPool memoryPool) {
         this.id = id;
@@ -131,7 +134,8 @@ public class KafkaChannel {
         } catch (AuthenticationException e) {
             // Clients are notified of authentication exceptions to enable operations to be terminated
             // without retries. Other errors are handled as network exceptions in Selector.
-            state = new ChannelState(ChannelState.State.AUTHENTICATION_FAILED, e);
+            String remoteDesc = remoteAddress != null ? remoteAddress.toString() : null;
+            state = new ChannelState(ChannelState.State.AUTHENTICATION_FAILED, e, remoteDesc);
             if (authenticating) {
                 delayCloseOnAuthenticationFailure();
                 throw new DelayedResponseAuthenticationException(e);
@@ -144,6 +148,10 @@ public class KafkaChannel {
 
     public void disconnect() {
         disconnected = true;
+        if (state == ChannelState.NOT_CONNECTED && remoteAddress != null) {
+            //if we captured the remote address we can provide more information
+            state = new ChannelState(ChannelState.State.NOT_CONNECTED, remoteAddress.toString());
+        }
         transportLayer.disconnect();
     }
 
@@ -156,9 +164,22 @@ public class KafkaChannel {
     }
 
     public boolean finishConnect() throws IOException {
+        //we need to grab remoteAddr before finishConnect() is called otherwise
+        //it becomes inaccessible if the connection was refused.
+        SocketChannel socketChannel = transportLayer.socketChannel();
+        if (socketChannel != null) {
+            remoteAddress = socketChannel.getRemoteAddress();
+        }
         boolean connected = transportLayer.finishConnect();
-        if (connected)
-            state = ready() ? ChannelState.READY : ChannelState.AUTHENTICATE;
+        if (connected) {
+            if (ready()) {
+                state = ChannelState.READY;
+            } else if (remoteAddress != null) {
+                state = new ChannelState(ChannelState.State.AUTHENTICATE, remoteAddress.toString());
+            } else {
+                state = ChannelState.AUTHENTICATE;
+            }
+        }
         return connected;
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 297cba5..c090b6b 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -1282,7 +1282,7 @@ public class SaslAuthenticatorTest {
         // Without SASL_AUTHENTICATE headers, disconnect state is ChannelState.AUTHENTICATE which is
         // a hint that channel was closed during authentication, unlike ChannelState.AUTHENTICATE_FAILED
         // which is an actual authentication failure reported by the broker.
-        NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE.state());
+        NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATE);
     }
 
     private void createServer(SecurityProtocol securityProtocol, String saslMechanism,