You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/04/01 07:36:53 UTC

git commit: KAFKA-1341 Client Selector doesn't check connection id properly; reviewed by Jay Kreps and Neha Narkhede

Repository: kafka
Updated Branches:
  refs/heads/trunk f82ce3314 -> 083b6265c


KAFKA-1341 Client Selector doesn't check connection id properly; reviewed by Jay Kreps and Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/083b6265
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/083b6265
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/083b6265

Branch: refs/heads/trunk
Commit: 083b6265c93619dd8b672c59fc31f0ed3d5da29b
Parents: f82ce33
Author: Timothy Chen <tn...@gmail.com>
Authored: Mon Mar 31 22:36:35 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Mon Mar 31 22:36:42 2014 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/kafka/common/network/Selector.java    | 5 +++--
 .../java/org/apache/kafka/common/network/SelectorTest.java     | 6 ++++++
 2 files changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/083b6265/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 5d93965..5b801e4 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -114,6 +114,9 @@ public class Selector implements Selectable {
      */
     @Override
     public void connect(int id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
+        if (this.keys.containsKey(id))
+            throw new IllegalStateException("There is already a connection for id " + id);
+
         SocketChannel channel = SocketChannel.open();
         channel.configureBlocking(false);
         Socket socket = channel.socket();
@@ -132,8 +135,6 @@ public class Selector implements Selectable {
         }
         SelectionKey key = channel.register(this.selector, SelectionKey.OP_CONNECT);
         key.attach(new Transmissions(id));
-        if (this.keys.containsKey(key))
-            throw new IllegalStateException("There is already a connection for id " + id);
         this.keys.put(id, key);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/083b6265/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 99856e9..5c5e3d4 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -205,6 +205,12 @@ public class SelectorTest {
         assertEquals("", blockingRequest(node, ""));
     }
 
+    @Test(expected = IllegalStateException.class)
+    public void testExistingConnectionId() throws IOException {
+        blockingConnect(0);
+        blockingConnect(0);
+    }
+
     private String blockingRequest(int node, String s) throws IOException {
         selector.poll(1000L, asList(createSend(node, s)));
         while (true) {