You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/04/01 07:36:53 UTC
git commit: KAFKA-1341 Client Selector doesn't check connection id
properly; reviewed by Jay Kreps and Neha Narkhede
Repository: kafka
Updated Branches:
refs/heads/trunk f82ce3314 -> 083b6265c
KAFKA-1341 Client Selector doesn't check connection id properly; reviewed by Jay Kreps and Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/083b6265
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/083b6265
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/083b6265
Branch: refs/heads/trunk
Commit: 083b6265c93619dd8b672c59fc31f0ed3d5da29b
Parents: f82ce33
Author: Timothy Chen <tn...@gmail.com>
Authored: Mon Mar 31 22:36:35 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Mon Mar 31 22:36:42 2014 -0700
----------------------------------------------------------------------
.../main/java/org/apache/kafka/common/network/Selector.java | 5 +++--
.../java/org/apache/kafka/common/network/SelectorTest.java | 6 ++++++
2 files changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/083b6265/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 5d93965..5b801e4 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -114,6 +114,9 @@ public class Selector implements Selectable {
*/
@Override
public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
+ if (this.keys.containsKey(id))
+ throw new IllegalStateException("There is already a connection for id " + id);
+
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
Socket socket = channel.socket();
@@ -132,8 +135,6 @@ public class Selector implements Selectable {
}
SelectionKey key = channel.register(this.selector, SelectionKey.OP_CONNECT);
key.attach(new Transmissions(id));
- if (this.keys.containsKey(key))
- throw new IllegalStateException("There is already a connection for id " + id);
this.keys.put(id, key);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/083b6265/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 99856e9..5c5e3d4 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -205,6 +205,12 @@ public class SelectorTest {
assertEquals("", blockingRequest(node, ""));
}
+ @Test(expected = IllegalStateException.class)
+ public void testExistingConnectionId() throws IOException {
+ blockingConnect(0);
+ blockingConnect(0);
+ }
+
private String blockingRequest(int node, String s) throws IOException {
selector.poll(1000L, asList(createSend(node, s)));
while (true) {