You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2013/09/30 14:58:51 UTC
svn commit: r1527535 - in /qpid/proton/trunk:
proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/
proton-c/bindings/python/ proton-c/include/proton/ proton-c/src/engine/
proton-c/src/transport/ proton-j/proton-api/src/main/java/org/...
Author: rhs
Date: Mon Sep 30 12:58:51 2013
New Revision: 1527535
URL: http://svn.apache.org/r1527535
Log:
Modified the engine to track how much credit is drained vs. used. This should simplify resolving PROTON-200.
Modified:
qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNILink.java
qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNISender.java
qpid/proton/trunk/proton-c/bindings/python/proton.py
qpid/proton/trunk/proton-c/include/proton/engine.h
qpid/proton/trunk/proton-c/src/engine/engine-internal.h
qpid/proton/trunk/proton-c/src/engine/engine.c
qpid/proton/trunk/proton-c/src/transport/transport.c
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Link.java
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Sender.java
qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportReceiver.java
qpid/proton/trunk/tests/python/proton_tests/engine.py
Modified: qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNILink.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNILink.java?rev=1527535&r1=1527534&r2=1527535&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNILink.java (original)
+++ qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNILink.java Mon Sep 30 12:58:51 2013
@@ -530,4 +530,12 @@ abstract class JNILink implements Link
free();
super.finalize();
}
+
+ @Override
+ @ProtonCEquivalent("pn_link_drained")
+ public int drained()
+ {
+ return Proton.pn_link_drained(getImpl());
+ }
+
}
Modified: qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNISender.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNISender.java?rev=1527535&r1=1527534&r2=1527535&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNISender.java (original)
+++ qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNISender.java Mon Sep 30 12:58:51 2013
@@ -55,10 +55,4 @@ public class JNISender extends JNILink i
// TODO
}
- @Override
- @ProtonCEquivalent("pn_link_drained")
- public void drained()
- {
- Proton.pn_link_drained(getImpl());
- }
}
Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1527535&r1=1527534&r2=1527535&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Mon Sep 30 12:58:51 2013
@@ -2237,6 +2237,8 @@ class Link(Endpoint):
pn_link_set_rcv_settle_mode(self._link, mode)
rcv_settle_mode = property(_get_rcv_settle_mode, _set_rcv_settle_mode)
+ def drained(self):
+ return pn_link_drained(self._link)
class Terminus(object):
@@ -2333,9 +2335,6 @@ class Sender(Link):
def send(self, bytes):
return self._check(pn_link_send(self._link, bytes))
- def drained(self):
- pn_link_drained(self._link)
-
class Receiver(Link):
def flow(self, n):
Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1527535&r1=1527534&r2=1527535&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Mon Sep 30 12:58:51 2013
@@ -496,7 +496,7 @@ PN_EXTERN void pn_link_set_context(pn_li
// sender
PN_EXTERN void pn_link_offered(pn_link_t *sender, int credit);
PN_EXTERN ssize_t pn_link_send(pn_link_t *sender, const char *bytes, size_t n);
-PN_EXTERN void pn_link_drained(pn_link_t *sender);
+PN_EXTERN int pn_link_drained(pn_link_t *sender);
//void pn_link_abort(pn_sender_t *sender);
// receiver
Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1527535&r1=1527534&r2=1527535&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Mon Sep 30 12:58:51 2013
@@ -241,7 +241,7 @@ struct pn_link_t {
pn_sequence_t credit;
pn_sequence_t queued;
bool drain;
- bool drained; // sender only
+ int drained; // number of drained credits
void *context;
pn_link_state_t state;
};
Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1527535&r1=1527534&r2=1527535&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Mon Sep 30 12:58:51 2013
@@ -680,7 +680,7 @@ pn_link_t *pn_link_new(int type, pn_sess
link->credit = 0;
link->queued = 0;
link->drain = false;
- link->drained = false;
+ link->drained = 0;
link->context = 0;
link->snd_settle_mode = PN_SND_MIXED;
link->rcv_settle_mode = PN_RCV_FIRST;
@@ -1253,13 +1253,24 @@ ssize_t pn_link_send(pn_link_t *sender,
return n;
}
-void pn_link_drained(pn_link_t *sender)
+int pn_link_drained(pn_link_t *link)
{
- if (sender && sender->drain && sender->credit > 0) {
- sender->credit = 0;
- sender->drained = true;
- pn_modified(sender->session->connection, &sender->endpoint);
+ assert(link);
+ int drained = 0;
+
+ if (pn_link_is_sender(link)) {
+ if (link->drain && link->credit > 0) {
+ link->drained = link->credit;
+ link->credit = 0;
+ pn_modified(link->session->connection, &link->endpoint);
+ drained = link->drained;
+ }
+ } else {
+ drained = link->drained;
+ link->drained = 0;
}
+
+ return drained;
}
ssize_t pn_link_recv(pn_link_t *receiver, char *bytes, size_t n)
@@ -1295,10 +1306,10 @@ void pn_link_flow(pn_link_t *receiver, i
void pn_link_drain(pn_link_t *receiver, int credit)
{
- if (receiver && pn_link_is_receiver(receiver)) {
- pn_link_flow(receiver, credit);
- receiver->drain = true;
- }
+ assert(receiver);
+ assert(pn_link_is_receiver(receiver));
+ pn_link_flow(receiver, credit);
+ receiver->drain = true;
}
pn_link_t *pn_delivery_link(pn_delivery_t *delivery)
Modified: qpid/proton/trunk/proton-c/src/transport/transport.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/transport/transport.c?rev=1527535&r1=1527534&r2=1527535&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/transport/transport.c (original)
+++ qpid/proton/trunk/proton-c/src/transport/transport.c Mon Sep 30 12:58:51 2013
@@ -750,6 +750,7 @@ int pn_do_flow(pn_dispatcher_t *disp)
link->state.delivery_count += delta;
link->state.link_credit -= delta;
link->credit -= delta;
+ link->drained += delta;
}
}
}
@@ -1471,7 +1472,7 @@ int pn_process_flow_sender(pn_transport_
if (!tail || !pn_delivery_buffered(tail)) {
state->delivery_count += state->link_credit;
state->link_credit = 0;
- snd->drained = false;
+ snd->drained = 0;
return pn_post_flow(transport, ssn, snd);
}
}
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Link.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Link.java?rev=1527535&r1=1527534&r2=1527535&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Link.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Link.java Mon Sep 30 12:58:51 2013
@@ -179,4 +179,7 @@ public interface Link extends Endpoint
*/
@Deprecated
void setRemoteSenderSettleMode(SenderSettleMode remoteSenderSettleMode);
+
+ public int drained();
+
}
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Sender.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Sender.java?rev=1527535&r1=1527534&r2=1527535&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Sender.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Sender.java Mon Sep 30 12:58:51 2013
@@ -55,8 +55,6 @@ public interface Sender extends Link
*/
public void abort();
- public void drained();
-
/**
* {@inheritDoc}
*
@@ -70,4 +68,5 @@ public interface Sender extends Link
*/
@Override
public boolean advance();
+
}
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py?rev=1527535&r1=1527534&r2=1527535&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py Mon Sep 30 12:58:51 2013
@@ -396,6 +396,9 @@ class Link(Endpoint):
self.impl.setReceiverSettleMode(mode)
rcv_settle_mode = property(_get_rcv_settle_mode, _set_rcv_settle_mode)
+ def drained(self):
+ return self.impl.drained()
+
class DataDummy:
@@ -475,9 +478,6 @@ class Sender(Link):
def send(self, bytes):
return self.impl.send(bytes, 0, len(bytes))
- def drained(self):
- self.impl.drained()
-
class Receiver(Link):
def flow(self, n):
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java?rev=1527535&r1=1527534&r2=1527535&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java Mon Sep 30 12:58:51 2013
@@ -44,7 +44,7 @@ public abstract class LinkImpl extends E
private int _queued;
private int _credit;
private int _unsettled;
-
+ private int _drained;
private SenderSettleMode _senderSettleMode;
private SenderSettleMode _remoteSenderSettleMode;
@@ -353,4 +353,35 @@ public abstract class LinkImpl extends E
{
_remoteReceiverSettleMode = remoteReceiverSettleMode;
}
+
+ public int drained()
+ {
+ int drained = 0;
+
+ if (this instanceof SenderImpl) {
+ if(getDrain() && hasCredit())
+ {
+ _drained = getCredit();
+ setCredit(0);
+ modified();
+ drained = _drained;
+ }
+ } else {
+ drained = _drained;
+ _drained = 0;
+ }
+
+ return drained;
+ }
+
+ int getDrained()
+ {
+ return _drained;
+ }
+
+ void setDrained(int value)
+ {
+ _drained = value;
+ }
+
}
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java?rev=1527535&r1=1527534&r2=1527535&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java Mon Sep 30 12:58:51 2013
@@ -29,7 +29,6 @@ public class SenderImpl extends LinkImp
{
private int _offered;
private TransportSender _transportLink;
- private boolean _drained;
SenderImpl(SessionImpl session, String name)
{
@@ -130,23 +129,4 @@ public class SenderImpl extends LinkImp
}*/
}
- public void drained()
- {
- if(getDrain())
- {
- _drained = true;
- setCredit(0);
- modified();
- }
- }
-
- boolean clearDrained()
- {
- final boolean drained = _drained;
- if(drained)
- {
- _drained = false;
- }
- return drained;
- }
}
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1527535&r1=1527534&r2=1527535&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Mon Sep 30 12:58:51 2013
@@ -382,7 +382,7 @@ public class TransportImpl extends Endpo
if(endpoint instanceof SenderImpl)
{
SenderImpl sender = (SenderImpl) endpoint;
- if(sender.getDrain() && sender.clearDrained())
+ if(sender.getDrain() && sender.getDrained() > 0)
{
TransportSender transportLink = sender.getTransportLink();
TransportSession transportSession = sender.getSession().getTransportSession();
@@ -390,6 +390,7 @@ public class TransportImpl extends Endpo
transportLink.setLinkCredit(UnsignedInteger.valueOf(0));
transportLink.setDeliveryCount(transportLink.getDeliveryCount().add(credits));
transportLink.setLinkCredit(UnsignedInteger.ZERO);
+ sender.setDrained(0);
writeFlow(transportSession, transportLink);
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportReceiver.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportReceiver.java?rev=1527535&r1=1527534&r2=1527535&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportReceiver.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportReceiver.java Mon Sep 30 12:58:51 2013
@@ -42,11 +42,15 @@ class TransportReceiver extends Transpor
void handleFlow(Flow flow)
{
super.handleFlow(flow);
- if(getRemoteDeliveryCount().compareTo(getDeliveryCount())>=0)
+ int remote = getRemoteDeliveryCount().intValue();
+ int local = getDeliveryCount().intValue();
+ int delta = remote - local;
+ if(delta > 0)
{
- getLink().setCredit(getLink().getQueued());
+ getLink().addCredit(-delta);
setLinkCredit(getRemoteLinkCredit());
setDeliveryCount(getRemoteDeliveryCount());
+ getLink().setDrained(getLink().getDrained() + delta);
}
Modified: qpid/proton/trunk/tests/python/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/engine.py?rev=1527535&r1=1527534&r2=1527535&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/engine.py Mon Sep 30 12:58:51 2013
@@ -1049,6 +1049,8 @@ class CreditTest(Test):
self.pump()
assert self.rcv.credit == 0
assert self.snd.credit == 0
+ drained = self.rcv.drained()
+ assert drained == 10, drained
def testPartialDrain(self):
self.rcv.drain(2)
@@ -1066,6 +1068,8 @@ class CreditTest(Test):
assert self.rcv.advance()
assert not self.rcv.current
assert self.rcv.credit == 0, self.rcv.credit
+ drained = self.rcv.drained()
+ assert drained == 1, drained
def testDrainFlow(self):
assert self.rcv.credit == 0
@@ -1094,6 +1098,8 @@ class CreditTest(Test):
self.pump()
assert self.rcv.credit == 10
assert self.snd.credit == 10
+ drained = self.rcv.drained()
+ assert drained == 10, drained
def testNegative(self):
assert self.snd.credit == 0
@@ -1123,6 +1129,8 @@ class CreditTest(Test):
assert self.snd.credit == 0
assert self.rcv.credit == 0
assert self.rcv.queued == 0
+ drained = self.rcv.drained()
+ assert drained == 0
self.rcv.flow(10)
self.pump()
@@ -1135,6 +1143,8 @@ class CreditTest(Test):
assert self.snd.credit == 10
assert self.rcv.credit == 10
assert self.rcv.queued == 0
+ drained = self.rcv.drained()
+ assert drained == 0
self.rcv.drain(0)
assert self.snd.credit == 10
@@ -1151,11 +1161,112 @@ class CreditTest(Test):
assert self.snd.credit == 0
assert self.rcv.credit == 10
assert self.rcv.queued == 0
+ drained = self.rcv.drained()
+ assert drained == 0
self.pump()
assert self.snd.credit == 0
assert self.rcv.credit == 0
assert self.rcv.queued == 0
+ drained = self.rcv.drained()
+ assert drained == 10
+
+
+ def testDrainOrder(self):
+ """ Verify drain/drained works regardless of ordering. See PROTON-401
+ """
+ assert self.snd.credit == 0
+ assert self.rcv.credit == 0
+ assert self.rcv.queued == 0
+
+ #self.rcv.session.connection._transport.trace(Transport.TRACE_FRM)
+ #self.snd.session.connection._transport.trace(Transport.TRACE_FRM)
+
+ ## verify that a sender that has reached the drain state will respond
+ ## promptly to a drain issued by the peer.
+ self.rcv.flow(10)
+ self.pump()
+ assert self.snd.credit == 10, self.snd.credit
+ assert self.rcv.credit == 10, self.rcv.credit
+
+ sd = self.snd.delivery("tagA")
+ assert sd
+ n = self.snd.send("A")
+ assert n == 1
+ self.pump()
+ self.snd.advance()
+
+ # done sending, so signal that we are drained:
+ self.snd.drained()
+ self.pump()
+ assert self.snd.credit == 9, self.snd.credit
+ assert self.rcv.credit == 10, self.rcv.credit
+
+ self.rcv.drain(0)
+ self.pump()
+ assert self.snd.credit == 9, self.snd.credit
+ assert self.rcv.credit == 10, self.rcv.credit
+
+ bytes = self.rcv.recv(10)
+ assert bytes == "A", bytes
+ self.rcv.advance()
+ self.pump()
+ assert self.snd.credit == 9, self.snd.credit
+ assert self.rcv.credit == 9, self.rcv.credit
+
+ self.snd.drained()
+ self.pump()
+ assert self.snd.credit == 0, self.snd.credit
+ assert self.rcv.credit == 0, self.rcv.credit
+
+ # verify that a drain requested by the peer is not "acknowledged" until
+ # after the sender has completed sending its pending messages
+
+ self.rcv.flow(10)
+ self.pump()
+ assert self.snd.credit == 10, self.snd.credit
+ assert self.rcv.credit == 10, self.rcv.credit
+
+ sd = self.snd.delivery("tagB")
+ assert sd
+ n = self.snd.send("B")
+ assert n == 1
+ self.snd.advance()
+ self.pump()
+ assert self.snd.credit == 9, self.snd.credit
+ assert self.rcv.credit == 10, self.rcv.credit
+
+ self.rcv.drain(0)
+ self.pump()
+ assert self.snd.credit == 9, self.snd.credit
+ assert self.rcv.credit == 10, self.rcv.credit
+
+ sd = self.snd.delivery("tagC")
+ assert sd
+ n = self.snd.send("C")
+ assert n == 1
+ self.snd.advance()
+ self.pump()
+ assert self.snd.credit == 8, self.snd.credit
+ assert self.rcv.credit == 10, self.rcv.credit
+
+ # now that the sender has finished sending everything, it can signal
+ # drained
+ self.snd.drained()
+ self.pump()
+ assert self.snd.credit == 0, self.snd.credit
+ assert self.rcv.credit == 2, self.rcv.credit
+
+ bytes = self.rcv.recv(10)
+ assert bytes == "B", bytes
+ self.rcv.advance()
+ bytes = self.rcv.recv(10)
+ assert bytes == "C", bytes
+ self.rcv.advance()
+ self.pump()
+ assert self.snd.credit == 0, self.snd.credit
+ assert self.rcv.credit == 0, self.rcv.credit
+
def testPushback(self, count=10):
assert self.snd.credit == 0
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org