You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/05/15 22:13:38 UTC
kafka git commit: KAFKA-5179; Log connection termination during authentication
Repository: kafka
Updated Branches:
refs/heads/trunk 46aa88b9c -> 4c75f31a5
KAFKA-5179; Log connection termination during authentication
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Ismael Juma, Jun Rao
Closes #2980 from rajinisivaram/KAFKA-5179
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4c75f31a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4c75f31a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4c75f31a
Branch: refs/heads/trunk
Commit: 4c75f31a5f80e6a717d040b0534c79f5ed8d9346
Parents: 46aa88b
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Mon May 15 18:13:20 2017 -0400
Committer: Rajini Sivaram <ra...@googlemail.com>
Committed: Mon May 15 18:13:20 2017 -0400
----------------------------------------------------------------------
checkstyle/suppressions.xml | 2 +-
.../org/apache/kafka/clients/NetworkClient.java | 27 ++++++++--
.../kafka/common/network/ChannelState.java | 56 ++++++++++++++++++++
.../kafka/common/network/KafkaChannel.java | 16 +++++-
.../apache/kafka/common/network/Selectable.java | 7 +--
.../apache/kafka/common/network/Selector.java | 22 +++++---
.../apache/kafka/clients/NetworkClientTest.java | 4 +-
.../kafka/common/network/NetworkTestUtils.java | 3 +-
.../kafka/common/network/SelectorTest.java | 9 ++--
.../kafka/common/network/SslSelectorTest.java | 2 +-
.../common/network/SslTransportLayerTest.java | 14 ++---
.../authenticator/SaslAuthenticatorTest.java | 40 ++++++--------
.../org/apache/kafka/test/MockSelector.java | 9 ++--
.../main/scala/kafka/network/SocketServer.scala | 2 +-
14 files changed, 156 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index dd41f94..9729ee5 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -8,7 +8,7 @@
<!-- Clients -->
<suppress checks="ClassFanOutComplexity"
- files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils|TransactionManagerTest|KafkaAdminClient).java"/>
+ files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient).java"/>
<suppress checks="ClassFanOutComplexity"
files=".*/protocol/Errors.java"/>
<suppress checks="ClassFanOutComplexity"
http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index df9e2fa..a09f85d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Send;
@@ -482,10 +483,21 @@ public class NetworkClient implements KafkaClient {
* @param nodeId Id of the node to be disconnected
* @param now The current time
*/
- private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) {
+ private void processDisconnection(List<ClientResponse> responses, String nodeId, long now, ChannelState disconnectState) {
connectionStates.disconnected(nodeId, now);
apiVersions.remove(nodeId);
nodesNeedingApiVersionsFetch.remove(nodeId);
+ switch (disconnectState) {
+ case AUTHENTICATE:
+ log.warn("Connection to node {} terminated during authentication. This may indicate " +
+ "that authentication failed due to invalid credentials.", nodeId);
+ break;
+ case NOT_CONNECTED:
+ log.warn("Connection to node {} could not be established. Broker may not be available.", nodeId);
+ break;
+ default:
+ break; // Disconnections in other states are logged at debug level in Selector
+ }
for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) {
log.trace("Cancelled request {} due to node {} being disconnected", request.request, nodeId);
if (request.isInternalRequest && request.header.apiKey() == ApiKeys.METADATA.id)
@@ -508,7 +520,7 @@ public class NetworkClient implements KafkaClient {
// close connection to the node
this.selector.close(nodeId);
log.debug("Disconnecting from node {} due to request timeout.", nodeId);
- processDisconnection(responses, nodeId, now);
+ processDisconnection(responses, nodeId, now, ChannelState.LOCAL_CLOSE);
}
// we disconnected, so we should probably refresh our metadata
@@ -567,7 +579,7 @@ public class NetworkClient implements KafkaClient {
log.warn("Node {} got error {} when making an ApiVersionsRequest. Disconnecting.",
node, apiVersionsResponse.error());
this.selector.close(node);
- processDisconnection(responses, node, now);
+ processDisconnection(responses, node, now, ChannelState.LOCAL_CLOSE);
} else {
nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder((short) 0));
}
@@ -588,9 +600,10 @@ public class NetworkClient implements KafkaClient {
* @param now The current time
*/
private void handleDisconnections(List<ClientResponse> responses, long now) {
- for (String node : this.selector.disconnected()) {
+ for (Map.Entry<String, ChannelState> entry : this.selector.disconnected().entrySet()) {
+ String node = entry.getKey();
log.debug("Node {} disconnected.", node);
- processDisconnection(responses, node, now);
+ processDisconnection(responses, node, now, entry.getValue());
}
// we got a disconnect so we should probably refresh our metadata and see if that broker is dead
if (this.selector.disconnected().size() > 0)
@@ -710,6 +723,10 @@ public class NetworkClient implements KafkaClient {
@Override
public void handleDisconnection(String destination) {
Cluster cluster = metadata.fetch();
+ // 'processDisconnection' generates warnings for misconfigured bootstrap server configuration
+ // resulting in 'Connection Refused' and misconfigured security resulting in authentication failures.
+ // The warning below handles the case where connection to a broker was established, but was disconnected
+ // before metadata could be obtained.
if (cluster.isBootstrapConfigured()) {
int nodeId = Integer.parseInt(destination);
Node node = cluster.nodeById(nodeId);
http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java
new file mode 100644
index 0000000..23e877c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelState.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+/**
+ * States for KafkaChannel:
+ * <ul>
+ * <li>NOT_CONNECTED: Connections are created in NOT_CONNECTED state. State is updated
+ * on {@link TransportLayer#finishConnect()} when socket connection is established.
+ * PLAINTEXT channels transition from NOT_CONNECTED to READY, others transition
+ * to AUTHENTICATE. Failures in NOT_CONNECTED state typically indicate that the
+ * remote endpoint is unavailable, which may be due to misconfigured endpoints.</li>
+ * <li>AUTHENTICATE: SSL, SASL_SSL and SASL_PLAINTEXT channels are in AUTHENTICATE state during SSL and
+ * SASL handshake. Disconnections in AUTHENTICATE state may indicate that SSL or SASL
+ * authentication failed. Channels transition to READY state when authentication completes
+ * successfully.</li>
+ * <li>READY: Connected, authenticated channels are in READY state. Channels may transition from
+ * READY to EXPIRED, FAILED_SEND or LOCAL_CLOSE.</li>
+ * <li>EXPIRED: Idle connections are moved to EXPIRED state on idle timeout and the channel is closed.</li>
+ * <li>FAILED_SEND: Channels transition from READY to FAILED_SEND state if the channel is closed due
+ * to a send failure.</li>
+ * <li>LOCAL_CLOSE: Channels are moved to LOCAL_CLOSE state if close() is initiated locally.</li>
+ * </ul>
+ * If the remote endpoint closes a channel, the state of the channel reflects the state the channel
+ * was in at the time of disconnection. This state may be useful to identify the reason for disconnection.
+ * <p>
+ * Typical transitions:
+ * <ul>
+ * <li>PLAINTEXT Good path: NOT_CONNECTED => READY => LOCAL_CLOSE</li>
+ * <li>SASL/SSL Good path: NOT_CONNECTED => AUTHENTICATE => READY => LOCAL_CLOSE</li>
+ * <li>Bootstrap server misconfiguration: NOT_CONNECTED, disconnected in NOT_CONNECTED state</li>
+ * <li>Security misconfiguration: NOT_CONNECTED => AUTHENTICATE, disconnected in AUTHENTICATE state</li>
+ * </ul>
+ */
+public enum ChannelState {
+ NOT_CONNECTED,
+ AUTHENTICATE,
+ READY,
+ EXPIRED,
+ FAILED_SEND,
+ LOCAL_CLOSE
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index ea03ff0..5e3a895 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -41,6 +41,7 @@ public class KafkaChannel {
// processed after the channel is disconnected.
private boolean disconnected;
private boolean muted;
+ private ChannelState state;
public KafkaChannel(String id, TransportLayer transportLayer, Authenticator authenticator, int maxReceiveSize) throws IOException {
this.id = id;
@@ -50,6 +51,7 @@ public class KafkaChannel {
this.maxReceiveSize = maxReceiveSize;
this.disconnected = false;
this.muted = false;
+ this.state = ChannelState.NOT_CONNECTED;
}
public void close() throws IOException {
@@ -72,6 +74,8 @@ public class KafkaChannel {
transportLayer.handshake();
if (transportLayer.ready() && !authenticator.complete())
authenticator.authenticate();
+ if (ready())
+ state = ChannelState.READY;
}
public void disconnect() {
@@ -79,9 +83,19 @@ public class KafkaChannel {
transportLayer.disconnect();
}
+ public void state(ChannelState state) {
+ this.state = state;
+ }
+
+ public ChannelState state() {
+ return this.state;
+ }
public boolean finishConnect() throws IOException {
- return transportLayer.finishConnect();
+ boolean connected = transportLayer.finishConnect();
+ if (connected)
+ state = ready() ? ChannelState.READY : ChannelState.AUTHENTICATE;
+ return connected;
}
public boolean isConnected() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
index 6eca427..efb603c 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java
@@ -20,6 +20,7 @@ package org.apache.kafka.common.network;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
+import java.util.Map;
/**
* An interface for asynchronous, multi-channel network I/O
@@ -80,10 +81,10 @@ public interface Selectable {
public List<NetworkReceive> completedReceives();
/**
- * The list of connections that finished disconnecting on the last {@link #poll(long) poll()}
- * call.
+ * The connections that finished disconnecting on the last {@link #poll(long) poll()}
+ * call. Channel state indicates the local channel state at the time of disconnection.
*/
- public List<String> disconnected();
+ public Map<String, ChannelState> disconnected();
/**
* The list of connections that completed their connection on the last {@link #poll(long) poll()}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index a74a584..8f85202 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -92,7 +92,7 @@ public class Selector implements Selectable, AutoCloseable {
private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
private final Set<SelectionKey> immediatelyConnectedKeys;
private final Map<String, KafkaChannel> closingChannels;
- private final List<String> disconnected;
+ private final Map<String, ChannelState> disconnected;
private final List<String> connected;
private final List<String> failedSends;
private final Time time;
@@ -137,7 +137,7 @@ public class Selector implements Selectable, AutoCloseable {
this.immediatelyConnectedKeys = new HashSet<>();
this.closingChannels = new HashMap<>();
this.connected = new ArrayList<>();
- this.disconnected = new ArrayList<>();
+ this.disconnected = new HashMap<>();
this.failedSends = new ArrayList<>();
this.sensors = new SelectorMetrics(metrics, metricGrpPrefix, metricTags, metricsPerConnection);
this.channelBuilder = channelBuilder;
@@ -413,7 +413,7 @@ public class Selector implements Selectable, AutoCloseable {
}
@Override
- public List<String> disconnected() {
+ public Map<String, ChannelState> disconnected() {
return this.disconnected;
}
@@ -466,6 +466,7 @@ public class Selector implements Selectable, AutoCloseable {
if (log.isTraceEnabled())
log.trace("About to close the idle connection from {} due to being idle for {} millis",
connectionId, (currentTimeNanos - expiredConnection.getValue()) / 1000 / 1000);
+ channel.state(ChannelState.EXPIRED);
close(channel, true);
}
}
@@ -489,7 +490,12 @@ public class Selector implements Selectable, AutoCloseable {
it.remove();
}
}
- this.disconnected.addAll(this.failedSends);
+ for (String channel : this.failedSends) {
+ KafkaChannel failedChannel = closingChannels.get(channel);
+ if (failedChannel != null)
+ failedChannel.state(ChannelState.FAILED_SEND);
+ this.disconnected.put(channel, ChannelState.FAILED_SEND);
+ }
this.failedSends.clear();
}
@@ -516,8 +522,12 @@ public class Selector implements Selectable, AutoCloseable {
*/
public void close(String id) {
KafkaChannel channel = this.channels.get(id);
- if (channel != null)
+ if (channel != null) {
+ // There is no disconnect notification for local close, but updating
+ // channel state here anyway to avoid confusion.
+ channel.state(ChannelState.LOCAL_CLOSE);
close(channel, false);
+ }
}
/**
@@ -566,7 +576,7 @@ public class Selector implements Selectable, AutoCloseable {
this.sensors.connectionClosed.record();
this.stagedReceives.remove(channel);
if (notifyDisconnect)
- this.disconnected.add(channel.id());
+ this.disconnected.put(channel.id(), channel.state());
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 55b4fc6..59a46ac 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -185,8 +185,8 @@ public class NetworkClientTest {
// sleeping to make sure that the time since last send is greater than requestTimeOut
time.sleep(3000);
client.poll(3000, time.milliseconds());
- String disconnectedNode = selector.disconnected().get(0);
- assertEquals(node.idString(), disconnectedNode);
+ assertEquals(1, selector.disconnected().size());
+ assertTrue("Node not found in disconnected map", selector.disconnected().containsKey(node.idString()));
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
index a3859c1..43c7d9b 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
@@ -77,7 +77,7 @@ public class NetworkTestUtils {
assertTrue(selector.isChannelReady(node));
}
- public static void waitForChannelClose(Selector selector, String node) throws IOException {
+ public static void waitForChannelClose(Selector selector, String node, ChannelState channelState) throws IOException {
boolean closed = false;
for (int i = 0; i < 30; i++) {
selector.poll(1000L);
@@ -87,5 +87,6 @@ public class NetworkTestUtils {
}
}
assertTrue("Channel was not closed by timeout", closed);
+ assertEquals(channelState, selector.disconnected().get(node));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index adff4b2..33959fd 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -81,7 +81,7 @@ public class SelectorTest {
// disconnect
this.server.closeConnections();
- while (!selector.disconnected().contains(node))
+ while (!selector.disconnected().containsKey(node))
selector.poll(1000L);
// reconnect and do another request
@@ -127,8 +127,10 @@ public class SelectorTest {
ServerSocket nonListeningSocket = new ServerSocket(0);
int nonListeningPort = nonListeningSocket.getLocalPort();
selector.connect(node, new InetSocketAddress("localhost", nonListeningPort), BUFFER_SIZE, BUFFER_SIZE);
- while (selector.disconnected().contains(node))
+ while (selector.disconnected().containsKey(node)) {
+ assertEquals(ChannelState.NOT_CONNECTED, selector.disconnected().get(node));
selector.poll(1000L);
+ }
nonListeningSocket.close();
}
@@ -262,7 +264,8 @@ public class SelectorTest {
time.sleep(6000); // The max idle time is 5000ms
selector.poll(0);
- assertTrue("The idle connection should have been closed", selector.disconnected().contains(id));
+ assertTrue("The idle connection should have been closed", selector.disconnected().containsKey(id));
+ assertEquals(ChannelState.EXPIRED, selector.disconnected().get(id));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index 476ddfb..80f266f 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -154,7 +154,7 @@ public class SslSelectorTest extends SelectorTest {
List<String> disconnected = new ArrayList<>();
while (!disconnected.contains(node) && System.currentTimeMillis() < expiryTime) {
selector.poll(10);
- disconnected.addAll(selector.disconnected());
+ disconnected.addAll(selector.disconnected().keySet());
}
assertTrue("Renegotiation should cause disconnection", disconnected.contains(node));
http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 345ace1..42e0f6f 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -119,7 +119,7 @@ public class SslTransportLayerTest {
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
- NetworkTestUtils.waitForChannelClose(selector, node);
+ NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE);
}
/**
@@ -184,7 +184,7 @@ public class SslTransportLayerTest {
sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG);
createSelector(sslClientConfigs);
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
- NetworkTestUtils.waitForChannelClose(selector, node);
+ NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE);
selector.close();
server.close();
@@ -212,7 +212,7 @@ public class SslTransportLayerTest {
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
- NetworkTestUtils.waitForChannelClose(selector, node);
+ NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE);
}
/**
@@ -232,7 +232,7 @@ public class SslTransportLayerTest {
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
- NetworkTestUtils.waitForChannelClose(selector, node);
+ NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE);
}
/**
@@ -384,7 +384,7 @@ public class SslTransportLayerTest {
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
- NetworkTestUtils.waitForChannelClose(selector, node);
+ NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE);
}
/**
@@ -401,7 +401,7 @@ public class SslTransportLayerTest {
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
- NetworkTestUtils.waitForChannelClose(selector, node);
+ NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE);
}
/**
@@ -419,7 +419,7 @@ public class SslTransportLayerTest {
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
- NetworkTestUtils.waitForChannelClose(selector, node);
+ NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 1aea835..28402f0 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders;
+import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.NetworkTestUtils;
@@ -137,8 +138,7 @@ public class SaslAuthenticatorTest {
jaasConfig.setPlainClientOptions(TestJaasConfig.USERNAME, "invalidpassword");
server = createEchoServer(securityProtocol);
- createClientConnection(securityProtocol, node);
- NetworkTestUtils.waitForChannelClose(selector, node);
+ createAndCheckClientConnectionFailure(securityProtocol, node);
}
/**
@@ -152,8 +152,7 @@ public class SaslAuthenticatorTest {
jaasConfig.setPlainClientOptions("invaliduser", TestJaasConfig.PASSWORD);
server = createEchoServer(securityProtocol);
- createClientConnection(securityProtocol, node);
- NetworkTestUtils.waitForChannelClose(selector, node);
+ createAndCheckClientConnectionFailure(securityProtocol, node);
}
/**
@@ -286,8 +285,7 @@ public class SaslAuthenticatorTest {
String node = "0";
server = createEchoServer(securityProtocol);
updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
- createClientConnection(securityProtocol, node);
- NetworkTestUtils.waitForChannelClose(selector, node);
+ createAndCheckClientConnectionFailure(securityProtocol, node);
}
/**
@@ -305,8 +303,7 @@ public class SaslAuthenticatorTest {
String node = "0";
server = createEchoServer(securityProtocol);
updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD);
- createClientConnection(securityProtocol, node);
- NetworkTestUtils.waitForChannelClose(selector, node);
+ createAndCheckClientConnectionFailure(securityProtocol, node);
}
/**
@@ -323,8 +320,7 @@ public class SaslAuthenticatorTest {
server.credentialCache().cache(ScramMechanism.SCRAM_SHA_256.mechanismName(), ScramCredential.class).remove(TestJaasConfig.USERNAME);
String node = "1";
saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
- createClientConnection(securityProtocol, node);
- NetworkTestUtils.waitForChannelClose(selector, node);
+ createAndCheckClientConnectionFailure(securityProtocol, node);
saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
createAndCheckClientConnection(securityProtocol, "2");
@@ -425,7 +421,7 @@ public class SaslAuthenticatorTest {
SaslHandshakeRequest request = new SaslHandshakeRequest("PLAIN");
RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, Short.MAX_VALUE, "someclient", 2);
selector.send(request.toSend(node1, header));
- NetworkTestUtils.waitForChannelClose(selector, node1);
+ NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY);
selector.close();
// Test good connection still works
@@ -451,7 +447,7 @@ public class SaslAuthenticatorTest {
byte[] bytes = new byte[1024];
random.nextBytes(bytes);
selector.send(new NetworkSend(node1, ByteBuffer.wrap(bytes)));
- NetworkTestUtils.waitForChannelClose(selector, node1);
+ NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY);
selector.close();
// Test good connection still works
@@ -462,7 +458,7 @@ public class SaslAuthenticatorTest {
createClientConnection(SecurityProtocol.PLAINTEXT, node2);
random.nextBytes(bytes);
selector.send(new NetworkSend(node2, ByteBuffer.wrap(bytes)));
- NetworkTestUtils.waitForChannelClose(selector, node2);
+ NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY);
selector.close();
// Test good connection still works
@@ -491,7 +487,7 @@ public class SaslAuthenticatorTest {
RequestHeader versionsHeader = new RequestHeader(ApiKeys.API_VERSIONS.id,
request.version(), "someclient", 2);
selector.send(request.toSend(node1, versionsHeader));
- NetworkTestUtils.waitForChannelClose(selector, node1);
+ NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY);
selector.close();
// Test good connection still works
@@ -518,7 +514,7 @@ public class SaslAuthenticatorTest {
buffer.put(new byte[buffer.capacity() - 4]);
buffer.rewind();
selector.send(new NetworkSend(node1, buffer));
- NetworkTestUtils.waitForChannelClose(selector, node1);
+ NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY);
selector.close();
// Test good connection still works
@@ -532,7 +528,7 @@ public class SaslAuthenticatorTest {
buffer.put(new byte[buffer.capacity() - 4]);
buffer.rewind();
selector.send(new NetworkSend(node2, buffer));
- NetworkTestUtils.waitForChannelClose(selector, node2);
+ NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY);
selector.close();
// Test good connection still works
@@ -557,7 +553,7 @@ public class SaslAuthenticatorTest {
RequestHeader metadataRequestHeader1 = new RequestHeader(ApiKeys.METADATA.id,
metadataRequest1.version(), "someclient", 1);
selector.send(metadataRequest1.toSend(node1, metadataRequestHeader1));
- NetworkTestUtils.waitForChannelClose(selector, node1);
+ NetworkTestUtils.waitForChannelClose(selector, node1, ChannelState.READY);
selector.close();
// Test good connection still works
@@ -572,7 +568,7 @@ public class SaslAuthenticatorTest {
RequestHeader metadataRequestHeader2 = new RequestHeader(ApiKeys.METADATA.id,
metadataRequest2.version(), "someclient", 2);
selector.send(metadataRequest2.toSend(node2, metadataRequestHeader2));
- NetworkTestUtils.waitForChannelClose(selector, node2);
+ NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY);
selector.close();
// Test good connection still works
@@ -608,8 +604,7 @@ public class SaslAuthenticatorTest {
configureMechanisms("PLAIN", Arrays.asList("DIGEST-MD5"));
server = createEchoServer(securityProtocol);
- createClientConnection(securityProtocol, node);
- NetworkTestUtils.waitForChannelClose(selector, node);
+ createAndCheckClientConnectionFailure(securityProtocol, node);
}
/**
@@ -623,8 +618,7 @@ public class SaslAuthenticatorTest {
saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "INVALID");
server = createEchoServer(securityProtocol);
- createClientConnection(securityProtocol, node);
- NetworkTestUtils.waitForChannelClose(selector, node);
+ createAndCheckClientConnectionFailure(securityProtocol, node);
}
/**
@@ -824,7 +818,7 @@ public class SaslAuthenticatorTest {
private void createAndCheckClientConnectionFailure(SecurityProtocol securityProtocol, String node) throws Exception {
createClientConnection(securityProtocol, node);
- NetworkTestUtils.waitForChannelClose(selector, node);
+ NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.AUTHENTICATE);
selector.close();
selector = null;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
index c1b2205..225aba4 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
@@ -19,9 +19,12 @@ package org.apache.kafka.test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Selectable;
@@ -37,7 +40,7 @@ public class MockSelector implements Selectable {
private final List<Send> initiatedSends = new ArrayList<Send>();
private final List<Send> completedSends = new ArrayList<Send>();
private final List<NetworkReceive> completedReceives = new ArrayList<NetworkReceive>();
- private final List<String> disconnected = new ArrayList<String>();
+ private final Map<String, ChannelState> disconnected = new HashMap<>();
private final List<String> connected = new ArrayList<String>();
private final List<DelayedReceive> delayedReceives = new ArrayList<>();
@@ -60,7 +63,7 @@ public class MockSelector implements Selectable {
@Override
public void close(String id) {
- this.disconnected.add(id);
+ this.disconnected.put(id, ChannelState.LOCAL_CLOSE);
for (int i = 0; i < this.connected.size(); i++) {
if (this.connected.get(i).equals(id)) {
this.connected.remove(i);
@@ -121,7 +124,7 @@ public class MockSelector implements Selectable {
}
@Override
- public List<String> disconnected() {
+ public Map<String, ChannelState> disconnected() {
return disconnected;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4c75f31a/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index fb647fa..48d0233 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -544,7 +544,7 @@ private[kafka] class Processor(val id: Int,
}
private def processDisconnected() {
- selector.disconnected.asScala.foreach { connectionId =>
+ selector.disconnected.keySet.asScala.foreach { connectionId =>
val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
}.remoteHost