You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2013/02/20 16:22:08 UTC
svn commit: r1448216 - in /qpid/proton/trunk:
proton-c/bindings/python/proton.py proton-c/include/proton/cproton.i
proton-c/include/proton/messenger.h proton-c/src/messenger.c
tests/python/proton_tests/messenger.py
Author: kgiusti
Date: Wed Feb 20 15:22:08 2013
New Revision: 1448216
URL: http://svn.apache.org/r1448216
Log:
PROTON-200: allow recv(-1) to grant unlimited credit
Modified:
qpid/proton/trunk/proton-c/bindings/python/proton.py
qpid/proton/trunk/proton-c/include/proton/cproton.i
qpid/proton/trunk/proton-c/include/proton/messenger.h
qpid/proton/trunk/proton-c/src/messenger.c
qpid/proton/trunk/tests/python/proton_tests/messenger.py
Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1448216&r1=1448215&r2=1448216&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Wed Feb 20 15:22:08 2013
@@ -366,12 +366,14 @@ send. Defaults to zero.
"""
self._check(pn_messenger_send(self._mng))
- def recv(self, n):
+ def recv(self, n=None):
"""
Receives up to I{n} messages into the incoming queue of the
L{Messenger}. This method will block until at least one message is
available or the operation times out.
"""
+ if n is None:
+ n = -1
self._check(pn_messenger_recv(self._mng, n))
def get(self, message=None):
Modified: qpid/proton/trunk/proton-c/include/proton/cproton.i
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/cproton.i?rev=1448216&r1=1448215&r2=1448216&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/cproton.i (original)
+++ qpid/proton/trunk/proton-c/include/proton/cproton.i Wed Feb 20 15:22:08 2013
@@ -1284,7 +1284,6 @@ typedef long long int int64_t;
{
require:
messenger != NULL;
- n >= 0;
}
%contract pn_messenger_get(pn_messenger_t *messenger, pn_message_t *msg)
Modified: qpid/proton/trunk/proton-c/include/proton/messenger.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/messenger.h?rev=1448216&r1=1448215&r2=1448216&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/messenger.h Wed Feb 20 15:22:08 2013
@@ -334,7 +334,8 @@ PN_EXTERN int pn_messenger_send(pn_messe
* incoming queue.
*
* @param[in] messenger the messenger
- * @param[in] n the maximum number of messages to receive
+ * @param[in] n the maximum number of messages to receive, if -1
+ * allow at least one message from each peer.
*
* @return an error code or zero on success
* @see error.h
Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1448216&r1=1448215&r2=1448216&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Wed Feb 20 15:22:08 2013
@@ -48,6 +48,7 @@ struct pn_messenger_t {
char *trusted_certificates;
int timeout;
pn_driver_t *driver;
+ bool unlimited_credit;
int credit;
int distributed;
uint64_t next_tag;
@@ -262,6 +263,7 @@ pn_messenger_t *pn_messenger(const char
m->trusted_certificates = NULL;
m->timeout = -1;
m->driver = pn_driver();
+ m->unlimited_credit = false;
m->credit = 0;
m->distributed = 0;
m->next_tag = 0;
@@ -385,25 +387,33 @@ const char *pn_messenger_error(pn_messen
void pn_messenger_flow(pn_messenger_t *messenger)
{
- while (messenger->credit > 0) {
+ while (messenger->credit > 0 || messenger->unlimited_credit) {
int prev = messenger->credit;
pn_connector_t *ctor = pn_connector_head(messenger->driver);
while (ctor) {
pn_connection_t *conn = pn_connector_connection(ctor);
pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
- while (link && messenger->credit > 0) {
+ while (link) {
if (pn_link_is_receiver(link)) {
- pn_link_flow(link, 1);
- messenger->credit--;
- messenger->distributed++;
+ if (messenger->unlimited_credit) {
+ if (!pn_link_credit(link)) {
+ pn_link_flow(link, 1);
+ messenger->distributed++;
+ }
+ } else {
+ pn_link_flow(link, 1);
+ messenger->credit--;
+ messenger->distributed++;
+ if (messenger->credit == 0) break;
+ }
}
link = pn_link_next(link, PN_LOCAL_ACTIVE);
}
ctor = pn_connector_next(ctor);
}
- if (messenger->credit == prev) break;
+ if (messenger->unlimited_credit || messenger->credit == prev) break;
}
}
@@ -1115,8 +1125,13 @@ int pn_messenger_recv(pn_messenger_t *me
if (!pn_listener_head(messenger->driver) && !pn_connector_head(messenger->driver))
return pn_error_format(messenger->error, PN_STATE_ERR, "no valid sources");
int total = messenger->credit + messenger->distributed;
- if (n > total)
- messenger->credit += (n - total);
+ if (n == -1) {
+ messenger->unlimited_credit = true;
+ } else {
+ messenger->unlimited_credit = false;
+ if (n > total)
+ messenger->credit += (n - total);
+ }
pn_messenger_flow(messenger);
return pn_messenger_sync(messenger, pn_messenger_rcvd);
}
Modified: qpid/proton/trunk/tests/python/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/messenger.py?rev=1448216&r1=1448215&r2=1448216&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/messenger.py Wed Feb 20 15:22:08 2013
@@ -25,7 +25,8 @@ from time import sleep, time
class Test(common.Test):
def setup(self):
- self.server_received = 0;
+ self.server_credit = -1
+ self.server_received = 0
self.server = Messenger("server")
self.server.timeout=10000
self.server.start()
@@ -61,7 +62,7 @@ class MessengerTest(Test):
msg = Message()
try:
while self.running:
- self.server.recv(10)
+ self.server.recv(self.server_credit)
self.process_incoming(msg)
except Timeout:
print "server timed out"
@@ -209,6 +210,7 @@ class MessengerTest(Test):
assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
def testRejectIndividual(self):
+ self.server_credit = 10
self.testReject(self.reject_individual)
def reject_individual(self, msg):
@@ -269,3 +271,43 @@ class MessengerTest(Test):
assert time() < deadline, "Server did not receive message!"
sleep(.1)
assert self.server_received == 1
+
+ def testUnlimitedCredit(self):
+ """ Bring up two links. Verify credit is granted to each link by
+ transferring a message over each.
+ """
+ self.start()
+
+ msg = Message()
+ msg.address="amqp://0.0.0.0:12345/XXX"
+ msg.subject="Hello World!"
+ body = "First the world, then the galaxy!"
+ msg.load(body)
+ self.client.put(msg)
+ self.client.send()
+
+ reply = Message()
+ self.client.recv(1)
+ assert self.client.incoming == 1
+ self.client.get(reply)
+
+ assert reply.subject == "Hello World!"
+ rbod = reply.save()
+ assert rbod == body, (rbod, body)
+
+ msg = Message()
+ msg.address="amqp://0.0.0.0:12345/YYY"
+ msg.subject="Hello World!"
+ body = "First the world, then the galaxy!"
+ msg.load(body)
+ self.client.put(msg)
+ self.client.send()
+
+ reply = Message()
+ self.client.recv(-1)
+ assert self.client.incoming == 1
+ self.client.get(reply)
+
+ assert reply.subject == "Hello World!"
+ rbod = reply.save()
+ assert rbod == body, (rbod, body)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org