You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2015/07/06 02:06:33 UTC

[4/4] qpid-proton git commit: PROTON-881: Make connect a non-blocking operation

PROTON-881: Make connect a non-blocking operation

Previously, connecting was a blocking operation, which meant that if the server
being connected to took a long time to accept the connection, all other work in
that instance of the Reactor was blocked.

This commit makes connect a non-blocking operation, allowing the reactor to
continue processing other work while the connection is established (or not).

Unfortunately, I've not found a satisfactory way to test this behavior in the
test suite, because Java never blocks during connect when it is using the
loopback adapter.  Instead, to exercise the non-blocking connect code path
outside the test suite, I had to configure firewall rules to drop all packets
sent to a particular port.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e2d23691
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e2d23691
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e2d23691

Branch: refs/heads/master
Commit: e2d23691541b09f4efc59a32705a5306b179a0b0
Parents: 46edaeb
Author: Adrian Preston <pr...@uk.ibm.com>
Authored: Fri Jun 26 23:58:21 2015 +0100
Committer: Rafael Schloming <rh...@alum.mit.edu>
Committed: Sun Jul 5 19:57:39 2015 -0400

----------------------------------------------------------------------
 .../qpid/proton/reactor/impl/IOHandler.java     |  1 +
 .../qpid/proton/reactor/impl/SelectorImpl.java  | 70 ++++++++++++++++----
 .../apache/qpid/proton/reactor/ReactorTest.java | 30 +++++++++
 3 files changed, 89 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e2d23691/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
