You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2015/07/06 01:45:09 UTC
[08/38] qpid-proton git commit: PROTON-881: Tidy up proton-j to
proton-c reactor interop tests
PROTON-881: Tidy up proton-j to proton-c reactor interop tests
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/2e6f5cdd
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/2e6f5cdd
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/2e6f5cdd
Branch: refs/heads/master
Commit: 2e6f5cdd1754b266e81afbc49ae4333a75287d57
Parents: b6e18b5
Author: Adrian Preston <pr...@uk.ibm.com>
Authored: Thu Apr 30 13:58:28 2015 +0100
Committer: Adrian Preston <pr...@uk.ibm.com>
Committed: Wed May 6 23:24:11 2015 +0100
----------------------------------------------------------------------
.../qpid/proton/example/reactor/Send.java | 2 +-
.../org/apache/qpid/proton/ProtonJInterop.java | 78 ++++++++++----------
tests/python/proton_tests/reactor_interop.py | 57 ++++++++++----
3 files changed, 81 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2e6f5cdd/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java
index 5cd5811..22da720 100644
--- a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java
+++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java
@@ -78,7 +78,7 @@ public class Send extends BaseHandler {
@Override
public void onLinkFlow(Event event) {
Sender snd = (Sender)event.getLink();
- if (snd.getCredit() > 0 && message != null) {
+ if (snd.getCredit() > 0) {
byte[] msgData = new byte[1024];
int length;
while(true) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2e6f5cdd/tests/java/org/apache/qpid/proton/ProtonJInterop.java
----------------------------------------------------------------------
diff --git a/tests/java/org/apache/qpid/proton/ProtonJInterop.java b/tests/java/org/apache/qpid/proton/ProtonJInterop.java
index 8b49508..678bfd2 100644
--- a/tests/java/org/apache/qpid/proton/ProtonJInterop.java
+++ b/tests/java/org/apache/qpid/proton/ProtonJInterop.java
@@ -29,6 +29,7 @@ import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
@@ -36,22 +37,18 @@ import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.reactor.Handshaker;
import org.apache.qpid.proton.reactor.Reactor;
-public class ProtonJInterop { // TODO: this doesn't return a useful RC
-
+public class ProtonJInterop {
private static class SendHandler extends BaseHandler {
private final String hostname;
- private final Message message;
- private int nextTag = 0;
- private int result = 1;
+ private int numMsgs;
+ private int count = 0;
+ private boolean result = false;
- private SendHandler(String hostname, Message message) {
+ private SendHandler(String hostname, int numMsgs) {
this.hostname = hostname;
- this.message = message;
-
- // Add a child handler that performs some default handshaking
- // behaviour.
+ this.numMsgs = numMsgs;
add(new Handshaker());
}
@@ -59,16 +56,7 @@ public class ProtonJInterop { // TODO: this doesn't return a useful RC
public void onConnectionInit(Event event) {
Connection conn = event.getConnection();
conn.setHostname(hostname);
-
- // Every session or link could have their own handler(s) if we
- // wanted simply by adding the handler to the given session
- // or link
Session ssn = conn.session();
-
- // If a link doesn't have an event handler, the events go to
- // its parent session. If the session doesn't have a handler
- // the events go to its parent connection. If the connection
- // doesn't have a handler, the events go to the reactor.
Sender snd = ssn.sender("sender");
conn.open();
ssn.open();
@@ -78,7 +66,10 @@ public class ProtonJInterop { // TODO: this doesn't return a useful RC
@Override
public void onLinkFlow(Event event) {
Sender snd = (Sender)event.getLink();
- if (snd.getCredit() > 0 && message != null) {
+ if (snd.getCredit() > 0 && snd.getLocalState() != EndpointState.CLOSED) {
+ Message message = Proton.message();
+ ++count;
+ message.setBody(new AmqpValue("message-"+count));
byte[] msgData = new byte[1024];
int length;
while(true) {
@@ -89,20 +80,23 @@ public class ProtonJInterop { // TODO: this doesn't return a useful RC
msgData = new byte[msgData.length * 2];
}
}
- byte[] tag = String.valueOf(nextTag++).getBytes();
+ byte[] tag = String.valueOf(count).getBytes();
Delivery dlv = snd.delivery(tag);
snd.send(msgData, 0, length);
dlv.settle();
snd.advance();
- snd.close();
- snd.getSession().close();
- snd.getSession().getConnection().close();
- result = 0;
+ if (count == numMsgs) {
+ snd.close();
+ snd.getSession().close();
+ snd.getSession().getConnection().close();
+ result = true;
+ }
}
}
@Override
public void onTransportError(Event event) {
+ result = false;
ErrorCondition condition = event.getTransport().getCondition();
if (condition != null) {
System.err.println("Error: " + condition.getDescription());
@@ -113,33 +107,37 @@ public class ProtonJInterop { // TODO: this doesn't return a useful RC
}
private static class Send extends BaseHandler {
- private final String hostname;
- private final Message message;
+ private final SendHandler sendHandler;
- private Send(String hostname, String content) {
- this.hostname = hostname;
- message = Proton.message();
- message.setBody(new AmqpValue(content));
+ private Send(String hostname, int numMsgs) {
+ sendHandler = new SendHandler(hostname, numMsgs);
}
@Override
public void onReactorInit(Event event) {
- // You can use the connection method to create AMQP connections.
+ event.getReactor().connection(sendHandler);
+ }
- // This connection's handler is the SendHandler object. All the events
- // for this connection will go to the SendHandler object instead of
- // going to the reactor. If you were to omit the SendHandler object,
- // all the events would go to the reactor.
- event.getReactor().connection(new SendHandler(hostname, message));
+ public boolean getResult() {
+ return sendHandler.result;
}
}
- private static void sendTest() throws IOException {
- Reactor r = Proton.reactor(new Send("localhost:56789", "test1"));
+ private static boolean sendTest(String[] args) throws IOException {
+ int port = Integer.valueOf(args[0]);
+ int numMsgs = Integer.valueOf(args[1]);
+ Send send = new Send("localhost:" + port, numMsgs);
+ Reactor r = Proton.reactor(send);
r.run();
+ return send.getResult();
}
public static void main(String[] args) throws IOException {
- sendTest();
+ try {
+ System.exit(sendTest(args) ? 0 : 1);
+ } catch(Throwable t) {
+ t.printStackTrace();
+ System.exit(1);
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2e6f5cdd/tests/python/proton_tests/reactor_interop.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/reactor_interop.py b/tests/python/proton_tests/reactor_interop.py
index 1fdfcd7..9c58d03 100644
--- a/tests/python/proton_tests/reactor_interop.py
+++ b/tests/python/proton_tests/reactor_interop.py
@@ -18,7 +18,7 @@
# under the License.
#
-from common import Test
+from common import Test, free_tcp_port
from proton import Message
from proton.reactor import Reactor
from proton.handlers import CHandshaker, CFlowController
@@ -28,33 +28,60 @@ import os
from threading import Thread
class JavaSendThread(Thread):
- def __init__(self):
+ def __init__(self, port, count):
Thread.__init__(self)
+ self.port = str(port)
+ self.count = str(count)
+ self.result = 1
def run(self):
- subprocess.check_output(['java', 'org.apache.qpid.proton.ProtonJInterop'])
+ self.result = subprocess.call(['java',
+ 'org.apache.qpid.proton.ProtonJInterop',
+ self.port, self.count])
-
-class Receive:
- def __init__(self):
+class ReceiveHandler:
+ def __init__(self, count):
+ self.count = count
self.handlers = [CHandshaker(), CFlowController()]
- self.message = Message()
+ self.messages = []
def on_reactor_init(self, event):
- self.acceptor = event.reactor.acceptor("localhost", 56789)
- JavaSendThread().start()
+ port = free_tcp_port()
+ self.acceptor = event.reactor.acceptor("localhost", port)
+ self.java_thread = JavaSendThread(port, self.count)
+ self.java_thread.start()
def on_delivery(self, event):
rcv = event.receiver
- if rcv and self.message.recv(rcv):
+ msg = Message()
+ if rcv and msg.recv(rcv):
event.delivery.settle()
- self.acceptor.close()
+ self.messages += [msg.body]
+ self.count -= 1
+ if (self.count == 0):
+ self.acceptor.close()
class ReactorInteropTest(Test):
- def test_protonj_to_protonc(self):
- rcv = Receive()
- r = Reactor(rcv)
+ def protonj_to_protonc(self, count):
+ rh = ReceiveHandler(count)
+ r = Reactor(rh)
r.run()
- assert(rcv.message.body == "test1")
+ rh.java_thread.join()
+ assert(rh.java_thread.result == 0)
+
+ for i in range(1, count):
+ assert(rh.messages[i-1] == ("message-" + str(i)))
+
+ def test_protonj_to_protonc_1(self):
+ self.protonj_to_protonc(1)
+
+ def test_protonj_to_protonc_5(self):
+ self.protonj_to_protonc(5)
+
+ def test_protonj_to_protonc_500(self):
+ self.protonj_to_protonc(500)
+
+ def test_protonj_to_protonc_5000(self):
+ self.protonj_to_protonc(5000)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org