You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2013/02/20 16:22:26 UTC
svn commit: r1448220 - in /qpid/proton/trunk: proton-c/bindings/python/
proton-c/include/proton/ proton-c/src/
proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/
tests/python/proton_tests/
Author: kgiusti
Date: Wed Feb 20 15:22:26 2013
New Revision: 1448220
URL: http://svn.apache.org/r1448220
Log:
PROTON-200: maintain a minimum credit level for each receive link
Modified:
qpid/proton/trunk/proton-c/bindings/python/proton.py
qpid/proton/trunk/proton-c/include/proton/messenger.h
qpid/proton/trunk/proton-c/src/messenger.c
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
qpid/proton/trunk/tests/python/proton_tests/messenger.py
Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1448220&r1=1448219&r2=1448220&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Wed Feb 20 15:22:26 2013
@@ -369,8 +369,9 @@ send. Defaults to zero.
def recv(self, n=None):
"""
Receives up to I{n} messages into the incoming queue of the
- L{Messenger}. This method will block until at least one message is
- available or the operation times out.
+ L{Messenger}. If I{n} is not specified, L{Messenger} will receive as many
+ messages as it can buffer internally. This method will block until at least
+ one message is available or the operation times out.
"""
if n is None:
n = -1
Modified: qpid/proton/trunk/proton-c/include/proton/messenger.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/messenger.h?rev=1448220&r1=1448219&r2=1448220&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/messenger.h Wed Feb 20 15:22:26 2013
@@ -329,13 +329,14 @@ PN_EXTERN pn_tracker_t pn_messenger_outg
*/
PN_EXTERN int pn_messenger_send(pn_messenger_t *messenger);
-/** Receives up to n message into the incoming message queue of a
- * messenger. Blocks until at least one message is available in the
- * incoming queue.
+/** Receives up to n messages into the incoming message queue of a
+ * messenger. If n is -1, Messenger will be able to receive as many
+ * messages as it can buffer internally. Blocks until at least one
+ * message is available in the incoming queue.
*
* @param[in] messenger the messenger
- * @param[in] n the maximum number of messages to receive, if -1
- * allow at least one message from each peer.
+ * @param[in] n the maximum number of messages to receive or -1 to to
+ * receive as many messages as it can buffer internally.
*
* @return an error code or zero on success
* @see error.h
Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1448220&r1=1448219&r2=1448220&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Wed Feb 20 15:22:26 2013
@@ -49,6 +49,7 @@ struct pn_messenger_t {
int timeout;
pn_driver_t *driver;
bool unlimited_credit;
+ int credit_batch;
int credit;
int distributed;
uint64_t next_tag;
@@ -264,6 +265,7 @@ pn_messenger_t *pn_messenger(const char
m->timeout = -1;
m->driver = pn_driver();
m->unlimited_credit = false;
+ m->credit_batch = 10;
m->credit = 0;
m->distributed = 0;
m->next_tag = 0;
@@ -387,33 +389,47 @@ const char *pn_messenger_error(pn_messen
void pn_messenger_flow(pn_messenger_t *messenger)
{
- while (messenger->credit > 0 || messenger->unlimited_credit) {
- int prev = messenger->credit;
- pn_connector_t *ctor = pn_connector_head(messenger->driver);
- while (ctor) {
- pn_connection_t *conn = pn_connector_connection(ctor);
-
- pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
- while (link) {
- if (pn_link_is_receiver(link)) {
- if (messenger->unlimited_credit) {
- if (!pn_link_credit(link)) {
- pn_link_flow(link, 1);
- messenger->distributed++;
- }
- } else {
- pn_link_flow(link, 1);
- messenger->credit--;
- messenger->distributed++;
- if (messenger->credit == 0) break;
- }
+ int link_ct = 0;
+ pn_connector_t *ctor = pn_connector_head(messenger->driver);
+ while (ctor) {
+ pn_connection_t *conn = pn_connector_connection(ctor);
+
+ pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
+ while (link) {
+ if (pn_link_is_receiver(link)) link_ct++;
+ link = pn_link_next(link, PN_LOCAL_ACTIVE);
+ }
+ ctor = pn_connector_next(ctor);
+ }
+
+ if (link_ct == 0) return;
+
+ if (messenger->unlimited_credit)
+ messenger->credit = link_ct * messenger->credit_batch;
+
+ int batch = (messenger->credit < link_ct) ? 1
+ : (messenger->credit/link_ct);
+
+ ctor = pn_connector_head(messenger->driver);
+ while (ctor) {
+ pn_connection_t *conn = pn_connector_connection(ctor);
+ pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
+ while (link) {
+ if (pn_link_is_receiver(link)) {
+
+ int have = pn_link_credit(link);
+ if (have < batch) {
+ int need = batch - have;
+ int amount = (messenger->credit < need) ? messenger->credit : need;
+ pn_link_flow(link, amount);
+ messenger->distributed += amount;
+ messenger->credit -= amount;
+ if (messenger->credit == 0) return;
}
- link = pn_link_next(link, PN_LOCAL_ACTIVE);
}
-
- ctor = pn_connector_next(ctor);
+ link = pn_link_next(link, PN_LOCAL_ACTIVE);
}
- if (messenger->unlimited_credit || messenger->credit == prev) break;
+ ctor = pn_connector_next(ctor);
}
}
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1448220&r1=1448219&r2=1448220&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java Wed Feb 20 15:22:26 2013
@@ -74,6 +74,7 @@ public class MessengerImpl implements Me
private byte[] _buffer = new byte[5*1024];
private Driver _driver;
private boolean _unlimitedCredit = false;
+ private static final int _creditBatch = 10;
private int _credit;
private int _distributed;
private TrackerQueue _incoming = new TrackerQueue();
@@ -571,27 +572,44 @@ public class MessengerImpl implements Me
private void distributeCredit()
{
- int previous = 0;
- while (_unlimitedCredit || (_credit > 0 && _credit != previous))
+ int linkCt = 0;
+ // @todo track the number of opened receive links
+ for (Connector c : _driver.connectors())
{
- previous = _credit;
- for (Connector c : _driver.connectors())
+ Connection connection = c.getConnection();
+ for (Link link : new Links(connection, ACTIVE, ANY))
{
- Connection connection = c.getConnection();
- for (Link link : new Links(connection, ACTIVE, ANY))
+ if (link instanceof Receiver) linkCt++;
+ }
+ }
+
+ if (linkCt == 0) return;
+
+ if (_unlimitedCredit)
+ {
+ _credit = linkCt * _creditBatch;
+ }
+
+ int batch = (_credit < linkCt) ? 1 : (_credit/linkCt);
+ for (Connector c : _driver.connectors())
+ {
+ Connection connection = c.getConnection();
+ for (Link link : new Links(connection, ACTIVE, ANY))
+ {
+ if (link instanceof Receiver)
{
- if (link instanceof Receiver)
+ int have = ((Receiver) link).getCredit();
+ if (have < batch)
{
- ((Receiver) link).flow(1);
- _distributed++;
- if (!_unlimitedCredit) {
- _credit--;
- if (_credit == 0) return;
- }
+ int need = batch - have;
+ int amount = (_credit < need) ? _credit : need;
+ ((Receiver) link).flow(amount);
+ _distributed += amount;
+ _credit -= amount;
+ if (_credit == 0) return;
}
}
}
- if (_unlimitedCredit) return;
}
}
Modified: qpid/proton/trunk/tests/python/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/messenger.py?rev=1448220&r1=1448219&r2=1448220&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/messenger.py Wed Feb 20 15:22:26 2013
@@ -25,7 +25,7 @@ from time import sleep, time
class Test(common.Test):
def setup(self):
- self.server_credit = -1
+ self.server_credit = 10
self.server_received = 0
self.server = Messenger("server")
self.server.timeout=10000
@@ -210,7 +210,6 @@ class MessengerTest(Test):
assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
def testRejectIndividual(self):
- self.server_credit = 10
self.testReject(self.reject_individual)
def reject_individual(self, msg):
@@ -276,6 +275,7 @@ class MessengerTest(Test):
""" Bring up two links. Verify credit is granted to each link by
transferring a message over each.
"""
+ self.server_credit = -1
self.start()
msg = Message()
@@ -304,7 +304,7 @@ class MessengerTest(Test):
self.client.send()
reply = Message()
- self.client.recv(-1)
+ self.client.recv(1)
assert self.client.incoming == 1
self.client.get(reply)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org