You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2015/07/06 01:45:11 UTC

[10/38] qpid-proton git commit: PROTON-881: Add reactor interop tests that send messages from proton-c to proton-j

PROTON-881: Add reactor interop tests that send messages from proton-c to proton-j


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1eb41f60
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1eb41f60
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1eb41f60

Branch: refs/heads/master
Commit: 1eb41f603b0a4c5da9c686af1369837e7c6f2184
Parents: 7faa7e2
Author: Adrian Preston <pr...@uk.ibm.com>
Authored: Fri May 1 15:36:00 2015 +0100
Committer: Adrian Preston <pr...@uk.ibm.com>
Committed: Wed May 6 23:24:28 2015 +0100

----------------------------------------------------------------------
 .../org/apache/qpid/proton/ProtonJInterop.java  | 76 +++++++++++++++++---
 tests/python/proton_tests/reactor_interop.py    | 76 ++++++++++++++++++--
 2 files changed, 138 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1eb41f60/tests/java/org/apache/qpid/proton/ProtonJInterop.java
----------------------------------------------------------------------
diff --git a/tests/java/org/apache/qpid/proton/ProtonJInterop.java b/tests/java/org/apache/qpid/proton/ProtonJInterop.java
index 678bfd2..31306ef 100644
--- a/tests/java/org/apache/qpid/proton/ProtonJInterop.java
+++ b/tests/java/org/apache/qpid/proton/ProtonJInterop.java
@@ -31,9 +31,12 @@ import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.engine.Sender;
 import org.apache.qpid.proton.engine.Session;
 import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.reactor.Acceptor;
+import org.apache.qpid.proton.reactor.FlowController;
 import org.apache.qpid.proton.reactor.Handshaker;
 import org.apache.qpid.proton.reactor.Reactor;
 
@@ -123,18 +126,75 @@ public class ProtonJInterop {
         }
     }
 
