You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2013/07/18 12:13:35 UTC

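svn commit: r1504407 - in /qpid/proton/trunk: proton-c/src/messenger/ proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/ proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/ tests/python/proton_tests/ tests/ruby/proton_tests/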

Author: rhs
Date: Thu Jul 18 10:13:34 2013
New Revision: 1504407

URL: http://svn.apache.org/r1504407
Log:
Bounded messenger's credit allocation when an unlimited value is passed to recv; added tests for pushback at both messenger and engine level; fixed proton-j engine to respect credit window; fixed java messenger to properly report if work has been done. This addresses PROTON-350 and PROTON-351.

Modified:
    qpid/proton/trunk/proton-c/src/messenger/messenger.c
    qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Connector.java
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
    qpid/proton/trunk/tests/python/proton_tests/engine.py
    qpid/proton/trunk/tests/python/proton_tests/messenger.py
    qpid/proton/trunk/tests/ruby/proton_tests/smoke.rb

Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1504407&r1=1504406&r2=1504407&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Thu Jul 18 10:13:34 2013
@@ -188,7 +188,7 @@ pn_messenger_t *pn_messenger(const char 
     m->blocking = true;
     m->driver = pn_driver();
     m->receiving = 0;
-    m->credit_batch = 10;
+    m->credit_batch = 1024;
     m->credit = 0;
     m->distributed = 0;
     m->next_tag = 0;
@@ -359,7 +359,7 @@ void pn_messenger_flow(pn_messenger_t *m
   if (link_ct == 0) return;
 
   if (messenger->receiving == -1) {
-    messenger->credit = link_ct * messenger->credit_batch;
+    messenger->credit = link_ct * messenger->credit_batch - pn_messenger_incoming(messenger);
   } else {
     int total = messenger->credit + messenger->distributed;
     if (messenger->receiving > total)

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Connector.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Connector.java?rev=1504407&r1=1504406&r2=1504407&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Connector.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Connector.java Thu Jul 18 10:13:34 2013
@@ -44,7 +44,7 @@ public interface Connector<C>
      * Typically, applications repeatedly invoke this method
      * during the lifetime of a connection.
      */
-    void process() throws IOException;
+    boolean process() throws IOException;
 
     /**
      * Access the listener which opened this connector.

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java?rev=1504407&r1=1504406&r2=1504407&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/ConnectorImpl.java Thu Jul 18 10:13:34 2013
@@ -71,34 +71,45 @@ class ConnectorImpl<C> implements Connec
         _readPending = true;
     }
 
-    public void process() throws IOException
+    public boolean process() throws IOException
     {
+        boolean processed = false;
         if (_channel.isOpen() && _channel.finishConnect())
         {
             if (_readPending)
             {
-                read();
+                if (read()) {
+                    processed = true;
+                }
                 _readPending = false;
-                if (isClosed()) return;
+                if (isClosed()) return processed;
             }
             else
             {
-                processInput();
+                if (processInput() > 0) {
+                    processed = true;
+                }
+            }
+            if (write()) {
+                processed = true;
             }
-            write();
         }
+        return processed;
     }
 
-    private void read() throws IOException
+    private boolean read() throws IOException
     {
+        boolean processed = false;
         int bytesRead = 0;
         while ((bytesRead = _channel.read(_readBuffer)) > 0)
         {
             processInput();
+            processed = true;
         }
         if (bytesRead == -1) {
             close();
         }
+        return processed;
     }
 
     private int processInput() throws IOException
@@ -127,8 +138,9 @@ class ConnectorImpl<C> implements Connec
         return total;
     }
 
-    private void write() throws IOException
+    private boolean write() throws IOException
     {
+        boolean processed = false;
         int interest = _key.interestOps();
         boolean empty = _writeBuffer.position() == 0;
         boolean done = false;
@@ -138,6 +150,9 @@ class ConnectorImpl<C> implements Connec
             _writeBuffer.position(_writeBuffer.position() + produced);
             _writeBuffer.flip();
             int wrote = _channel.write(_writeBuffer);
+            if (wrote > 0) {
+                processed = true;
+            }
             if (_logger.isLoggable(Level.FINE))
             {
                 _logger.log(Level.FINE, this + " wrote " + wrote + " bytes, " + _writeBuffer.remaining() + " remaining");
@@ -163,6 +178,7 @@ class ConnectorImpl<C> implements Connec
         {
             _logger.log(Level.FINE, this + " finished writing output to the channel.");
         }
+        return processed;
     }
 
     public Listener<C> listener()

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1504407&r1=1504406&r2=1504407&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Thu Jul 18 10:13:34 2013
@@ -460,7 +460,8 @@ public class TransportImpl extends Endpo
             while(delivery != null && buffer.remaining() >= _maxFrameSize)
             {
                 if((delivery.getLink() instanceof SenderImpl) && !(delivery.isDone() && delivery.getDataLength() == 0)
-                   && delivery.getLink().getSession().getTransportSession().hasOutgoingCredit())
+                   && delivery.getLink().getSession().getTransportSession().hasOutgoingCredit() &&
+                   delivery.getLink().getTransportLink().hasCredit())
                 {
                     SenderImpl sender = (SenderImpl) delivery.getLink();
 
@@ -916,16 +917,18 @@ public class TransportImpl extends Endpo
             _encoder.writeObject(frameBody);
         }
 
+        ByteBuffer originalPayload = null;
+        if( payload!=null )
+        {
+            originalPayload = payload.duplicate();
+        }
+        TransportFrame frame = new TransportFrame(channel, frameBody, Binary.create(originalPayload));
+        log(OUTGOING, frame);
+
         if( _protocolTracer!=null )
         {
-            ByteBuffer originalPayload = null;
-            if( payload!=null )
-            {
-                originalPayload = payload.duplicate();
-            }
-            _protocolTracer.sentFrame(new TransportFrame(channel, frameBody, Binary.create(originalPayload)));
+            _protocolTracer.sentFrame(frame);
         }
-        _engineLogger.outgoingBytes(channel, frameBody, payload);
 
         int payloadSize = Math.min(payload == null ? 0 : payload.remaining(), _maxFrameSize - (buffer.position() - oldPosition));
         if(payloadSize > 0)
@@ -1189,6 +1192,8 @@ public class TransportImpl extends Endpo
             throw new IllegalStateException("Transport cannot accept frame: " + frame);
         }
 
+        log(INCOMING, frame);
+
         if( _protocolTracer != null )
         {
             _protocolTracer.receivedFrame(frame);
@@ -1291,4 +1296,20 @@ public class TransportImpl extends Endpo
     {
         _frameHandler = frameHandler;
     }
+
+    private static String INCOMING = "<-";
+    private static String OUTGOING = "->";
+
+    private void log(String event, TransportFrame frame)
+    {
+        /*StringBuilder msg = new StringBuilder();
+        msg.append("[").append(System.identityHashCode(this)).append(":")
+            .append(frame.getChannel()).append("]");
+        msg.append(" ").append(event).append(" ").append(frame.getBody());
+        if (frame.getPayload() != null) {
+            msg.append(" \"").append(frame.getPayload()).append("\"");
+        }
+        System.out.println(msg.toString());*/
+    }
+
 }

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java?rev=1504407&r1=1504406&r2=1504407&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java Thu Jul 18 10:13:34 2013
@@ -105,6 +105,11 @@ class TransportLink<T extends LinkImpl>
         _linkCredit = UnsignedInteger.valueOf(_linkCredit.intValue() + credits);
     }
 
+    public boolean hasCredit()
+    {
+        return getLinkCredit().compareTo(UnsignedInteger.ZERO) > 0;
+    }
+
     public T getLink()
     {
         return _link;

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1504407&r1=1504406&r2=1504407&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java Thu Jul 18 10:13:34 2013
@@ -76,9 +76,10 @@ public class MessengerImpl implements Me
     private long _nextTag = 1;
     private byte[] _buffer = new byte[5*1024];
     private Driver _driver;
-    private boolean _unlimitedCredit = false;
-    private static final int _creditBatch = 10;
+    private int _receiving = 0;
+    private static final int _creditBatch = 1024;
     private int _credit;
+    private int _distributed;
     private TrackerQueue _incoming = new TrackerQueue();
     private TrackerQueue _outgoing = new TrackerQueue();
 
@@ -251,17 +252,18 @@ public class MessengerImpl implements Me
         {
             _logger.fine(this + " about to wait for up to " + n + " messages to be received");
         }
-        if (n == -1) {
-            _unlimitedCredit = true;
-        } else {
-            _credit += n;
-            _unlimitedCredit = false;
-        }
+
+        _receiving = n;
         distributeCredit();
 
         waitUntil(_messageAvailable);
     }
 
+    public int receiving()
+    {
+        return _receiving;
+    }
+
     public Message get()
     {
         for (Connector<?> c : _driver.connectors())
@@ -279,6 +281,7 @@ public class MessengerImpl implements Me
                     message.decode(_buffer, 0, size);
                     delivery.getLink().advance();
                     _incoming.add(delivery);
+                    _distributed--;
                     return message;
                 }
                 else
@@ -433,11 +436,14 @@ public class MessengerImpl implements Me
 
     private void processAllConnectors()
     {
+        distributeCredit();
         for (Connector<?> c : _driver.connectors())
         {
             try
             {
-                c.process();
+                if (c.process()) {
+                    _worked = true;
+                }
             }
             catch (IOException e)
             {
@@ -536,6 +542,9 @@ public class MessengerImpl implements Me
             {
                 reclaimCredit(connection);
                 c.destroy();
+                // XXX: could we do this once at the end of the loop
+                // instead of every time we reclaim?
+                distributeCredit();
             }
             else
             {
@@ -622,6 +631,7 @@ public class MessengerImpl implements Me
     private void reclaimCredit(int credit)
     {
         _credit += credit;
+        _distributed -= credit;
     }
 
     private void distributeCredit()
@@ -639,9 +649,13 @@ public class MessengerImpl implements Me
 
         if (linkCt == 0) return;
 
-        if (_unlimitedCredit)
+        if (_receiving < 0)
         {
-            _credit = linkCt * _creditBatch;
+            _credit = linkCt * _creditBatch - incoming();
+        } else {
+            int total = _credit + _distributed;
+            if (_receiving > total)
+                _credit += _receiving - total;
         }
 
         int batch = (_credit < linkCt) ? 1 : (_credit/linkCt);
@@ -659,6 +673,7 @@ public class MessengerImpl implements Me
                         int amount = (_credit < need) ? _credit : need;
                         ((Receiver) link).flow(amount);
                         _credit -= amount;
+                        _distributed += amount;
                         if (_credit == 0) return;
                     }
                 }

Modified: qpid/proton/trunk/tests/python/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/engine.py?rev=1504407&r1=1504406&r2=1504407&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/engine.py Thu Jul 18 10:13:34 2013
@@ -1157,6 +1157,33 @@ class CreditTest(Test):
     assert self.rcv.credit == 0
     assert self.rcv.queued == 0
 
+  def testPushback(self, count=10):
+    assert self.snd.credit == 0
+    assert self.rcv.credit == 0
+
+    self.rcv.flow(count)
+    self.pump()
+
+    for i in range(count):
+      d = self.snd.delivery("tag%s" % i)
+      assert d
+      self.snd.advance()
+
+    assert self.snd.queued == count
+    assert self.rcv.queued == 0
+    self.pump()
+    assert self.snd.queued == 0
+    assert self.rcv.queued == count
+
+    d = self.snd.delivery("extra")
+    self.snd.advance()
+
+    assert self.snd.queued == 1
+    assert self.rcv.queued == count
+    self.pump()
+    assert self.snd.queued == 1
+    assert self.rcv.queued == count
+
 class SessionCreditTest(Test):
 
   def teardown(self):

Modified: qpid/proton/trunk/tests/python/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/messenger.py?rev=1504407&r1=1504406&r2=1504407&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/messenger.py Thu Jul 18 10:13:34 2013
@@ -588,6 +588,7 @@ class NBMessengerTest(common.Test):
     self.server.blocking = False
     self.server.start()
     self.client.start()
+    self.address = "amqp://0.0.0.0:12345"
     self.server.subscribe("amqp://~0.0.0.0:12345")
 
   def pump(self):
@@ -600,22 +601,42 @@ class NBMessengerTest(common.Test):
     assert self.server.stopped
     assert self.client.stopped
 
-  def test(self, count=1):
+  def testSmoke(self, count=1):
     self.server.recv()
 
     msg = Message()
-    msg.address = "amqp://0.0.0.0:12345"
+    msg.address = self.address
     for i in range(count):
       msg.body = "Hello %s" % i
       self.client.put(msg)
 
-    self.pump()
-    assert self.client.outgoing == 0, self.client.outgoing
-
     msg2 = Message()
     for i in range(count):
+      if self.server.incoming == 0:
+        self.pump()
+      assert self.server.incoming > 0
       self.server.get(msg2)
       assert msg2.body == "Hello %s" % i, (msg2.body, i)
 
-  def test1024(self):
-    self.test(1024)
+    assert self.client.outgoing == 0, self.client.outgoing
+    assert self.server.incoming == 0, self.client.incoming
+
+  def testSmoke1024(self):
+    self.testSmoke(1024)
+
+  def testSmoke4096(self):
+    self.testSmoke(4096)
+
+  def testPushback(self):
+    self.server.recv()
+
+    msg = Message()
+    msg.address = self.address
+    for i in xrange(16):
+      for i in xrange(1024):
+        self.client.put(msg)
+      self.pump()
+      if self.client.outgoing > 0:
+        break
+
+    assert self.client.outgoing > 0

Modified: qpid/proton/trunk/tests/ruby/proton_tests/smoke.rb
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/ruby/proton_tests/smoke.rb?rev=1504407&r1=1504406&r2=1504407&view=diff
==============================================================================
--- qpid/proton/trunk/tests/ruby/proton_tests/smoke.rb (original)
+++ qpid/proton/trunk/tests/ruby/proton_tests/smoke.rb Thu Jul 18 10:13:34 2013
@@ -44,14 +44,18 @@ class SmokeTest < Test::Unit::TestCase
       @client.put(msg)
     }
 
-    pump()
-
     msg2 = Message.new()
 
     count.times {|i|
+      if (@server.incoming == 0) then
+        pump()
+      end
       @server.get(msg2)
       assert msg2.content == "Hello World! #{i}"
     }
+
+    assert @client.outgoing == 0, @client.outgoing
+    assert @server.incoming == 0, @server.incoming
   end
 
 end



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org