You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2015/07/06 01:45:14 UTC

[13/38] qpid-proton git commit: PROTON-881: Add reactor unit tests based on those in proton-c/src/tests/reactor.c

PROTON-881: Add reactor unit tests based on those in proton-c/src/tests/reactor.c


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e9d4a78d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e9d4a78d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e9d4a78d

Branch: refs/heads/master
Commit: e9d4a78d294f08e7eb1bd7a3f28bd3f97ce6b9df
Parents: 5748bb9
Author: Adrian Preston <pr...@uk.ibm.com>
Authored: Mon May 4 21:01:19 2015 +0100
Committer: Adrian Preston <pr...@uk.ibm.com>
Committed: Wed May 6 23:24:48 2015 +0100

----------------------------------------------------------------------
 .../org/apache/qpid/proton/reactor/Reactor.java |   4 +-
 .../qpid/proton/reactor/impl/AcceptorImpl.java  |   6 +
 .../qpid/proton/reactor/impl/ReactorImpl.java   |  15 ++
 .../apache/qpid/proton/reactor/ReactorTest.java | 243 ++++++++++++++++++-
 4 files changed, 263 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e9d4a78d/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
index 68375b1..935523a 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
@@ -93,11 +93,11 @@ public interface Reactor {
 
     // pn_reactor_schedule from reactor.c
     public Task schedule(int delay, Handler handler);
-    // TODO: acceptor
-    // TODO: acceptorClose
 
     Connection connection(Handler handler);
 
     Acceptor acceptor(String host, int port) throws IOException;
     Acceptor acceptor(String host, int port, Handler handler) throws IOException;
+
+    public void free();
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e9d4a78d/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
index 7084dfb..fb48df6 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
@@ -122,4 +122,10 @@ public class AcceptorImpl implements Acceptor {
     public void add(Handler handler) {
         sel.add(handler);
     }
+
+    // Used for unit tests, where the acceptor is bound to an ephemeral port and the actual port has to be looked up
+    public int getPortNumber() throws IOException {
+        ServerSocketChannel ssc = (ServerSocketChannel)sel.getChannel(); // sel's channel is cast to the server socket channel
+        return ((InetSocketAddress)ssc.getLocalAddress()).getPort(); // port taken from the channel's local address
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e9d4a78d/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
index a3180ab..e25e813 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
@@ -117,6 +117,20 @@ public class ReactorImpl implements Reactor {
         wakeup = Pipe.open();
         mark();
     }
+
+    @Override
+    public void free() {
+        // TODO
+/*
+  132 void pn_reactor_free(pn_reactor_t *reactor) {
+  133   if (reactor) {
+  134     pn_collector_release(reactor->collector);
+  135     pn_handler_free(reactor->handler);
+  136     reactor->handler = NULL;
+  137     pn_decref(reactor);
+  138   }
+  139 }
+ */
     /*
  85 static void pn_reactor_finalize(pn_reactor_t *reactor) {
  86   for (int i = 0; i < 2; i++) {
@@ -133,6 +147,7 @@ public class ReactorImpl implements Reactor {
  97   pn_decref(reactor->io);
  98 }
  */
+    }
 
     @Override
     public void attach(Object attachment) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e9d4a78d/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
index 2378d5f..0bcfd80 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java
@@ -1,6 +1,7 @@
 package org.apache.qpid.proton.reactor;
 
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -10,8 +11,14 @@ import java.util.ArrayList;
 
 import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Event;
 import org.apache.qpid.proton.engine.Event.Type;
+import org.apache.qpid.proton.engine.Handler;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.reactor.impl.AcceptorImpl;
 import org.junit.Test;
 
 public class ReactorTest {
@@ -45,7 +52,49 @@ public class ReactorTest {
     }
 
     /**
-     * Tests basic operation of the Reactor.acceptor method by creating an acceptor
+     * Tests adding a handler to a reactor and running the reactor.  The
+     * expected behaviour is for the reactor to return, and a number of reactor-
+     * related events to have been delivered to the handler.
+     * @throws IOException
+     */
+    @Test
+    public void handlerRun() throws IOException {
+        Reactor reactor = Proton.reactor();
+        Handler handler = reactor.getHandler();
+        assertNotNull(handler);
+        TestHandler testHandler = new TestHandler(); // TestHandler is defined elsewhere; presumably it records the events it is given
+        handler.add(testHandler); // register the test handler as a child of the reactor's handler
+        reactor.run(); // no connections, acceptors or tasks were added before running
+        testHandler.assertEvents(Type.REACTOR_INIT, Type.SELECTABLE_INIT, Type.SELECTABLE_UPDATED, Type.SELECTABLE_FINAL, Type.REACTOR_FINAL); // expected event sequence, in this order
+    }
+
+    /**
+     * Tests basic operation of the Reactor.connection method by creating a
+     * connection from a reactor, then running the reactor.  The expected behaviour
+     * is for:
+     * <ul>
+     *   <li>The reactor to end immediately.</li>
+     *   <li>The handler associated with the connection receives an init event.</li>
+     *   <li>The connection is one of the reactor's children.</li>
+     * </ul>
+     * @throws IOException
+     */
+    @Test
+    public void connection() throws IOException {
+        Reactor reactor = Proton.reactor();
+        TestHandler connectionHandler = new TestHandler(); // handler passed to reactor.connection(...) below
+        Connection connection = reactor.connection(connectionHandler);
+        assertNotNull(connection);
+        assertTrue("connection should be one of the reactor's children", reactor.children().contains(connection)); // checked before the reactor is run
+        TestHandler reactorHandler = new TestHandler(); // second handler, added to the reactor's own handler
+        reactor.getHandler().add(reactorHandler);
+        reactor.run();
+        reactorHandler.assertEvents(Type.REACTOR_INIT, Type.SELECTABLE_INIT, Type.SELECTABLE_UPDATED, Type.SELECTABLE_FINAL, Type.REACTOR_FINAL); // same sequence as asserted in handlerRun
+        connectionHandler.assertEvents(Type.CONNECTION_INIT); // the connection is never opened, so only the init event is expected
+    }
+
+    /**
+     * Tests operation of the Reactor.acceptor method by creating an acceptor
      * which is immediately closed by the reactor.  The expected behaviour is for:
      * <ul>
      *   <li>The reactor to end immediately (as it has no more work to process).</li>
@@ -56,9 +105,9 @@ public class ReactorTest {
      * @throws IOException
      */
     @Test
-    public void basicAcceptor() throws IOException {
+    public void acceptor() throws IOException {
         Reactor reactor = Proton.reactor();
-        final Acceptor acceptor = reactor.acceptor("localhost", 0);
+        final Acceptor acceptor = reactor.acceptor("127.0.0.1", 0);
         assertNotNull(acceptor);
         assertTrue("acceptor should be one of the reactor's children", reactor.children().contains(acceptor));
         TestHandler acceptorHandler = new TestHandler();
@@ -73,4 +122,192 @@ public class ReactorTest {
         acceptorHandler.assertEvents(Type.SELECTABLE_INIT, Type.SELECTABLE_UPDATED, Type.SELECTABLE_FINAL);
         assertFalse("acceptor should have been removed from the reactor's children", reactor.children().contains(acceptor));
     }
+
+    private static class ServerHandler extends TestHandler { // server-side handler: opens in reply to a remote open, tears down on remote close
+        private Acceptor acceptor; // set via setAcceptor(); closed in onConnectionRemoteClose
+        public void setAcceptor(Acceptor acceptor) {
+            this.acceptor = acceptor;
+        }
+        @Override
+        public void onConnectionRemoteOpen(Event event) {
+            super.onConnectionRemoteOpen(event); // TestHandler's handling runs first
+            event.getConnection().open(); // answer the peer's open with a local open
+        }
+        @Override
+        public void onConnectionRemoteClose(Event event) {
+            super.onConnectionRemoteClose(event); // TestHandler's handling runs first
+            acceptor.close(); // NOTE(review): acceptor is null if setAcceptor() was never called
+            event.getConnection().close(); // answer the peer's close with a local close
+            event.getConnection().free();
+        }
+    }
+
+    /**
+     * Tests end to end behaviour of the reactor by creating an acceptor and then
+     * a connection (which connects to the port the acceptor is listening on).
+     * As soon as the connection is established, both the acceptor and connection
+     * are closed.  The events generated by the acceptor and the connection are
+     * compared to a set of expected events.
+     * @throws IOException
+     */
+    @Test
+    public void connect() throws IOException {
+        Reactor reactor = Proton.reactor();
+
+        ServerHandler sh = new ServerHandler();
+        Acceptor acceptor = reactor.acceptor("127.0.0.1",  0, sh); // port 0 is requested; the actual port is read back on the next line
+        final int listeningPort = ((AcceptorImpl)acceptor).getPortNumber(); // final so the local ClientHandler class can capture it
+        sh.setAcceptor(acceptor); // ServerHandler closes this acceptor on remote close
+
+        class ClientHandler extends TestHandler { // client side: open on init, close on remote open, free on remote close
+            @Override
+            public void onConnectionInit(Event event) {
+                super.onConnectionInit(event);
+                event.getConnection().setHostname("127.0.0.1:" + listeningPort); // address of the acceptor created above
+                event.getConnection().open();
+            }
+            @Override
+            public void onConnectionRemoteOpen(Event event) {
+                super.onConnectionRemoteOpen(event);
+                event.getConnection().close(); // close as soon as the peer's open is seen
+            }
+            @Override
+            public void onConnectionRemoteClose(Event event) {
+                super.onConnectionRemoteClose(event);
+                event.getConnection().free();
+            }
+        }
+        ClientHandler ch = new ClientHandler();
+        Connection connection = reactor.connection(ch);
+
+        assertTrue("acceptor should be one of the reactor's children", reactor.children().contains(acceptor)); // both are children before the run
+        assertTrue("connection should be one of the reactor's children", reactor.children().contains(connection));
+
+        reactor.run();
+
+        assertFalse("acceptor should have been removed from the reactor's children", reactor.children().contains(acceptor)); // neither is a child after the run
+        assertFalse("connection should have been removed from the reactor's children", reactor.children().contains(connection));
+        sh.assertEvents(Type.CONNECTION_INIT, Type.CONNECTION_BOUND, // expected server-side event sequence
+                // XXX: proton-c generates a PN_TRANSPORT event here
+                Type.CONNECTION_REMOTE_OPEN, Type.CONNECTION_LOCAL_OPEN,
+                Type.TRANSPORT, Type.CONNECTION_REMOTE_CLOSE,
+                Type.TRANSPORT_TAIL_CLOSED, Type.CONNECTION_LOCAL_CLOSE,
+                Type.TRANSPORT, Type.TRANSPORT_HEAD_CLOSED,
+                Type.TRANSPORT_CLOSED, Type.CONNECTION_UNBOUND,
+                Type.CONNECTION_FINAL);
+
+        ch.assertEvents(Type.CONNECTION_INIT, Type.CONNECTION_LOCAL_OPEN, // expected client-side event sequence
+                Type.CONNECTION_BOUND,
+                // XXX: proton-c generates two PN_TRANSPORT events here
+                Type.CONNECTION_REMOTE_OPEN, Type.CONNECTION_LOCAL_CLOSE,
+                Type.TRANSPORT, Type.TRANSPORT_HEAD_CLOSED,
+                Type.CONNECTION_REMOTE_CLOSE, Type.TRANSPORT_TAIL_CLOSED,
+                Type.TRANSPORT_CLOSED, Type.CONNECTION_UNBOUND,
+                Type.CONNECTION_FINAL);
+
+    }
+
+    private static class SinkHandler extends BaseHandler { // receiving side: settles and counts complete deliveries
+        protected int received = 0; // number of non-partial deliveries settled so far; read by transfer()
+
+        @Override
+        public void onDelivery(Event event) {
+            Delivery dlv = event.getDelivery();
+            if (!dlv.isPartial()) { // partial deliveries are neither settled nor counted
+                dlv.settle();
+                ++received;
+            }
+        }
+    }
+
+    private static class SourceHandler extends BaseHandler { // sending side: opens a sender link and sends a fixed number of deliveries
+        private int remaining; // deliveries still to be sent; decremented in onLinkFlow
+        private final int port; // port appended to "127.0.0.1:" to form the connection hostname
+
+        protected SourceHandler(int count, int port) {
+            remaining = count;
+            this.port = port;
+        }
+
+        @Override
+        public void onConnectionInit(Event event) {
+            Connection conn = event.getConnection();
+            conn.setHostname("127.0.0.1:" + port);
+            Session ssn = conn.session(); // one session with one sender link named "sender"
+            Sender snd = ssn.sender("sender");
+            conn.open(); // connection, session and link are all opened during init
+            ssn.open();
+            snd.open();
+        }
+
+        @Override
+        public void onLinkFlow(Event event) {
+            Sender link = (Sender)event.getLink();
+            while (link.getCredit() > 0 && remaining > 0) { // send while the link has credit and deliveries remain
+                Delivery dlv = link.delivery(new byte[0]); // presumably the byte[] is the delivery tag - confirm against Sender.delivery
+                assertNotNull(dlv);
+                dlv.settle(); // settled immediately by the sender
+                link.advance();
+                --remaining;
+            }
+
+            if (remaining == 0) { // nothing left to send (also true on every flow event when count was 0)
+                event.getConnection().close();
+            }
+        }
+
+        @Override
+        public void onConnectionRemoteClose(Event event) {
+            event.getConnection().free();
+        }
+    }
+
+    private void transfer(int count, int window) throws IOException { // sends count deliveries from a SourceHandler to a SinkHandler and checks that all arrive
+        Reactor reactor = Proton.reactor();
+        ServerHandler sh = new ServerHandler();
+        Acceptor acceptor = reactor.acceptor("127.0.0.1", 0, sh); // port 0 is requested; the actual port is read via getPortNumber() below
+        sh.setAcceptor(acceptor);
+        sh.add(new Handshaker()); // Handshaker and FlowController are defined elsewhere
+        // XXX: a window of 1 doesn't work unless the flowcontroller is
+        // added after the thing that settles the delivery
+        sh.add(new FlowController(window)); // note: added before the SinkHandler, which is the handler that settles
+        SinkHandler snk = new SinkHandler();
+        sh.add(snk);
+
+        SourceHandler src = new SourceHandler(count, ((AcceptorImpl)acceptor).getPortNumber()); // sender targets the acceptor's port
+        reactor.connection(src);
+
+        reactor.run();
+        assertEquals("Did not receive the expected number of messages", count, snk.received); // every delivery sent must have been counted by the sink
+    }
+
+    @Test
+    public void transfer_0to64_2() throws IOException {
+        for (int i = 0; i < 64; ++i) { // counts 0 through 63 (64 itself is not included)
+            transfer(i, 2); // each count is run with a window of 2
+        }
+    }
+
+    @Test
+    public void transfer_1024_64() throws IOException {
+        transfer(1024, 64); // 1024 deliveries with a window of 64
+    }
+
+    @Test
+    public void transfer_4096_1024() throws IOException {
+        transfer(4*1024, 1024); // 4096 deliveries with a window of 1024
+    }
+
+    @Test
+    public void schedule() throws IOException { // schedules one task on the reactor and checks the events seen by both handlers
+        Reactor reactor = Proton.reactor();
+        TestHandler reactorHandler = new TestHandler();
+        reactor.getHandler().add(reactorHandler);
+        TestHandler taskHandler = new TestHandler(); // handler passed to reactor.schedule(...) below
+        reactor.schedule(0, taskHandler); // delay of 0 (units are not visible in this diff)
+        reactor.run();
+        reactorHandler.assertEvents(Type.REACTOR_INIT, Type.SELECTABLE_INIT, Type.SELECTABLE_UPDATED, Type.REACTOR_QUIESCED, Type.SELECTABLE_UPDATED, // unlike handlerRun, a REACTOR_QUIESCED and a second SELECTABLE_UPDATED are expected
+                Type.SELECTABLE_FINAL, Type.REACTOR_FINAL);
+        taskHandler.assertEvents(Type.TIMER_TASK); // the task's handler is expected to see only the timer event
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org