You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2015/07/06 01:45:14 UTC
[13/38] qpid-proton git commit: PROTON-881: Add reactor unit tests
based on those in proton-c/src/tests/reactor.c
PROTON-881: Add reactor unit tests based on those in proton-c/src/tests/reactor.c
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e9d4a78d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e9d4a78d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e9d4a78d
Branch: refs/heads/master
Commit: e9d4a78d294f08e7eb1bd7a3f28bd3f97ce6b9df
Parents: 5748bb9
Author: Adrian Preston <pr...@uk.ibm.com>
Authored: Mon May 4 21:01:19 2015 +0100
Committer: Adrian Preston <pr...@uk.ibm.com>
Committed: Wed May 6 23:24:48 2015 +0100
----------------------------------------------------------------------
.../org/apache/qpid/proton/reactor/Reactor.java | 4 +-
.../qpid/proton/reactor/impl/AcceptorImpl.java | 6 +
.../qpid/proton/reactor/impl/ReactorImpl.java | 15 ++
.../apache/qpid/proton/reactor/ReactorTest.java | 243 ++++++++++++++++++-
4 files changed, 263 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e9d4a78d/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
index 68375b1..935523a 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
@@ -93,11 +93,11 @@ public interface Reactor {
// pn_reactor_schedule from reactor.c
public Task schedule(int delay, Handler handler);
- // TODO: acceptor
- // TODO: acceptorClose
Connection connection(Handler handler);
Acceptor acceptor(String host, int port) throws IOException;
Acceptor acceptor(String host, int port, Handler handler) throws IOException;
+
+ public void free();
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e9d4a78d/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
index 7084dfb..fb48df6 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
@@ -122,4 +122,10 @@ public class AcceptorImpl implements Acceptor {
public void add(Handler handler) {
sel.add(handler);
}
+
+ // Used for unit tests, where acceptor is bound to an ephemeral port
+ public int getPortNumber() throws IOException {
+ ServerSocketChannel ssc = (ServerSocketChannel)sel.getChannel();
+ return ((InetSocketAddress)ssc.getLocalAddress()).getPort();
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e9d4a78d/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
index a3180ab..e25e813 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
@@ -117,6 +117,20 @@ public class ReactorImpl implements Reactor {
wakeup = Pipe.open();
mark();
}
+
+ @Override
+ public void free() {
+ // TODO
+/*
+ 132 void pn_reactor_free(pn_reactor_t *reactor) {
+ 133 if (reactor) {
+ 134 pn_collector_release(reactor->collector);
+ 135 pn_handler_free(reactor->handler);
+ 136 reactor->handler = NULL;
+ 137 pn_decref(reactor);
+ 138 }
+ 139 }
+ */
/*
85 static void pn_reactor_finalize(pn_reactor_t *reactor) {
86 for (int i = 0; i < 2; i++) {
@@ -133,6 +147,7 @@ public class ReactorImpl implements Reactor {
97 pn_decref(reactor->io);
98 }
*/
+ }
@Override
public void attach(Object attachment) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e9d4a78d/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
index 2378d5f..0bcfd80 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
@@ -1,6 +1,7 @@
package org.apache.qpid.proton.reactor;
import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -10,8 +11,14 @@ import java.util.ArrayList;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Event.Type;
+import org.apache.qpid.proton.engine.Handler;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.reactor.impl.AcceptorImpl;
import org.junit.Test;
public class ReactorTest {
@@ -45,7 +52,49 @@ public class ReactorTest {
}
/**
- * Tests basic operation of the Reactor.acceptor method by creating an acceptor
+ * Tests adding a handler to a reactor and running the reactor. The
+ * expected behaviour is for the reactor to return, and a number of reactor-
+ * related events to have been delivered to the handler.
+ * @throws IOException
+ */
+ @Test
+ public void handlerRun() throws IOException {
+ Reactor reactor = Proton.reactor();
+ Handler handler = reactor.getHandler();
+ assertNotNull(handler);
+ TestHandler testHandler = new TestHandler();
+ handler.add(testHandler);
+ reactor.run();
+ testHandler.assertEvents(Type.REACTOR_INIT, Type.SELECTABLE_INIT, Type.SELECTABLE_UPDATED, Type.SELECTABLE_FINAL, Type.REACTOR_FINAL);
+ }
+
+ /**
+ * Tests basic operation of the Reactor.connection method by creating a
+ * connection from a reactor, then running the reactor. The expected behaviour
+ * is for:
+ * <ul>
+ * <li>The reactor to end immediately.</li>
+ * <li>The handler associated with the connection receives an init event.</li>
+ * <li>The connection is one of the reactor's children.</li>
+ * </ul>
+ * @throws IOException
+ */
+ @Test
+ public void connection() throws IOException {
+ Reactor reactor = Proton.reactor();
+ TestHandler connectionHandler = new TestHandler();
+ Connection connection = reactor.connection(connectionHandler);
+ assertNotNull(connection);
+ assertTrue("connection should be one of the reactor's children", reactor.children().contains(connection));
+ TestHandler reactorHandler = new TestHandler();
+ reactor.getHandler().add(reactorHandler);
+ reactor.run();
+ reactorHandler.assertEvents(Type.REACTOR_INIT, Type.SELECTABLE_INIT, Type.SELECTABLE_UPDATED, Type.SELECTABLE_FINAL, Type.REACTOR_FINAL);
+ connectionHandler.assertEvents(Type.CONNECTION_INIT);
+ }
+
+ /**
+ * Tests operation of the Reactor.acceptor method by creating an acceptor
* which is immediately closed by the reactor. The expected behaviour is for:
* <ul>
* <li>The reactor to end immediately (as it has no more work to process).</li>
@@ -56,9 +105,9 @@ public class ReactorTest {
* @throws IOException
*/
@Test
- public void basicAcceptor() throws IOException {
+ public void acceptor() throws IOException {
Reactor reactor = Proton.reactor();
- final Acceptor acceptor = reactor.acceptor("localhost", 0);
+ final Acceptor acceptor = reactor.acceptor("127.0.0.1", 0);
assertNotNull(acceptor);
assertTrue("acceptor should be one of the reactor's children", reactor.children().contains(acceptor));
TestHandler acceptorHandler = new TestHandler();
@@ -73,4 +122,192 @@ public class ReactorTest {
acceptorHandler.assertEvents(Type.SELECTABLE_INIT, Type.SELECTABLE_UPDATED, Type.SELECTABLE_FINAL);
assertFalse("acceptor should have been removed from the reactor's children", reactor.children().contains(acceptor));
}
+
+ private static class ServerHandler extends TestHandler {
+ private Acceptor acceptor;
+ public void setAcceptor(Acceptor acceptor) {
+ this.acceptor = acceptor;
+ }
+ @Override
+ public void onConnectionRemoteOpen(Event event) {
+ super.onConnectionRemoteOpen(event);
+ event.getConnection().open();
+ }
+ @Override
+ public void onConnectionRemoteClose(Event event) {
+ super.onConnectionRemoteClose(event);
+ acceptor.close();
+ event.getConnection().close();
+ event.getConnection().free();
+ }
+ }
+
+ /**
+ * Tests end to end behaviour of the reactor by creating an acceptor and then
+ * a connection (which connects to the port the acceptor is listening on).
+ * As soon as the connection is established, both the acceptor and connection
+ * are closed. The events generated by the acceptor and the connection are
+ * compared to a set of expected events.
+ * @throws IOException
+ */
+ @Test
+ public void connect() throws IOException {
+ Reactor reactor = Proton.reactor();
+
+ ServerHandler sh = new ServerHandler();
+ Acceptor acceptor = reactor.acceptor("127.0.0.1", 0, sh);
+ final int listeningPort = ((AcceptorImpl)acceptor).getPortNumber();
+ sh.setAcceptor(acceptor);
+
+ class ClientHandler extends TestHandler {
+ @Override
+ public void onConnectionInit(Event event) {
+ super.onConnectionInit(event);
+ event.getConnection().setHostname("127.0.0.1:" + listeningPort);
+ event.getConnection().open();
+ }
+ @Override
+ public void onConnectionRemoteOpen(Event event) {
+ super.onConnectionRemoteOpen(event);
+ event.getConnection().close();
+ }
+ @Override
+ public void onConnectionRemoteClose(Event event) {
+ super.onConnectionRemoteClose(event);
+ event.getConnection().free();
+ }
+ }
+ ClientHandler ch = new ClientHandler();
+ Connection connection = reactor.connection(ch);
+
+ assertTrue("acceptor should be one of the reactor's children", reactor.children().contains(acceptor));
+ assertTrue("connection should be one of the reactor's children", reactor.children().contains(connection));
+
+ reactor.run();
+
+ assertFalse("acceptor should have been removed from the reactor's children", reactor.children().contains(acceptor));
+ assertFalse("connection should have been removed from the reactor's children", reactor.children().contains(connection));
+ sh.assertEvents(Type.CONNECTION_INIT, Type.CONNECTION_BOUND,
+ // XXX: proton-c generates a PN_TRANSPORT event here
+ Type.CONNECTION_REMOTE_OPEN, Type.CONNECTION_LOCAL_OPEN,
+ Type.TRANSPORT, Type.CONNECTION_REMOTE_CLOSE,
+ Type.TRANSPORT_TAIL_CLOSED, Type.CONNECTION_LOCAL_CLOSE,
+ Type.TRANSPORT, Type.TRANSPORT_HEAD_CLOSED,
+ Type.TRANSPORT_CLOSED, Type.CONNECTION_UNBOUND,
+ Type.CONNECTION_FINAL);
+
+ ch.assertEvents(Type.CONNECTION_INIT, Type.CONNECTION_LOCAL_OPEN,
+ Type.CONNECTION_BOUND,
+ // XXX: proton-c generates two PN_TRANSPORT events here
+ Type.CONNECTION_REMOTE_OPEN, Type.CONNECTION_LOCAL_CLOSE,
+ Type.TRANSPORT, Type.TRANSPORT_HEAD_CLOSED,
+ Type.CONNECTION_REMOTE_CLOSE, Type.TRANSPORT_TAIL_CLOSED,
+ Type.TRANSPORT_CLOSED, Type.CONNECTION_UNBOUND,
+ Type.CONNECTION_FINAL);
+
+ }
+
+ private static class SinkHandler extends BaseHandler {
+ protected int received = 0;
+
+ @Override
+ public void onDelivery(Event event) {
+ Delivery dlv = event.getDelivery();
+ if (!dlv.isPartial()) {
+ dlv.settle();
+ ++received;
+ }
+ }
+ }
+
+ private static class SourceHandler extends BaseHandler {
+ private int remaining;
+ private final int port;
+
+ protected SourceHandler(int count, int port) {
+ remaining = count;
+ this.port = port;
+ }
+
+ @Override
+ public void onConnectionInit(Event event) {
+ Connection conn = event.getConnection();
+ conn.setHostname("127.0.0.1:" + port);
+ Session ssn = conn.session();
+ Sender snd = ssn.sender("sender");
+ conn.open();
+ ssn.open();
+ snd.open();
+ }
+
+ @Override
+ public void onLinkFlow(Event event) {
+ Sender link = (Sender)event.getLink();
+ while (link.getCredit() > 0 && remaining > 0) {
+ Delivery dlv = link.delivery(new byte[0]);
+ assertNotNull(dlv);
+ dlv.settle();
+ link.advance();
+ --remaining;
+ }
+
+ if (remaining == 0) {
+ event.getConnection().close();
+ }
+ }
+
+ @Override
+ public void onConnectionRemoteClose(Event event) {
+ event.getConnection().free();
+ }
+ }
+
+ private void transfer(int count, int window) throws IOException {
+ Reactor reactor = Proton.reactor();
+ ServerHandler sh = new ServerHandler();
+ Acceptor acceptor = reactor.acceptor("127.0.0.1", 0, sh);
+ sh.setAcceptor(acceptor);
+ sh.add(new Handshaker());
+ // XXX: a window of 1 doesn't work unless the flowcontroller is
+ // added after the thing that settles the delivery
+ sh.add(new FlowController(window));
+ SinkHandler snk = new SinkHandler();
+ sh.add(snk);
+
+ SourceHandler src = new SourceHandler(count, ((AcceptorImpl)acceptor).getPortNumber());
+ reactor.connection(src);
+
+ reactor.run();
+ assertEquals("Did not receive the expected number of messages", count, snk.received);
+ }
+
+ @Test
+ public void transfer_0to64_2() throws IOException {
+ for (int i = 0; i < 64; ++i) {
+ transfer(i, 2);
+ }
+ }
+
+ @Test
+ public void transfer_1024_64() throws IOException {
+ transfer(1024, 64);
+ }
+
+ @Test
+ public void transfer_4096_1024() throws IOException {
+ transfer(4*1024, 1024);
+ }
+
+ @Test
+ public void schedule() throws IOException {
+ Reactor reactor = Proton.reactor();
+ TestHandler reactorHandler = new TestHandler();
+ reactor.getHandler().add(reactorHandler);
+ TestHandler taskHandler = new TestHandler();
+ reactor.schedule(0, taskHandler);
+ reactor.run();
+ reactorHandler.assertEvents(Type.REACTOR_INIT, Type.SELECTABLE_INIT, Type.SELECTABLE_UPDATED, Type.REACTOR_QUIESCED, Type.SELECTABLE_UPDATED,
+ Type.SELECTABLE_FINAL, Type.REACTOR_FINAL);
+ taskHandler.assertEvents(Type.TIMER_TASK);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org