-    private static boolean sendTest(String[] args) throws IOException {
-        int port = Integer.valueOf(args[0]);
-        int numMsgs = Integer.valueOf(args[1]);
-        Send send = new Send("localhost:" + port, numMsgs);
-        Reactor r = Proton.reactor(send);
-        r.run();
-        return send.getResult();
+    private static class Recv extends BaseHandler {
+        private final int port;
+        private final int numMsgs;
+        private int count = 0;
+        private Acceptor acceptor = null;
+
+        private Recv(int port, int numMsgs) {
+            this.port = port;
+            this.numMsgs = numMsgs;
+            add(new Handshaker());
+            add(new FlowController());
+        }
+
+        @Override
+        public void onReactorInit(Event event) {
+            try {
+                acceptor = event.getReactor().acceptor("localhost", port);
+            } catch(IOException ioException) {
+                throw new RuntimeException(ioException);
+            }
+        }
+
+        @Override
+        public void onDelivery(Event event) {
+            Receiver recv = (Receiver)event.getLink();
+            Delivery delivery = recv.current();
+            if (delivery.isReadable() && !delivery.isPartial()) {
+                int size = delivery.pending();
+                byte[] buffer = new byte[size];
+                int read = recv.recv(buffer, 0, buffer.length);
+                recv.advance();
+
+                Message msg = Proton.message();
+                msg.decode(buffer, 0, read);
+
+                ++count;
+                String msgBody = ((AmqpValue)msg.getBody()).getValue().toString();
+                String expected = "message-" + count;
+                if (!expected.equals(msgBody)) {
+                    throw new RuntimeException("Received message body '" + msgBody + "', expected: '" + expected + "'");
+                }
+
+                if (count == numMsgs) {
+                    recv.close();
+                    recv.getSession().close();
+                    recv.getSession().getConnection().close();
+                    acceptor.close();
+                }
+            }
+        }
     }
 
     public static void main(String[] args) throws IOException {
         try {
-            System.exit(sendTest(args) ? 0 : 1);
+            int port = Integer.valueOf(args[1]);
+            int numMsgs = Integer.valueOf(args[2]);
+            boolean result = false;
+
+            if ("send".equalsIgnoreCase(args[0])) {
+                Send send = new Send("localhost:" + port, numMsgs);
+                Reactor r = Proton.reactor(send);
+                r.run();
+                result = send.getResult();
+            } else {
+                Reactor r = Proton.reactor(new Recv(port, numMsgs));
+                r.run();
+                result = true;
+            }
+            System.exit(result ? 0 : 1);
         } catch(Throwable t) {
             t.printStackTrace();
             System.exit(1);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1eb41f60/tests/python/proton_tests/reactor_interop.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/reactor_interop.py b/tests/python/proton_tests/reactor_interop.py
index b2604db..cbb75e6 100644
--- a/tests/python/proton_tests/reactor_interop.py
+++ b/tests/python/proton_tests/reactor_interop.py
@@ -20,16 +20,18 @@
 
 from common import Test, free_tcp_port
 from proton import Message
-from proton.reactor import Reactor
 from proton.handlers import CHandshaker, CFlowController
+from proton.reactor import Reactor
 
-import subprocess
 import os
+import subprocess
 from threading import Thread
+import time
 
-class JavaSendThread(Thread):
-  def __init__(self, port, count):
+class JavaThread(Thread):
+  def __init__(self, operation, port, count):
     Thread.__init__(self)
+    self.operation = operation
     self.port = str(port)
     self.count = str(count)
     self.result = 1
@@ -37,7 +39,7 @@ class JavaSendThread(Thread):
   def run(self):
     self.result = subprocess.call(['java',
         'org.apache.qpid.proton.ProtonJInterop',
-        self.port, self.count])
+        self.operation, self.port, self.count])
 
 class ReceiveHandler:
   def __init__(self, count):
@@ -48,7 +50,7 @@ class ReceiveHandler:
   def on_reactor_init(self, event):
     port = free_tcp_port()
     self.acceptor = event.reactor.acceptor("localhost", port)
-    self.java_thread = JavaSendThread(port, self.count)
+    self.java_thread = JavaThread("send", port, self.count)
     self.java_thread.start()
 
   def on_delivery(self, event):
@@ -61,6 +63,37 @@ class ReceiveHandler:
       if (self.count == 0):
         self.acceptor.close()
 
+class SendHandler:
+  def __init__(self, host, num_msgs):
+    self.host = host
+    self.num_msgs = num_msgs
+    self.count = 0
+    self.handlers = [CHandshaker()]
+
+  def on_connection_init(self, event):
+    conn = event.connection
+    conn.hostname = self.host
+    ssn = conn.session()
+    snd = ssn.sender("sender")
+    conn.open()
+    ssn.open()
+    snd.open()
+
+  def on_link_flow(self, event):
+    snd = event.sender
+    if snd.credit > 0 and self.count < self.num_msgs:
+      self.count += 1
+      msg = Message("message-" + str(self.count))
+      dlv = snd.send(msg)
+      dlv.settle()
+      if (self.count == self.num_msgs):
+        snd.close()
+        snd.session.close()
+        snd.connection.close()
+
+  def on_reactor_init(self, event):
+    event.reactor.connection(self)
+
 class ReactorInteropTest(Test):
 
   def setup(self):
@@ -72,6 +105,24 @@ class ReactorInteropTest(Test):
     for entry in entries:
       self.proton_j_available |= os.path.exists(entry)
 
+  def protonc_to_protonj(self, count):
+    if (not self.proton_j_available):
+      raise Skip()
+
+    port = free_tcp_port()
+    java_thread = JavaThread("recv", port, count)
+    java_thread.start()
+    # Give the Java thread time to spin up a JVM and start listening
+    # XXX: would be better to parse the stdout output for a message
+    time.sleep(1)
+
+    sh = SendHandler('localhost:' + str(port), count)
+    r = Reactor(sh)
+    r.run()
+
+    java_thread.join()
+    assert(java_thread.result == 0)
+
   def protonj_to_protonc(self, count):
     if (not self.proton_j_available):
       raise Skip()
@@ -86,6 +137,18 @@ class ReactorInteropTest(Test):
     for i in range(1, count):
       assert(rh.messages[i-1] == ("message-" + str(i)))
 
+  def test_protonc_to_protonj_1(self):
+    self.protonc_to_protonj(1)
+
+  def test_protonc_to_protonj_5(self):
+    self.protonc_to_protonj(5)
+
+  def test_protonc_to_protonj_500(self):
+    self.protonc_to_protonj(500)
+
+  def test_protonc_to_protonj_5000(self):
+    self.protonc_to_protonj(5000)
+
   def test_protonj_to_protonc_1(self):
     self.protonj_to_protonc(1)
 
@@ -97,3 +160,4 @@ class ReactorInteropTest(Test):
 
   def test_protonj_to_protonc_5000(self):
     self.protonj_to_protonc(5000)
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org