You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/07/10 00:12:06 UTC
[26/50] qpid-proton git commit: PROTON-881: Make connect a
non-blocking operation
PROTON-881: Make connect a non-blocking operation
Previously connecting was a blocking operation which meant that if the server
being connected to took a long time to accept the connection this blocked all
other work in an instance of the Reactor.
This commit makes connect a non-blocking operation, allowing the reactor to
continue processing other work while the connection is established (or not).
Unfortunately, I've not found a satisfactory way to test this behavior in the
test suite - because Java never blocks during connect if it is using the
loopback adapter. Instead, to test the non-blocking connect code path, I had
to configure firewall rules to drop all packets sent to a particular port.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e2d23691
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e2d23691
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e2d23691
Branch: refs/heads/cjansen-cpp-client
Commit: e2d23691541b09f4efc59a32705a5306b179a0b0
Parents: 46edaeb
Author: Adrian Preston <pr...@uk.ibm.com>
Authored: Fri Jun 26 23:58:21 2015 +0100
Committer: Rafael Schloming <rh...@alum.mit.edu>
Committed: Sun Jul 5 19:57:39 2015 -0400
----------------------------------------------------------------------
.../qpid/proton/reactor/impl/IOHandler.java | 1 +
.../qpid/proton/reactor/impl/SelectorImpl.java | 70 ++++++++++++++++----
.../apache/qpid/proton/reactor/ReactorTest.java | 30 +++++++++
3 files changed, 89 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e2d23691/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
index f810742..39d840e 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
@@ -103,6 +103,7 @@ public class IOHandler extends BaseHandler {
Socket socket = null; // In this case, 'null' is the proton-j equivalent of PN_INVALID_SOCKET
try {
SocketChannel socketChannel = ((ReactorImpl)reactor).getIO().socketChannel();
+ socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress(hostname, port));
socket = socketChannel.socket();
} catch(IOException ioException) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e2d23691/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
index 1145158..5ef74e7 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
@@ -24,9 +24,13 @@ package org.apache.qpid.proton.reactor.impl;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
import java.util.HashSet;
import java.util.Iterator;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.reactor.Selectable;
import org.apache.qpid.proton.reactor.Selector;
@@ -59,14 +63,19 @@ class SelectorImpl implements Selector {
public void update(Selectable selectable) {
if (selectable.getChannel() != null) {
int interestedOps = 0;
- if (selectable.isReading()) {
- if (selectable.getChannel() instanceof ServerSocketChannel) {
- interestedOps |= SelectionKey.OP_ACCEPT;
- } else {
- interestedOps |= SelectionKey.OP_READ;
+ if (selectable.getChannel() instanceof SocketChannel &&
+ ((SocketChannel)selectable.getChannel()).isConnectionPending()) {
+ interestedOps |= SelectionKey.OP_CONNECT;
+ } else {
+ if (selectable.isReading()) {
+ if (selectable.getChannel() instanceof ServerSocketChannel) {
+ interestedOps |= SelectionKey.OP_ACCEPT;
+ } else {
+ interestedOps |= SelectionKey.OP_READ;
+ }
}
+ if (selectable.isWriting()) interestedOps |= SelectionKey.OP_WRITE;
}
- if (selectable.isWriting()) interestedOps |= SelectionKey.OP_WRITE;
SelectionKey key = selectable.getChannel().keyFor(selector);
key.interestOps(interestedOps);
}
@@ -76,14 +85,18 @@ class SelectorImpl implements Selector {
public void remove(Selectable selectable) {
if (selectable.getChannel() != null) {
SelectionKey key = selectable.getChannel().keyFor(selector);
- key.cancel();
- key.attach(null);
+ if (key != null) {
+ key.cancel();
+ key.attach(null);
+ }
}
selectables.remove(selectable);
}
@Override
public void select(long timeout) throws IOException {
+
+ long now = System.currentTimeMillis();
if (timeout > 0) {
long deadline = 0;
for (Selectable selectable : selectables) { // TODO: this differs from the C code which requires a call to update() to make deadline changes take affect
@@ -94,7 +107,6 @@ class SelectorImpl implements Selector {
}
if (deadline > 0) {
- long now = System.currentTimeMillis();
long delta = deadline - now;
if (delta < 0) {
timeout = 0;
@@ -104,17 +116,51 @@ class SelectorImpl implements Selector {
}
}
+ error.clear();
+
+ long awoken = 0;
if (timeout > 0) {
- selector.select(timeout);
+ long remainingTimeout = timeout;
+ while(remainingTimeout > 0) {
+ selector.select(remainingTimeout);
+ awoken = System.currentTimeMillis();
+
+ for (Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); iterator.hasNext();) {
+ SelectionKey key = iterator.next();
+ if (key.isConnectable()) {
+ try {
+ ((SocketChannel)key.channel()).finishConnect();
+ update((Selectable)key.attachment());
+ } catch(IOException ioException) {
+ Selectable selectable = (Selectable)key.attachment();
+ ErrorCondition condition = new ErrorCondition();
+ condition.setCondition(Symbol.getSymbol("proton:io"));
+ condition.setDescription(ioException.getMessage());
+ Transport transport = selectable.getTransport();
+ if (transport != null) {
+ transport.setCondition(condition);
+ transport.close_tail();
+ transport.close_head();
+ transport.pop(transport.pending());
+ }
+ error.add(selectable);
+ }
+ iterator.remove();
+ }
+ }
+ if (!selector.selectedKeys().isEmpty()) {
+ break;
+ }
+ remainingTimeout = remainingTimeout - (awoken - now);
+ }
} else {
selector.selectNow();
+ awoken = System.currentTimeMillis();
}
- long awoken = System.currentTimeMillis();
readable.clear();
writeable.clear();
expired.clear();
- error.clear(); // TODO: nothing ever gets put in here...
for (SelectionKey key : selector.selectedKeys()) {
Selectable selectable = (Selectable)key.attachment();
if (key.isReadable()) readable.add(selectable);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e2d23691/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
index 61e2761..2d81666 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
@@ -29,6 +29,7 @@ import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -563,4 +564,33 @@ public class ReactorTest {
assertReactorRunBarfsOnHandler(reactor, expectedBarf, expectedHandler);
reactor.free();
}
+
+ @Test
+ public void connectionRefused() throws IOException {
+ final ServerSocket serverSocket = new ServerSocket(0, 0);
+
+ class ConnectionHandler extends TestHandler {
+ @Override
+ public void onConnectionInit(Event event) {
+ super.onConnectionInit(event);
+ Connection connection = event.getConnection();
+ connection.setHostname("127.0.0.1:" + serverSocket.getLocalPort());
+ connection.open();
+ try {
+ serverSocket.close();
+ } catch(IOException e) {
+ AssertionFailedError afe = new AssertionFailedError();
+ afe.initCause(e);
+ throw afe;
+ }
+ }
+ }
+ TestHandler connectionHandler = new ConnectionHandler();
+ reactor.connection(connectionHandler);
+ reactor.run();
+ reactor.free();
+ serverSocket.close();
+ connectionHandler.assertEvents(Type.CONNECTION_INIT, Type.CONNECTION_LOCAL_OPEN, Type.CONNECTION_BOUND, Type.TRANSPORT_ERROR, Type.TRANSPORT_TAIL_CLOSED,
+ Type.TRANSPORT_HEAD_CLOSED, Type.TRANSPORT_CLOSED, Type.CONNECTION_UNBOUND, Type.TRANSPORT);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org