You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2015/07/06 01:45:09 UTC

[08/38] qpid-proton git commit: PROTON-881: Tidy up proton-j to proton-c reactor interop tests

PROTON-881: Tidy up proton-j to proton-c reactor interop tests


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/2e6f5cdd
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/2e6f5cdd
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/2e6f5cdd

Branch: refs/heads/master
Commit: 2e6f5cdd1754b266e81afbc49ae4333a75287d57
Parents: b6e18b5
Author: Adrian Preston <pr...@uk.ibm.com>
Authored: Thu Apr 30 13:58:28 2015 +0100
Committer: Adrian Preston <pr...@uk.ibm.com>
Committed: Wed May 6 23:24:11 2015 +0100

----------------------------------------------------------------------
 .../qpid/proton/example/reactor/Send.java       |  2 +-
 .../org/apache/qpid/proton/ProtonJInterop.java  | 78 ++++++++++----------
 tests/python/proton_tests/reactor_interop.py    | 57 ++++++++++----
 3 files changed, 81 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2e6f5cdd/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java
index 5cd5811..22da720 100644
--- a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java
+++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java
@@ -78,7 +78,7 @@ public class Send extends BaseHandler {
         @Override
         public void onLinkFlow(Event event) {
             Sender snd = (Sender)event.getLink();
-            if (snd.getCredit() > 0 && message != null) {
+            if (snd.getCredit() > 0) {
                 byte[] msgData = new byte[1024];
                 int length;
                 while(true) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2e6f5cdd/tests/java/org/apache/qpid/proton/ProtonJInterop.java
----------------------------------------------------------------------
diff --git a/tests/java/org/apache/qpid/proton/ProtonJInterop.java b/tests/java/org/apache/qpid/proton/ProtonJInterop.java
index 8b49508..678bfd2 100644
--- a/tests/java/org/apache/qpid/proton/ProtonJInterop.java
+++ b/tests/java/org/apache/qpid/proton/ProtonJInterop.java
@@ -29,6 +29,7 @@ import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.BaseHandler;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Event;
 import org.apache.qpid.proton.engine.Sender;
 import org.apache.qpid.proton.engine.Session;
@@ -36,22 +37,18 @@ import org.apache.qpid.proton.message.Message;
 import org.apache.qpid.proton.reactor.Handshaker;
 import org.apache.qpid.proton.reactor.Reactor;
 
-public class ProtonJInterop {   // TODO: this doesn't return a useful RC
-
+public class ProtonJInterop {
 
     private static class SendHandler extends BaseHandler {
 
         private final String hostname;
-        private final Message message;
-        private int nextTag = 0;
-        private int result = 1;
+        private int numMsgs;
+        private int count = 0;
+        private boolean result = false;
 
-        private SendHandler(String hostname, Message message) {
+        private SendHandler(String hostname, int numMsgs) {
             this.hostname = hostname;
-            this.message = message;
-
-            // Add a child handler that performs some default handshaking
-            // behaviour.
+            this.numMsgs = numMsgs;
             add(new Handshaker());
         }
 
@@ -59,16 +56,7 @@ public class ProtonJInterop {   // TODO: this doesn't return a useful RC
         public void onConnectionInit(Event event) {
             Connection conn = event.getConnection();
             conn.setHostname(hostname);
-
-            // Every session or link could have their own handler(s) if we
-            // wanted simply by adding the handler to the given session
-            // or link
             Session ssn = conn.session();
-
-            // If a link doesn't have an event handler, the events go to
-            // its parent session. If the session doesn't have a handler
-            // the events go to its parent connection. If the connection
-            // doesn't have a handler, the events go to the reactor.
             Sender snd = ssn.sender("sender");
             conn.open();
             ssn.open();
@@ -78,7 +66,10 @@ public class ProtonJInterop {   // TODO: this doesn't return a useful RC
         @Override
         public void onLinkFlow(Event event) {
             Sender snd = (Sender)event.getLink();
-            if (snd.getCredit() > 0 && message != null) {
+            if (snd.getCredit() > 0 && snd.getLocalState() != EndpointState.CLOSED) {
+                Message message = Proton.message();
+                ++count;
+                message.setBody(new AmqpValue("message-"+count));
                 byte[] msgData = new byte[1024];
                 int length;
                 while(true) {
@@ -89,20 +80,23 @@ public class ProtonJInterop {   // TODO: this doesn't return a useful RC
                         msgData = new byte[msgData.length * 2];
                     }
                 }
-                byte[] tag = String.valueOf(nextTag++).getBytes();
+                byte[] tag = String.valueOf(count).getBytes();
                 Delivery dlv = snd.delivery(tag);
                 snd.send(msgData, 0, length);
                 dlv.settle();
                 snd.advance();
-                snd.close();
-                snd.getSession().close();
-                snd.getSession().getConnection().close();
-                result = 0;
+                if (count == numMsgs) {
+                    snd.close();
+                    snd.getSession().close();
+                    snd.getSession().getConnection().close();
+                    result = true;
+                }
             }
         }
 
         @Override
         public void onTransportError(Event event) {
+            result = false;
             ErrorCondition condition = event.getTransport().getCondition();
             if (condition != null) {
                 System.err.println("Error: " + condition.getDescription());
@@ -113,33 +107,37 @@ public class ProtonJInterop {   // TODO: this doesn't return a useful RC
     }
 
     private static class Send extends BaseHandler {
-        private final String hostname;
-        private final Message message;
+        private final SendHandler sendHandler; // kept as a field so getResult() can read its outcome after the reactor has run
 
-        private Send(String hostname, String content) {
-            this.hostname = hostname;
-            message = Proton.message();
-            message.setBody(new AmqpValue(content));
+        private Send(String hostname, int numMsgs) {
+            sendHandler = new SendHandler(hostname, numMsgs);
         }
 
         @Override
         public void onReactorInit(Event event) {
-            // You can use the connection method to create AMQP connections.
+            event.getReactor().connection(sendHandler); // sendHandler is the handler for this connection's events
+        }
 
-            // This connection's handler is the SendHandler object. All the events
-            // for this connection will go to the SendHandler object instead of
-            // going to the reactor. If you were to omit the SendHandler object,
-            // all the events would go to the reactor.
-            event.getReactor().connection(new SendHandler(hostname, message));
+        public boolean getResult() {
+            return sendHandler.result; // set to true once numMsgs messages were sent; reset to false on a transport error
         }
     }
 
-    private static void sendTest() throws IOException {
-        Reactor r = Proton.reactor(new Send("localhost:56789", "test1"));
+    private static boolean sendTest(String[] args) throws IOException {
+        int port = Integer.valueOf(args[0]); // args[0]: TCP port on localhost to connect to
+        int numMsgs = Integer.valueOf(args[1]); // args[1]: number of messages to send
+        Send send = new Send("localhost:" + port, numMsgs);
+        Reactor r = Proton.reactor(send);
         r.run();
+        return send.getResult(); // main() maps this to the process exit status
     }
 
     public static void main(String[] args) throws IOException {
-        sendTest();
+        try {
+            System.exit(sendTest(args) ? 0 : 1); // exit status 0 = success, 1 = failure
+        } catch(Throwable t) { // any failure, including missing or malformed arguments, also exits with 1
+            t.printStackTrace();
+            System.exit(1);
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2e6f5cdd/tests/python/proton_tests/reactor_interop.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/reactor_interop.py b/tests/python/proton_tests/reactor_interop.py
index 1fdfcd7..9c58d03 100644
--- a/tests/python/proton_tests/reactor_interop.py
+++ b/tests/python/proton_tests/reactor_interop.py
@@ -18,7 +18,7 @@
 # under the License.
 #
 
-from common import Test
+from common import Test, free_tcp_port
 from proton import Message
 from proton.reactor import Reactor
 from proton.handlers import CHandshaker, CFlowController
@@ -28,33 +28,60 @@ import os
 from threading import Thread
 
 class JavaSendThread(Thread):
-  def __init__(self):
+  def __init__(self, port, count):
     Thread.__init__(self)
+    self.port = str(port)  # kept as a string: passed as a command-line argument in run()
+    self.count = str(count)  # kept as a string: passed as a command-line argument in run()
+    self.result = 1  # stays non-zero unless the java process exits with status 0
 
   def run(self):
-    subprocess.check_output(['java', 'org.apache.qpid.proton.ProtonJInterop'])
+    self.result = subprocess.call(['java',
+        'org.apache.qpid.proton.ProtonJInterop',
+        self.port, self.count])  # subprocess.call returns the child's exit status
 
-
-class Receive:
-  def __init__(self):
+class ReceiveHandler:
+  def __init__(self, count):
+    self.count = count  # messages still expected; decremented in on_delivery
     self.handlers = [CHandshaker(), CFlowController()]
-    self.message = Message()
+    self.messages = []  # bodies of received messages, in arrival order
 
   def on_reactor_init(self, event):
-    self.acceptor = event.reactor.acceptor("localhost", 56789)
-    JavaSendThread().start()
+    port = free_tcp_port()
+    self.acceptor = event.reactor.acceptor("localhost", port)
+    self.java_thread = JavaSendThread(port, self.count)
+    self.java_thread.start()  # started only after the acceptor has been created
 
   def on_delivery(self, event):
     rcv = event.receiver
-    if rcv and self.message.recv(rcv):
+    msg = Message()
+    if rcv and msg.recv(rcv):
       event.delivery.settle()
-      self.acceptor.close()
+      self.messages += [msg.body]
+      self.count -= 1
+      if (self.count == 0):  # every expected message has arrived
+        self.acceptor.close()
 
 class ReactorInteropTest(Test):
 
-  def test_protonj_to_protonc(self):
-    rcv = Receive()
-    r = Reactor(rcv)
+  def protonj_to_protonc(self, count):
+    rh = ReceiveHandler(count)
+    r = Reactor(rh)
     r.run()
-    assert(rcv.message.body == "test1")
 
+    rh.java_thread.join()
+    assert(rh.java_thread.result == 0)
+
+    for i in range(1, count):
+      assert(rh.messages[i-1] == ("message-" + str(i)))
+
+  def test_protonj_to_protonc_1(self):
+    self.protonj_to_protonc(1)
+
+  def test_protonj_to_protonc_5(self):
+    self.protonj_to_protonc(5)
+
+  def test_protonj_to_protonc_500(self):
+    self.protonj_to_protonc(500)
+
+  def test_protonj_to_protonc_5000(self):
+    self.protonj_to_protonc(5000)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org