You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/14 00:59:29 UTC
kafka git commit: MINOR: NetworkClient#disconnect should not erase connection info
Repository: kafka
Updated Branches:
refs/heads/trunk 004dde9e7 -> d099c7495
MINOR: NetworkClient#disconnect should not erase connection info
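NetworkClient#disconnect should not erase the connection information for the node. Instead of
removing the node's entry from the connection states, it now marks the node as disconnected,
so the connection information is retained. This will allow exponential backoff to occur.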
Author: Colin P. Mccabe <cm...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #3309 from cmccabe/disc
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d099c749
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d099c749
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d099c749
Branch: refs/heads/trunk
Commit: d099c749591ac0e819017a9d3ae48144ddbbff0d
Parents: 004dde9
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Wed Jun 14 01:59:24 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed Jun 14 01:59:24 2017 +0100
----------------------------------------------------------------------
.../org/apache/kafka/clients/NetworkClient.java | 2 +-
.../apache/kafka/clients/NetworkClientTest.java | 17 ++++++++++++++++-
2 files changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d099c749/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index a0730ca..af96575 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -249,7 +249,7 @@ public class NetworkClient implements KafkaClient {
true, null, null));
}
}
- connectionStates.remove(nodeId);
+ connectionStates.disconnected(nodeId, now);
if (log.isDebugEnabled()) {
log.debug("Manually disconnected from {}. Removed requests: {}.", nodeId,
Utils.join(requestTypes, ", "));
http://git-wip-us.apache.org/repos/asf/kafka/blob/d099c749/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 0de76a1..77960e1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.DelayedReceive;
import org.apache.kafka.test.MockSelector;
import org.apache.kafka.test.TestUtils;
@@ -264,7 +265,21 @@ public class NetworkClientTest {
assertEquals(1, responses.size());
assertTrue(responses.iterator().next().wasDisconnected());
}
-
+
+ @Test
+ public void testCallDisconnect() throws Exception {
+ awaitReady(client, node);
+ assertTrue("Expected NetworkClient to be ready to send to node " + node.idString(),
+ client.isReady(node, Time.SYSTEM.milliseconds()));
+ assertFalse("Did not expect connection to node " + node.idString() + " to be failed",
+ client.connectionFailed(node));
+ client.disconnect(node.idString());
+ assertFalse("Expected node " + node.idString() + " to be disconnected.",
+ client.isReady(node, Time.SYSTEM.milliseconds()));
+ assertTrue("Expected connection to node " + node.idString() + " to be failed after disconnect",
+ client.connectionFailed(node));
+ }
+
private static class TestCallbackHandler implements RequestCompletionHandler {
public boolean executed = false;
public ClientResponse response;