You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2013/07/18 12:13:35 UTC
svn commit: r1504407 - in /qpid/proton/trunk: proton-c/src/messenger/
proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/
proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/
proton-j/proton/src/main/java/org/apache/qpid/proto...
Author: rhs
Date: Thu Jul 18 10:13:34 2013
New Revision: 1504407
URL: http://svn.apache.org/r1504407
Log:
Bounded messenger's credit allocation when an unlimited value is passed to recv; added tests for pushback at both messenger and engine level; fixed proton-j engine to respect credit window; fixed java messenger to properly report if work has been done. This addresses PROTON-350 and PROTON-351.
Modified:
qpid/proton/trunk/proton-c/src/messenger/messenger.c
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Connector.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
qpid/proton/trunk/tests/python/proton_tests/engine.py
qpid/proton/trunk/tests/python/proton_tests/messenger.py
qpid/proton/trunk/tests/ruby/proton_tests/smoke.rb
Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1504407&r1=1504406&r2=1504407&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Thu Jul 18 10:13:34 2013
@@ -188,7 +188,7 @@ pn_messenger_t *pn_messenger(const char
m->blocking = true;
m->driver = pn_driver();
m->receiving = 0;
- m->credit_batch = 10;
+ m->credit_batch = 1024;
m->credit = 0;
m->distributed = 0;
m->next_tag = 0;
@@ -359,7 +359,7 @@ void pn_messenger_flow(pn_messenger_t *m
if (link_ct == 0) return;
if (messenger->receiving == -1) {
- messenger->credit = link_ct * messenger->credit_batch;
+ messenger->credit = link_ct * messenger->credit_batch - pn_messenger_incoming(messenger);
} else {
int total = messenger->credit + messenger->distributed;
if (messenger->receiving > total)
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Connector.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Connector.java?rev=1504407&r1=1504406&r2=1504407&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Connector.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Connector.java Thu Jul 18 10:13:34 2013
@@ -44,7 +44,7 @@ public interface Connector<C>
* Typically, applications repeatedly invoke this method
* during the lifetime of a connection.
*/
- void process() throws IOException;
+ boolean process() throws IOException;
/**
* Access the listener which opened this connector.
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java?rev=1504407&r1=1504406&r2=1504407&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java Thu Jul 18 10:13:34 2013
@@ -71,34 +71,45 @@ class ConnectorImpl<C> implements Connec
_readPending = true;
}
- public void process() throws IOException
+ public boolean process() throws IOException
{
+ boolean processed = false;
if (_channel.isOpen() && _channel.finishConnect())
{
if (_readPending)
{
- read();
+ if (read()) {
+ processed = true;
+ }
_readPending = false;
- if (isClosed()) return;
+ if (isClosed()) return processed;
}
else
{
- processInput();
+ if (processInput() > 0) {
+ processed = true;
+ }
+ }
+ if (write()) {
+ processed = true;
}
- write();
}
+ return processed;
}
- private void read() throws IOException
+ private boolean read() throws IOException
{
+ boolean processed = false;
int bytesRead = 0;
while ((bytesRead = _channel.read(_readBuffer)) > 0)
{
processInput();
+ processed = true;
}
if (bytesRead == -1) {
close();
}
+ return processed;
}
private int processInput() throws IOException
@@ -127,8 +138,9 @@ class ConnectorImpl<C> implements Connec
return total;
}
- private void write() throws IOException
+ private boolean write() throws IOException
{
+ boolean processed = false;
int interest = _key.interestOps();
boolean empty = _writeBuffer.position() == 0;
boolean done = false;
@@ -138,6 +150,9 @@ class ConnectorImpl<C> implements Connec
_writeBuffer.position(_writeBuffer.position() + produced);
_writeBuffer.flip();
int wrote = _channel.write(_writeBuffer);
+ if (wrote > 0) {
+ processed = true;
+ }
if (_logger.isLoggable(Level.FINE))
{
_logger.log(Level.FINE, this + " wrote " + wrote + " bytes, " + _writeBuffer.remaining() + " remaining");
@@ -163,6 +178,7 @@ class ConnectorImpl<C> implements Connec
{
_logger.log(Level.FINE, this + " finished writing output to the channel.");
}
+ return processed;
}
public Listener<C> listener()
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1504407&r1=1504406&r2=1504407&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Thu Jul 18 10:13:34 2013
@@ -460,7 +460,8 @@ public class TransportImpl extends Endpo
while(delivery != null && buffer.remaining() >= _maxFrameSize)
{
if((delivery.getLink() instanceof SenderImpl) && !(delivery.isDone() && delivery.getDataLength() == 0)
- && delivery.getLink().getSession().getTransportSession().hasOutgoingCredit())
+ && delivery.getLink().getSession().getTransportSession().hasOutgoingCredit() &&
+ delivery.getLink().getTransportLink().hasCredit())
{
SenderImpl sender = (SenderImpl) delivery.getLink();
@@ -916,16 +917,18 @@ public class TransportImpl extends Endpo
_encoder.writeObject(frameBody);
}
+ ByteBuffer originalPayload = null;
+ if( payload!=null )
+ {
+ originalPayload = payload.duplicate();
+ }
+ TransportFrame frame = new TransportFrame(channel, frameBody, Binary.create(originalPayload));
+ log(OUTGOING, frame);
+
if( _protocolTracer!=null )
{
- ByteBuffer originalPayload = null;
- if( payload!=null )
- {
- originalPayload = payload.duplicate();
- }
- _protocolTracer.sentFrame(new TransportFrame(channel, frameBody, Binary.create(originalPayload)));
+ _protocolTracer.sentFrame(frame);
}
- _engineLogger.outgoingBytes(channel, frameBody, payload);
int payloadSize = Math.min(payload == null ? 0 : payload.remaining(), _maxFrameSize - (buffer.position() - oldPosition));
if(payloadSize > 0)
@@ -1189,6 +1192,8 @@ public class TransportImpl extends Endpo
throw new IllegalStateException("Transport cannot accept frame: " + frame);
}
+ log(INCOMING, frame);
+
if( _protocolTracer != null )
{
_protocolTracer.receivedFrame(frame);
@@ -1291,4 +1296,20 @@ public class TransportImpl extends Endpo
{
_frameHandler = frameHandler;
}
+
+ private static String INCOMING = "<-";
+ private static String OUTGOING = "->";
+
+ private void log(String event, TransportFrame frame)
+ {
+ /*StringBuilder msg = new StringBuilder();
+ msg.append("[").append(System.identityHashCode(this)).append(":")
+ .append(frame.getChannel()).append("]");
+ msg.append(" ").append(event).append(" ").append(frame.getBody());
+ if (frame.getPayload() != null) {
+ msg.append(" \"").append(frame.getPayload()).append("\"");
+ }
+ System.out.println(msg.toString());*/
+ }
+
}
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java?rev=1504407&r1=1504406&r2=1504407&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java Thu Jul 18 10:13:34 2013
@@ -105,6 +105,11 @@ class TransportLink<T extends LinkImpl>
_linkCredit = UnsignedInteger.valueOf(_linkCredit.intValue() + credits);
}
+ public boolean hasCredit()
+ {
+ return getLinkCredit().compareTo(UnsignedInteger.ZERO) > 0;
+ }
+
public T getLink()
{
return _link;
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1504407&r1=1504406&r2=1504407&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java Thu Jul 18 10:13:34 2013
@@ -76,9 +76,10 @@ public class MessengerImpl implements Me
private long _nextTag = 1;
private byte[] _buffer = new byte[5*1024];
private Driver _driver;
- private boolean _unlimitedCredit = false;
- private static final int _creditBatch = 10;
+ private int _receiving = 0;
+ private static final int _creditBatch = 1024;
private int _credit;
+ private int _distributed;
private TrackerQueue _incoming = new TrackerQueue();
private TrackerQueue _outgoing = new TrackerQueue();
@@ -251,17 +252,18 @@ public class MessengerImpl implements Me
{
_logger.fine(this + " about to wait for up to " + n + " messages to be received");
}
- if (n == -1) {
- _unlimitedCredit = true;
- } else {
- _credit += n;
- _unlimitedCredit = false;
- }
+
+ _receiving = n;
distributeCredit();
waitUntil(_messageAvailable);
}
+ public int receiving()
+ {
+ return _receiving;
+ }
+
public Message get()
{
for (Connector<?> c : _driver.connectors())
@@ -279,6 +281,7 @@ public class MessengerImpl implements Me
message.decode(_buffer, 0, size);
delivery.getLink().advance();
_incoming.add(delivery);
+ _distributed--;
return message;
}
else
@@ -433,11 +436,14 @@ public class MessengerImpl implements Me
private void processAllConnectors()
{
+ distributeCredit();
for (Connector<?> c : _driver.connectors())
{
try
{
- c.process();
+ if (c.process()) {
+ _worked = true;
+ }
}
catch (IOException e)
{
@@ -536,6 +542,9 @@ public class MessengerImpl implements Me
{
reclaimCredit(connection);
c.destroy();
+ // XXX: could we do this once at the end of the loop
+ // instead of every time we reclaim?
+ distributeCredit();
}
else
{
@@ -622,6 +631,7 @@ public class MessengerImpl implements Me
private void reclaimCredit(int credit)
{
_credit += credit;
+ _distributed -= credit;
}
private void distributeCredit()
@@ -639,9 +649,13 @@ public class MessengerImpl implements Me
if (linkCt == 0) return;
- if (_unlimitedCredit)
+ if (_receiving < 0)
{
- _credit = linkCt * _creditBatch;
+ _credit = linkCt * _creditBatch - incoming();
+ } else {
+ int total = _credit + _distributed;
+ if (_receiving > total)
+ _credit += _receiving - total;
}
int batch = (_credit < linkCt) ? 1 : (_credit/linkCt);
@@ -659,6 +673,7 @@ public class MessengerImpl implements Me
int amount = (_credit < need) ? _credit : need;
((Receiver) link).flow(amount);
_credit -= amount;
+ _distributed += amount;
if (_credit == 0) return;
}
}
Modified: qpid/proton/trunk/tests/python/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/engine.py?rev=1504407&r1=1504406&r2=1504407&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/engine.py Thu Jul 18 10:13:34 2013
@@ -1157,6 +1157,33 @@ class CreditTest(Test):
assert self.rcv.credit == 0
assert self.rcv.queued == 0
+ def testPushback(self, count=10):
+ assert self.snd.credit == 0
+ assert self.rcv.credit == 0
+
+ self.rcv.flow(count)
+ self.pump()
+
+ for i in range(count):
+ d = self.snd.delivery("tag%s" % i)
+ assert d
+ self.snd.advance()
+
+ assert self.snd.queued == count
+ assert self.rcv.queued == 0
+ self.pump()
+ assert self.snd.queued == 0
+ assert self.rcv.queued == count
+
+ d = self.snd.delivery("extra")
+ self.snd.advance()
+
+ assert self.snd.queued == 1
+ assert self.rcv.queued == count
+ self.pump()
+ assert self.snd.queued == 1
+ assert self.rcv.queued == count
+
class SessionCreditTest(Test):
def teardown(self):
Modified: qpid/proton/trunk/tests/python/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/messenger.py?rev=1504407&r1=1504406&r2=1504407&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/messenger.py Thu Jul 18 10:13:34 2013
@@ -588,6 +588,7 @@ class NBMessengerTest(common.Test):
self.server.blocking = False
self.server.start()
self.client.start()
+ self.address = "amqp://0.0.0.0:12345"
self.server.subscribe("amqp://~0.0.0.0:12345")
def pump(self):
@@ -600,22 +601,42 @@ class NBMessengerTest(common.Test):
assert self.server.stopped
assert self.client.stopped
- def test(self, count=1):
+ def testSmoke(self, count=1):
self.server.recv()
msg = Message()
- msg.address = "amqp://0.0.0.0:12345"
+ msg.address = self.address
for i in range(count):
msg.body = "Hello %s" % i
self.client.put(msg)
- self.pump()
- assert self.client.outgoing == 0, self.client.outgoing
-
msg2 = Message()
for i in range(count):
+ if self.server.incoming == 0:
+ self.pump()
+ assert self.server.incoming > 0
self.server.get(msg2)
assert msg2.body == "Hello %s" % i, (msg2.body, i)
- def test1024(self):
- self.test(1024)
+ assert self.client.outgoing == 0, self.client.outgoing
+ assert self.server.incoming == 0, self.client.incoming
+
+ def testSmoke1024(self):
+ self.testSmoke(1024)
+
+ def testSmoke4096(self):
+ self.testSmoke(4096)
+
+ def testPushback(self):
+ self.server.recv()
+
+ msg = Message()
+ msg.address = self.address
+ for i in xrange(16):
+ for i in xrange(1024):
+ self.client.put(msg)
+ self.pump()
+ if self.client.outgoing > 0:
+ break
+
+ assert self.client.outgoing > 0
Modified: qpid/proton/trunk/tests/ruby/proton_tests/smoke.rb
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/ruby/proton_tests/smoke.rb?rev=1504407&r1=1504406&r2=1504407&view=diff
==============================================================================
--- qpid/proton/trunk/tests/ruby/proton_tests/smoke.rb (original)
+++ qpid/proton/trunk/tests/ruby/proton_tests/smoke.rb Thu Jul 18 10:13:34 2013
@@ -44,14 +44,18 @@ class SmokeTest < Test::Unit::TestCase
@client.put(msg)
}
- pump()
-
msg2 = Message.new()
count.times {|i|
+ if (@server.incoming == 0) then
+ pump()
+ end
@server.get(msg2)
assert msg2.content == "Hello World! #{i}"
}
+
+ assert @client.outgoing == 0, @client.outgoing
+ assert @server.incoming == 0, @server.incoming
end
end
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org