You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2015/07/06 01:45:11 UTC
[10/38] qpid-proton git commit: PROTON-881: Add reactor interop tests that send messages from proton-c to proton-j
PROTON-881: Add reactor interop tests that send messages from proton-c to proton-j
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1eb41f60
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1eb41f60
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1eb41f60
Branch: refs/heads/master
Commit: 1eb41f603b0a4c5da9c686af1369837e7c6f2184
Parents: 7faa7e2
Author: Adrian Preston <pr...@uk.ibm.com>
Authored: Fri May 1 15:36:00 2015 +0100
Committer: Adrian Preston <pr...@uk.ibm.com>
Committed: Wed May 6 23:24:28 2015 +0100
----------------------------------------------------------------------
.../org/apache/qpid/proton/ProtonJInterop.java | 76 +++++++++++++++++---
tests/python/proton_tests/reactor_interop.py | 76 ++++++++++++++++++--
2 files changed, 138 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1eb41f60/tests/java/org/apache/qpid/proton/ProtonJInterop.java
----------------------------------------------------------------------
diff --git a/tests/java/org/apache/qpid/proton/ProtonJInterop.java b/tests/java/org/apache/qpid/proton/ProtonJInterop.java
index 678bfd2..31306ef 100644
--- a/tests/java/org/apache/qpid/proton/ProtonJInterop.java
+++ b/tests/java/org/apache/qpid/proton/ProtonJInterop.java
@@ -31,9 +31,12 @@ import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.reactor.Acceptor;
+import org.apache.qpid.proton.reactor.FlowController;
import org.apache.qpid.proton.reactor.Handshaker;
import org.apache.qpid.proton.reactor.Reactor;
@@ -123,18 +126,75 @@ public class ProtonJInterop {
}
}
- private static boolean sendTest(String[] args) throws IOException {
- int port = Integer.valueOf(args[0]);
- int numMsgs = Integer.valueOf(args[1]);
- Send send = new Send("localhost:" + port, numMsgs);
- Reactor r = Proton.reactor(send);
- r.run();
- return send.getResult();
+ private static class Recv extends BaseHandler {
+ private final int port;
+ private final int numMsgs;
+ private int count = 0;
+ private Acceptor acceptor = null;
+
+ private Recv(int port, int numMsgs) {
+ this.port = port;
+ this.numMsgs = numMsgs;
+ add(new Handshaker());
+ add(new FlowController());
+ }
+
+ @Override
+ public void onReactorInit(Event event) {
+ try {
+ acceptor = event.getReactor().acceptor("localhost", port);
+ } catch(IOException ioException) {
+ throw new RuntimeException(ioException);
+ }
+ }
+
+ @Override
+ public void onDelivery(Event event) {
+ Receiver recv = (Receiver)event.getLink();
+ Delivery delivery = recv.current();
+ if (delivery.isReadable() && !delivery.isPartial()) {
+ int size = delivery.pending();
+ byte[] buffer = new byte[size];
+ int read = recv.recv(buffer, 0, buffer.length);
+ recv.advance();
+
+ Message msg = Proton.message();
+ msg.decode(buffer, 0, read);
+
+ ++count;
+ String msgBody = ((AmqpValue)msg.getBody()).getValue().toString();
+ String expected = "message-" + count;
+ if (!expected.equals(msgBody)) {
+ throw new RuntimeException("Received message body '" + msgBody + "', expected: '" + expected + "'");
+ }
+
+ if (count == numMsgs) {
+ recv.close();
+ recv.getSession().close();
+ recv.getSession().getConnection().close();
+ acceptor.close();
+ }
+ }
+ }
}
public static void main(String[] args) throws IOException {
try {
- System.exit(sendTest(args) ? 0 : 1);
+ int port = Integer.valueOf(args[1]);
+ int numMsgs = Integer.valueOf(args[2]);
+ boolean result = false;
+
+ if ("send".equalsIgnoreCase(args[0])) {
+ Send send = new Send("localhost:" + port, numMsgs);
+ Reactor r = Proton.reactor(send);
+ r.run();
+ result = send.getResult();
+ } else {
+ Reactor r = Proton.reactor(new Recv(port, numMsgs));
+ r.run();
+ result = true;
+ }
+ System.exit(result ? 0 : 1);
} catch(Throwable t) {
t.printStackTrace();
System.exit(1);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1eb41f60/tests/python/proton_tests/reactor_interop.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/reactor_interop.py b/tests/python/proton_tests/reactor_interop.py
index b2604db..cbb75e6 100644
--- a/tests/python/proton_tests/reactor_interop.py
+++ b/tests/python/proton_tests/reactor_interop.py
@@ -20,16 +20,18 @@
from common import Test, free_tcp_port
from proton import Message
-from proton.reactor import Reactor
from proton.handlers import CHandshaker, CFlowController
+from proton.reactor import Reactor
-import subprocess
import os
+import subprocess
from threading import Thread
+import time
-class JavaSendThread(Thread):
- def __init__(self, port, count):
+class JavaThread(Thread):
+ def __init__(self, operation, port, count):
Thread.__init__(self)
+ self.operation = operation
self.port = str(port)
self.count = str(count)
self.result = 1
@@ -37,7 +39,7 @@ class JavaSendThread(Thread):
def run(self):
self.result = subprocess.call(['java',
'org.apache.qpid.proton.ProtonJInterop',
- self.port, self.count])
+ self.operation, self.port, self.count])
class ReceiveHandler:
def __init__(self, count):
@@ -48,7 +50,7 @@ class ReceiveHandler:
def on_reactor_init(self, event):
port = free_tcp_port()
self.acceptor = event.reactor.acceptor("localhost", port)
- self.java_thread = JavaSendThread(port, self.count)
+ self.java_thread = JavaThread("send", port, self.count)
self.java_thread.start()
def on_delivery(self, event):
@@ -61,6 +63,37 @@ class ReceiveHandler:
if (self.count == 0):
self.acceptor.close()
+class SendHandler:
+ def __init__(self, host, num_msgs):
+ self.host = host
+ self.num_msgs = num_msgs
+ self.count = 0
+ self.handlers = [CHandshaker()]
+
+ def on_connection_init(self, event):
+ conn = event.connection
+ conn.hostname = self.host
+ ssn = conn.session()
+ snd = ssn.sender("sender")
+ conn.open()
+ ssn.open()
+ snd.open()
+
+ def on_link_flow(self, event):
+ snd = event.sender
+ if snd.credit > 0 and self.count < self.num_msgs:
+ self.count += 1
+ msg = Message("message-" + str(self.count))
+ dlv = snd.send(msg)
+ dlv.settle()
+ if (self.count == self.num_msgs):
+ snd.close()
+ snd.session.close()
+ snd.connection.close()
+
+ def on_reactor_init(self, event):
+ event.reactor.connection(self)
+
class ReactorInteropTest(Test):
def setup(self):
@@ -72,6 +105,24 @@ class ReactorInteropTest(Test):
for entry in entries:
self.proton_j_available |= os.path.exists(entry)
+ def protonc_to_protonj(self, count):
+ if (not self.proton_j_available):
+ raise Skip()
+
+ port = free_tcp_port()
+ java_thread = JavaThread("recv", port, count)
+ java_thread.start()
+ # Give the Java thread time to spin up a JVM and start listening
+ # XXX: would be better to parse the stdout output for a message
+ time.sleep(1)
+
+ sh = SendHandler('localhost:' + str(port), count)
+ r = Reactor(sh)
+ r.run()
+
+ java_thread.join()
+ assert(java_thread.result == 0)
+
def protonj_to_protonc(self, count):
if (not self.proton_j_available):
raise Skip()
@@ -86,6 +137,18 @@ class ReactorInteropTest(Test):
for i in range(1, count):
assert(rh.messages[i-1] == ("message-" + str(i)))
+ def test_protonc_to_protonj_1(self):
+ self.protonc_to_protonj(1)
+
+ def test_protonc_to_protonj_5(self):
+ self.protonc_to_protonj(5)
+
+ def test_protonc_to_protonj_500(self):
+ self.protonc_to_protonj(500)
+
+ def test_protonc_to_protonj_5000(self):
+ self.protonc_to_protonj(5000)
+
def test_protonj_to_protonc_1(self):
self.protonj_to_protonc(1)
@@ -97,3 +160,4 @@ class ReactorInteropTest(Test):
def test_protonj_to_protonc_5000(self):
self.protonj_to_protonc(5000)
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org