index f810742..39d840e 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
@@ -103,6 +103,7 @@ public class IOHandler extends BaseHandler {
         Socket socket = null;   // In this case, 'null' is the proton-j equivalent of PN_INVALID_SOCKET
         try {
             SocketChannel socketChannel = ((ReactorImpl)reactor).getIO().socketChannel();
+            socketChannel.configureBlocking(false);
             socketChannel.connect(new InetSocketAddress(hostname, port));
             socket = socketChannel.socket();
         } catch(IOException ioException) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e2d23691/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
index 1145158..5ef74e7 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
@@ -24,9 +24,13 @@ package org.apache.qpid.proton.reactor.impl;
 import java.io.IOException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
 import java.util.HashSet;
 import java.util.Iterator;
 
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Transport;
 import org.apache.qpid.proton.reactor.Selectable;
 import org.apache.qpid.proton.reactor.Selector;
 
@@ -59,14 +63,19 @@ class SelectorImpl implements Selector {
     public void update(Selectable selectable) {
         if (selectable.getChannel() != null) {
             int interestedOps = 0;
-            if (selectable.isReading()) {
-                if (selectable.getChannel() instanceof ServerSocketChannel) {
-                    interestedOps |= SelectionKey.OP_ACCEPT;
-                } else {
-                    interestedOps |= SelectionKey.OP_READ;
+            if (selectable.getChannel() instanceof SocketChannel &&
+                    ((SocketChannel)selectable.getChannel()).isConnectionPending()) {
+                interestedOps |= SelectionKey.OP_CONNECT;
+            } else {
+                if (selectable.isReading()) {
+                    if (selectable.getChannel() instanceof ServerSocketChannel) {
+                        interestedOps |= SelectionKey.OP_ACCEPT;
+                    } else {
+                        interestedOps |= SelectionKey.OP_READ;
+                    }
                 }
+                if (selectable.isWriting()) interestedOps |= SelectionKey.OP_WRITE;
             }
-            if (selectable.isWriting()) interestedOps |= SelectionKey.OP_WRITE;
             SelectionKey key = selectable.getChannel().keyFor(selector);
             key.interestOps(interestedOps);
         }
@@ -76,14 +85,18 @@ class SelectorImpl implements Selector {
     public void remove(Selectable selectable) {
         if (selectable.getChannel() != null) {
             SelectionKey key = selectable.getChannel().keyFor(selector);
-            key.cancel();
-            key.attach(null);
+            if (key != null) {
+                key.cancel();
+                key.attach(null);
+            }
         }
         selectables.remove(selectable);
     }
 
     @Override
     public void select(long timeout) throws IOException {
+
+        long now = System.currentTimeMillis();
         if (timeout > 0) {
             long deadline = 0;
             for (Selectable selectable : selectables) {    // TODO: this differs from the C code which requires a call to update() to make deadline changes take affect
@@ -94,7 +107,6 @@ class SelectorImpl implements Selector {
             }
 
             if (deadline > 0) {
-                long now = System.currentTimeMillis();
                 long delta = deadline - now;
                 if (delta < 0) {
                     timeout = 0;
@@ -104,17 +116,51 @@ class SelectorImpl implements Selector {
             }
         }
 
+        error.clear();
+
+        long awoken = 0;
         if (timeout > 0) {
-            selector.select(timeout);
+            long remainingTimeout = timeout;
+            while(remainingTimeout > 0) {
+                selector.select(remainingTimeout);
+                awoken = System.currentTimeMillis();
+
+                for (Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); iterator.hasNext();) {
+                    SelectionKey key = iterator.next();
+                    if (key.isConnectable()) {
+                        try {
+                            ((SocketChannel)key.channel()).finishConnect();
+                            update((Selectable)key.attachment());
+                        } catch(IOException ioException) {
+                            Selectable selectable = (Selectable)key.attachment();
+                            ErrorCondition condition = new ErrorCondition();
+                            condition.setCondition(Symbol.getSymbol("proton:io"));
+                            condition.setDescription(ioException.getMessage());
+                            Transport transport = selectable.getTransport();
+                            if (transport != null) {
+                                transport.setCondition(condition);
+                                transport.close_tail();
+                                transport.close_head();
+                                transport.pop(transport.pending());
+                            }
+                            error.add(selectable);
+                        }
+                        iterator.remove();
+                    }
+                }
+                if (!selector.selectedKeys().isEmpty()) {
+                    break;
+                }
+                remainingTimeout = remainingTimeout - (awoken - now);
+            }
         } else {
             selector.selectNow();
+            awoken = System.currentTimeMillis();
         }
-        long awoken = System.currentTimeMillis();
 
         readable.clear();
         writeable.clear();
         expired.clear();
-        error.clear();  // TODO: nothing ever gets put in here...
         for (SelectionKey key : selector.selectedKeys()) {
             Selectable selectable = (Selectable)key.attachment();
             if (key.isReadable()) readable.add(selectable);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e2d23691/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
index 61e2761..2d81666 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
@@ -29,6 +29,7 @@ import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.net.ServerSocket;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -563,4 +564,33 @@ public class ReactorTest {
         assertReactorRunBarfsOnHandler(reactor, expectedBarf, expectedHandler);
         reactor.free();
     }
+
+    @Test
+    public void connectionRefused() throws IOException {
+        final ServerSocket serverSocket = new ServerSocket(0, 0);
+
+        class ConnectionHandler extends TestHandler {
+            @Override
+            public void onConnectionInit(Event event) {
+                super.onConnectionInit(event);
+                Connection connection = event.getConnection();
+                connection.setHostname("127.0.0.1:" + serverSocket.getLocalPort());
+                connection.open();
+                try {
+                    serverSocket.close();
+                } catch(IOException e) {
+                    AssertionFailedError afe = new AssertionFailedError();
+                    afe.initCause(e);
+                    throw afe;
+                }
+            }
+        }
+        TestHandler connectionHandler = new ConnectionHandler();
+        reactor.connection(connectionHandler);
+        reactor.run();
+        reactor.free();
+        serverSocket.close();
+        connectionHandler.assertEvents(Type.CONNECTION_INIT, Type.CONNECTION_LOCAL_OPEN, Type.CONNECTION_BOUND, Type.TRANSPORT_ERROR, Type.TRANSPORT_TAIL_CLOSED,
+                Type.TRANSPORT_HEAD_CLOSED, Type.TRANSPORT_CLOSED, Type.CONNECTION_UNBOUND, Type.TRANSPORT);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org