You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2013/12/03 20:16:12 UTC

svn commit: r1547534 - in /qpid/proton/trunk: proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/ proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/ proton-c/include/proton/ proton-c/src/messenger/ proton-j/p...

Author: kgiusti
Date: Tue Dec  3 19:16:11 2013
New Revision: 1547534

URL: http://svn.apache.org/r1547534
Log:
PROTON-200: Java implementation of credit scheduler

Modified:
    qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java
    qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNILink.java
    qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIReceiver.java
    qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java
    qpid/proton/trunk/proton-c/include/proton/messenger.h
    qpid/proton/trunk/proton-c/src/messenger/messenger.c
    qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Driver.java
    qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java
    qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Link.java
    qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Receiver.java
    qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
    qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Status.java
    qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java
    qpid/proton/trunk/tests/python/proton_tests/messenger.py

Modified: qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java (original)
+++ qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java Tue Dec  3 19:16:11 2013
@@ -266,4 +266,9 @@ public class JNIDelivery implements Deli
         return (int) Proton.pn_delivery_pending(_impl);
     }
 
+    @ProtonCEquivalent("pn_delivery_buffered")
+    public boolean isBuffered()
+    {
+        return Proton.pn_delivery_buffered(_impl);
+    }
 }

Modified: qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNILink.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNILink.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNILink.java (original)
+++ qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNILink.java Tue Dec  3 19:16:11 2013
@@ -538,4 +538,17 @@ abstract class JNILink implements Link
         return Proton.pn_link_drained(getImpl());
     }
 
+    @Override
+    @ProtonCEquivalent("pn_link_remote_credit")
+    public int getRemoteCredit()
+    {
+        return Proton.pn_link_remote_credit(getImpl());
+    }
+
+    @Override
+    @ProtonCEquivalent("pn_link_get_drain")
+    public boolean getDrain()
+    {
+        return Proton.pn_link_get_drain(getImpl());
+    }
 }

Modified: qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIReceiver.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIReceiver.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIReceiver.java (original)
+++ qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIReceiver.java Tue Dec  3 19:16:11 2013
@@ -62,4 +62,10 @@ public class JNIReceiver extends JNILink
         return Proton.pn_link_draining(getImpl());
     }
 
+    @Override
+    @ProtonCEquivalent("pn_link_set_drain")
+    public void setDrain(boolean drain)
+    {
+        Proton.pn_link_set_drain(getImpl(), drain);
+    }
 }

Modified: qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java (original)
+++ qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java Tue Dec  3 19:16:11 2013
@@ -113,6 +113,12 @@ class JNIMessenger implements Messenger
     }
 
     @Override
+    public int receiving()
+    {
+        return Proton.pn_messenger_receiving(_impl);
+    }
+
+    @Override
     public Message get()
     {
         SWIGTYPE_p_pn_message_t msg = Proton.pn_message();
@@ -295,7 +301,7 @@ class JNIMessenger implements Messenger
 
     private void check(int errorCode) throws ProtonException
     {
-        if(errorCode != 0 && errorCode != Proton.PN_INPROGRESS)
+        if(errorCode < 0 && errorCode != Proton.PN_INPROGRESS)
         {
             String errorMessage = Proton.pn_error_text(Proton.pn_messenger_error(_impl));
             if(errorCode == Proton.PN_TIMEOUT)

Modified: qpid/proton/trunk/proton-c/include/proton/messenger.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/messenger.h?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/messenger.h Tue Dec  3 19:16:11 2013
@@ -365,7 +365,10 @@ PN_EXTERN pn_tracker_t pn_messenger_outg
  * This will block for the indicated timeout.
  *
  * @param[in] messenger the Messenger
- * @param[in] timeout the maximum time to block
+ * @param[in] timeout the maximum time to block in milliseconds, -1 ==
+ * forever, 0 == do not block
+ *
+ * @return 0 if no work to do, < 0 if error, or 1 if work was done.
  */
 PN_EXTERN int pn_messenger_work(pn_messenger_t *messenger, int timeout);
 

Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Tue Dec  3 19:16:11 2013
@@ -1512,7 +1512,9 @@ static bool work_pred(pn_messenger_t *me
 int pn_messenger_work(pn_messenger_t *messenger, int timeout)
 {
   messenger->worked = false;
-  return pn_messenger_tsync(messenger, work_pred, timeout);
+  int err = pn_messenger_tsync(messenger, work_pred, timeout);
+  if (err) return err;
+  return (int) (messenger->worked ? 1 : 0);
 }
 
 int pni_messenger_work(pn_messenger_t *messenger)

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Driver.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Driver.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Driver.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Driver.java Tue Dec  3 19:16:11 2013
@@ -55,7 +55,7 @@ public interface Driver
      *
      * Thread-safe.
      *
-     * @param timeout maximum time in milliseconds to wait. 0 means wait indefinitely.
+     * @param timeout maximum time in milliseconds to wait. -1 means wait indefinitely.
      *
      * @param returns true if woken up
      */

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java Tue Dec  3 19:16:11 2013
@@ -104,4 +104,6 @@ public interface Delivery
 
     public int pending();
 
+    public boolean isBuffered();
+
 }

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Link.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Link.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Link.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Link.java Tue Dec  3 19:16:11 2013
@@ -182,4 +182,6 @@ public interface Link extends Endpoint
 
     public int drained();
 
+    public int getRemoteCredit();
+    public boolean getDrain();
 }

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Receiver.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Receiver.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Receiver.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Receiver.java Tue Dec  3 19:16:11 2013
@@ -71,4 +71,6 @@ public interface Receiver extends Link
 
     public boolean draining();
 
+    public void setDrain(boolean drain);
+
 }

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java Tue Dec  3 19:16:11 2013
@@ -118,6 +118,14 @@ public interface Messenger
      */
     void recv(int count) throws TimeoutException;
     /**
+     * Returns the capacity of the incoming message queue of
+     * messenger. Note this count does not include those messages
+     * already available on the incoming queue (see
+     * incoming()). Rather it returns the number of incoming queue
+     * entries available for receiving messages.
+     */
+    int receiving();
+    /**
      * Returns the message from the head of the incoming message
      * queue.
      */
@@ -142,7 +150,12 @@ public interface Messenger
 
     boolean stopped();
 
-    boolean work(long timeout);
+    /** Sends or receives any outstanding messages queued for a
+     * messenger.  If timeout is zero, no blocking is done.  A timeout
+     * of -1 blocks forever; otherwise timeout is the maximum time (in
+     * milliseconds) to block.  Returns true if work was performed.
+     */
+    boolean work(long timeout) throws TimeoutException;
 
     void interrupt();
 

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Status.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Status.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Status.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Status.java Tue Dec  3 19:16:11 2013
@@ -26,5 +26,6 @@ public enum Status
     PENDING,
     ACCEPTED,
     REJECTED,
-    MODIFIED
+    MODIFIED,
+    ABORTED
 }

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py Tue Dec  3 19:16:11 2013
@@ -25,6 +25,7 @@ except NameError:
 
 from org.apache.qpid.proton import Proton, ProtonUnsupportedOperationException
 from org.apache.qpid.proton import InterruptException as Interrupt
+from org.apache.qpid.proton import TimeoutException as Timeout
 from org.apache.qpid.proton.engine import \
     Transport as JTransport, Sender as JSender, Receiver as JReceiver, \
     Sasl, SslDomain as JSslDomain, \
@@ -42,7 +43,6 @@ from org.apache.qpid.proton.amqp import 
     Decimal32, Decimal64, Decimal128
 from jarray import zeros, array
 from java.util import EnumSet, UUID as JUUID, Date as JDate, HashMap
-from java.util.concurrent import TimeoutException as Timeout
 from java.nio import ByteBuffer
 from java.lang import Character as JCharacter, String as JString, Integer as JInteger
 from java.lang import NoClassDefFoundError
@@ -1321,12 +1321,20 @@ class Messenger(object):
   def recv(self, n=-1):
     self.impl.recv(n)
 
+  @property
+  def receiving(self):
+    return self.impl.receiving()
+
   def work(self, timeout=None):
     if timeout is None:
       t = -1
     else:
       t = long(1000*timeout)
-    return self.impl.work(t)
+    try:
+        err = self.impl.work(t)
+    except Timeout, e:
+        return False
+    return err
 
   def interrupt(self):
     self.impl.interrupt()

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java Tue Dec  3 19:16:11 2013
@@ -23,6 +23,7 @@ package org.apache.qpid.proton.driver.im
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
+import java.net.StandardSocketOptions;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
@@ -192,6 +193,8 @@ public class DriverImpl implements Drive
         {
             SocketChannel channel = SocketChannel.open();
             channel.configureBlocking(false);
+            // Disable the Nagle algorithm on TCP connections.
+            channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
             channel.connect(new InetSocketAddress(host, port));
             return createConnector(channel, context);
         }

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java Tue Dec  3 19:16:11 2013
@@ -373,7 +373,7 @@ public class DeliveryImpl implements Del
         _updated = true;
     }
 
-    boolean isBuffered()
+    public boolean isBuffered()
     {
         if (_remoteSettled) return false;
         if (getLink() instanceof SenderImpl) {

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java Tue Dec  3 19:16:11 2013
@@ -55,7 +55,6 @@ public abstract class LinkImpl extends E
     private final LinkNode<LinkImpl> _node;
     private boolean _drain;
 
-
     LinkImpl(SessionImpl session, String name)
     {
         _session = session;
@@ -296,7 +295,7 @@ public abstract class LinkImpl extends E
         _drain = drain;
     }
 
-    boolean getDrain()
+    public boolean getDrain()
     {
         return _drain;
     }
@@ -378,4 +377,8 @@ public abstract class LinkImpl extends E
         _drained = value;
     }
 
+    public int getRemoteCredit()
+    {
+        return _credit - _queued;
+    }
 }

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java Tue Dec  3 19:16:11 2013
@@ -27,6 +27,7 @@ import org.apache.qpid.proton.engine.Rec
 
 public class ReceiverImpl extends LinkImpl implements Receiver
 {
+    private boolean _drainFlagMode = true;
 
     @Override
     public boolean advance()
@@ -61,10 +62,14 @@ public class ReceiverImpl extends LinkIm
 
     public void flow(final int credits)
     {
-        modified();
         addCredit(credits);
-        setDrain(false);
         _unsentCredits += credits;
+        modified();
+        if (!_drainFlagMode)
+        {
+            setDrain(false);
+            _drainFlagMode = false;
+        }
     }
 
     int clearUnsentCredits()
@@ -122,8 +127,9 @@ public class ReceiverImpl extends LinkIm
 
     public void drain(int credit)
     {
-        flow(credit);
         setDrain(true);
+        flow(credit);
+        _drainFlagMode = false;
     }
 
     public boolean draining()
@@ -131,4 +137,11 @@ public class ReceiverImpl extends LinkIm
         return getDrain() && (getCredit() > getQueued());
     }
 
+    @Override
+    public void setDrain(boolean drain)
+    {
+        super.setDrain(drain);
+        modified();
+        _drainFlagMode = true;
+    }
 }

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java Tue Dec  3 19:16:11 2013
@@ -61,6 +61,12 @@ import org.apache.qpid.proton.amqp.Binar
 
 public class MessengerImpl implements Messenger
 {
+    private enum LinkCreditMode
+    {
+        // method for replenishing credit
+        LINK_CREDIT_EXPLICIT,   // recv(N)
+        LINK_CREDIT_AUTO;       // recv()
+    }
 
     private static final EnumSet<EndpointState> UNINIT = EnumSet.of(EndpointState.UNINITIALIZED);
     private static final EnumSet<EndpointState> ACTIVE = EnumSet.of(EndpointState.ACTIVE);
@@ -73,10 +79,15 @@ public class MessengerImpl implements Me
     private boolean _blocking = true;
     private long _nextTag = 1;
     private Driver _driver;
-    private int _receiving = 0;
-    private static final int _creditBatch = 1024;
-    private int _credit;
-    private int _distributed;
+    private LinkCreditMode _credit_mode = LinkCreditMode.LINK_CREDIT_EXPLICIT;
+    private final int _credit_batch = 1024;   // credit_mode == LINK_CREDIT_AUTO
+    private int _credit;        // available
+    private int _distributed;    // outstanding credit
+    private int _receivers;      // total # receiver Links
+    private int _draining;       // # Links in drain state
+    private List<Receiver> _credited = new ArrayList<Receiver>();
+    private List<Receiver> _blocked = new ArrayList<Receiver>();
+    private long _next_drain;
     private TrackerImpl _incomingTracker;
     private TrackerImpl _outgoingTracker;
     private Store _incomingStore = new Store();
@@ -165,7 +176,7 @@ public class MessengerImpl implements Me
         return _allClosed.test();
     }
 
-    public boolean work(long timeout)
+    public boolean work(long timeout) throws TimeoutException
     {
         if (_driver == null) { return false; }
         _worked = false;
@@ -285,10 +296,38 @@ public class MessengerImpl implements Me
         pumpOut(m.getAddress(), sender);
     }
 
+    private void reclaimLink(Link link)
+    {
+        if (link instanceof Receiver)
+        {
+            int credit = link.getCredit();
+            if (credit > 0)
+            {
+                _credit += credit;
+                _distributed -= credit;
+            }
+        }
+
+        Iterator<Delivery> dIter = link.unsettled();
+        while (dIter != null && dIter.hasNext())
+        {
+            Delivery delivery = (Delivery) dIter.next();
+            StoreEntry entry = (StoreEntry) delivery.getContext();
+            if (entry != null)
+            {
+                entry.setDelivery(null);
+                if (delivery.isBuffered())
+                    entry.setStatus(Status.ABORTED);
+            }
+        }
+        linkRemoved(link);
+    }
+
     private int pumpOut( String address, Sender sender )
     {
         StoreEntry entry = _outgoingStore.get( address );
         if (entry == null) {
+            sender.drained();
             return 0;
         }
 
@@ -342,12 +381,24 @@ public class MessengerImpl implements Me
             throw new IllegalStateException("cannot recv while messenger is stopped");
         }
 
-        if(_logger.isLoggable(Level.FINE))
+        if (_logger.isLoggable(Level.FINE) && n != -1)
         {
             _logger.fine(this + " about to wait for up to " + n + " messages to be received");
         }
 
-        _receiving = n;
+        if (n == -1)
+        {
+            _credit_mode = LinkCreditMode.LINK_CREDIT_AUTO;
+        }
+        else
+        {
+            _credit_mode = LinkCreditMode.LINK_CREDIT_EXPLICIT;
+            if (n > _distributed)
+                _credit = n - _distributed;
+            else        // cancel unallocated
+                _credit = 0;
+        }
+
         distributeCredit();
 
         waitUntil(_messageAvailable);
@@ -360,7 +411,7 @@ public class MessengerImpl implements Me
 
     public int receiving()
     {
-        return _receiving;
+        return _credit + _distributed;
     }
 
     public Message get()
@@ -375,10 +426,8 @@ public class MessengerImpl implements Me
                                                _incomingStore.trackEntry(entry));
 
             _incomingStore.freeEntry( entry );
-            _distributed--;
             return message;
         }
-
         return null;
     }
 
@@ -400,6 +449,36 @@ public class MessengerImpl implements Me
             }
             entry.setEncodedMsg( buffer, size );
             receiver.advance();
+
+            // account for the used credit, replenish if
+            // low (< 20% maximum per-link batch) and
+            // extra credit available
+            assert(_distributed > 0);
+            _distributed--;
+            if (!receiver.getDrain() && _blocked.isEmpty() && _credit > 0)
+            {
+                final int max = perLinkCredit();
+                final int lo_thresh = (int)(max * 0.2 + 0.5);
+                if (receiver.getRemoteCredit() < lo_thresh)
+                {
+                    final int more = Math.min(_credit, max - receiver.getRemoteCredit());
+                    _credit -= more;
+                    _distributed += more;
+                    receiver.flow(more);
+                }
+            }
+            // check if blocked
+            if (receiver.getRemoteCredit() == 0 && _credited.contains(receiver))
+            {
+                _credited.remove(receiver);
+                if (receiver.getDrain())
+                {
+                    receiver.setDrain(false);
+                    assert( _draining > 0 );
+                    _draining--;
+                }
+                _blocked.add(receiver);
+            }
         }
         return 0;
     }
@@ -423,7 +502,8 @@ public class MessengerImpl implements Me
             {
                 _logger.fine(this + " about to subscribe to source " + source + " using address " + hostName + ":" + port);
             }
-            _driver.createListener(hostName, port, null);
+            ListenerContext ctx = new ListenerContext( address.getScheme(), hostName, ports );
+            _driver.createListener(hostName, port, ctx);
         }
         else
         {
@@ -585,6 +665,8 @@ public class MessengerImpl implements Me
             Connector<?> c = l.accept();
             Connection connection = Proton.connection();
             connection.setContainer(_name);
+            ListenerContext ctx = (ListenerContext) l.getContext();
+            connection.setContext(new ConnectionContext(ctx.getService(), c));
             c.setConnection(connection);
             //TODO: SSL and full SASL
             Sasl sasl = c.sasl();
@@ -596,18 +678,10 @@ public class MessengerImpl implements Me
             }
             connection.open();
         }
-        //process active connectors, handling opened & closed connections as needed
+        // process connectors, reclaiming credit on closed connectors
         for (Connector<?> c = _driver.connector(); c != null; c = _driver.connector())
         {
             _worked = true;
-            _logger.log(Level.FINE, "Processing active connector " + c);
-            try
-            {
-                c.process();
-            } catch (IOException e) {
-                _logger.log(Level.SEVERE, "Error processing connection", e);
-            }
-            processEndpoints(c);
             if (c.isClosed())
             {
                 _awaitingDestruction.add(c);
@@ -615,9 +689,12 @@ public class MessengerImpl implements Me
             }
             else
             {
+                _logger.log(Level.FINE, "Processing active connector " + c);
                 try
                 {
                     c.process();
+                    processEndpoints(c);
+                    c.process();
                 }
                 catch (IOException e)
                 {
@@ -672,6 +749,7 @@ public class MessengerImpl implements Me
             //TODO: the following is not correct; should only copy those properties that we understand
             link.setSource(link.getRemoteSource());
             link.setTarget(link.getRemoteTarget());
+            linkAdded(link);
             link.open();
             _logger.log(Level.FINE, "Opened link " + link);
         }
@@ -686,14 +764,23 @@ public class MessengerImpl implements Me
             }
         }
 
-        for (Link link : new Links(connection, ACTIVE, CLOSED))
-        {
-            link.close();
-        }
         for (Session session : new Sessions(connection, ACTIVE, CLOSED))
         {
             session.close();
         }
+
+        for (Link link : new Links(connection, ANY, CLOSED))
+        {
+            if (link.getLocalState() == EndpointState.ACTIVE)
+            {
+                link.close();
+            }
+            else
+            {
+                reclaimLink(link);
+            }
+        }
+
         if (connection.getRemoteState() == EndpointState.CLOSED)
         {
             if (connection.getLocalState() == EndpointState.ACTIVE)
@@ -729,7 +816,7 @@ public class MessengerImpl implements Me
 
         // wait until timeout expires or until test is true
         long now = System.currentTimeMillis();
-        long deadline = timeout < 0 ? Long.MAX_VALUE : now + timeout;
+        final long deadline = timeout < 0 ? Long.MAX_VALUE : now + timeout;
         boolean done = false;
 
         while (true)
@@ -737,21 +824,31 @@ public class MessengerImpl implements Me
             done = condition.test();
             if (done) break;
 
-            boolean woken;
-            if ( timeout >= 0 ) {
-                long remaining = deadline - now;
+            long remaining;
+            if (timeout < 0)
+                remaining = -1;
+            else {
+                remaining = deadline - now;
                 if (remaining < 0) break;
-                woken = _driver.doWait(remaining);
-            } else {
-                woken = _driver.doWait(-1);
             }
+
+            // Update the credit scheduler. If the scheduler detects
+            // credit imbalance on the links, wake up in time to
+            // service credit drain
+            distributeCredit();
+            if (_next_drain != 0)
+            {
+                long wakeup = (_next_drain > now) ? _next_drain - now : 0;
+                remaining = (remaining == -1) ? wakeup : Math.min(remaining, wakeup);
+            }
+
+            boolean woken;
+            woken = _driver.doWait(remaining);
             processActive();
             if (woken) {
                 throw new InterruptException();
             }
-            if (timeout >= 0) {
-                now = System.currentTimeMillis();
-            }
+            now = System.currentTimeMillis();
         }
 
         return done;
@@ -762,7 +859,8 @@ public class MessengerImpl implements Me
         for (Connector<?> c : _driver.connectors())
         {
             Connection connection = c.getConnection();
-            if (host.equals(connection.getRemoteContainer()) || service.equals(connection.getContext()))
+            ConnectionContext ctx = (ConnectionContext) connection.getContext();
+            if (host.equals(connection.getRemoteContainer()) || service.equals(ctx.getService()))
             {
                 return connection;
             }
@@ -774,62 +872,108 @@ public class MessengerImpl implements Me
     {
         for (Link link : new Links(connection, ANY, ANY))
         {
-            if (link instanceof Receiver && link.getCredit() > 0)
-            {
-                reclaimCredit(link.getCredit());
-            }
+            reclaimLink(link);
         }
     }
 
-    private void reclaimCredit(int credit)
-    {
-        _credit += credit;
-        _distributed -= credit;
-    }
-
     private void distributeCredit()
     {
-        int linkCt = 0;
-        // @todo track the number of opened receive links
-        for (Connector<?> c : _driver.connectors())
+        if (_receivers == 0) return;
+
+        if (_credit_mode == LinkCreditMode.LINK_CREDIT_AUTO)
         {
-            if (c.isClosed()) continue;
-            Connection connection = c.getConnection();
-            for (Link link : new Links(connection, ACTIVE, ANY))
+            // replenish, but limit the max total messages buffered
+            final int max = _receivers * _credit_batch;
+            final int used = _distributed + incoming();
+            if (max > used)
+                _credit = max - used;
+        }
+
+        // reclaim any credit left over after draining links has completed
+        if (_draining > 0)
+        {
+            Iterator<Receiver> itr = _credited.iterator();
+            while (itr.hasNext())
             {
-                if (link instanceof Receiver) linkCt++;
+                Receiver link = (Receiver) itr.next();
+                if (link.getDrain())
+                {
+                    if (!link.draining())
+                    {
+                        // drain completed for this link
+                        int drained = link.drained();
+                        assert(_distributed >= drained);
+                        _distributed -= drained;
+                        _credit += drained;
+                        link.setDrain(false);
+                        _draining--;
+                        itr.remove();
+                        _blocked.add(link);
+                    }
+                }
             }
         }
 
-        if (linkCt == 0) return;
+        // distribute available credit to blocked links
+        final int batch = perLinkCredit();
+        while (_credit > 0 && !_blocked.isEmpty())
+        {
+            Receiver link = _blocked.get(0);
+            _blocked.remove(0);
+
+            final int more = Math.min(_credit, batch);
+            _distributed += more;
+            _credit -= more;
 
-        if (_receiving < 0)
-        {
-            _credit = linkCt * _creditBatch - incoming();
-        } else {
-            int total = _credit + _distributed;
-            if (_receiving > total)
-                _credit += _receiving - total;
+            link.flow(more);
+            _credited.add(link);
+
+            // flow changed, must process it
+            ConnectionContext ctx = (ConnectionContext) link.getSession().getConnection().getContext();
+            try
+            {
+                ctx.getConnector().process();
+            } catch (IOException e) {
+                _logger.log(Level.SEVERE, "Error processing connection", e);
+            }
         }
 
-        int batch = (_credit < linkCt) ? 1 : (_credit/linkCt);
-        for (Connector<?> c : _driver.connectors())
+        if (_blocked.isEmpty())
         {
-            if (c.isClosed()) continue;
-            Connection connection = c.getConnection();
-            for (Link link : new Links(connection, ACTIVE, ANY))
+            _next_drain = 0;
+        }
+        else
+        {
+            // not enough credit for all links - start draining granted credit
+            if (_draining == 0)
             {
-                if (link instanceof Receiver)
+                // don't do it too often - pace ourselves (it's expensive)
+                if (_next_drain == 0)
                 {
-                    int have = ((Receiver) link).getCredit();
-                    if (have < batch)
+                    _next_drain = System.currentTimeMillis() + 250;
+                }
+                else if (_next_drain <= System.currentTimeMillis())
+                {
+                    // initiate drain, free up at most enough to satisfy blocked
+                    _next_drain = 0;
+                    int needed = _blocked.size() * batch;
+
+                    for (Receiver link : _credited)
                     {
-                        int need = batch - have;
-                        int amount = (_credit < need) ? _credit : need;
-                        ((Receiver) link).flow(amount);
-                        _credit -= amount;
-                        _distributed += amount;
-                        if (_credit == 0) return;
+                        if (!link.getDrain()) {
+                            link.setDrain(true);
+                            needed -= link.getRemoteCredit();
+                            _draining++;
+                            // drain requested on link, must process it
+                            ConnectionContext ctx = (ConnectionContext) link.getSession().getConnection().getContext();
+                            try
+                            {
+                                ctx.getConnector().process();
+                            } catch (IOException e) {
+                                _logger.log(Level.SEVERE, "Error processing connection", e);
+                            }
+                            if (needed <= 0) break;
+                        }
                     }
                 }
             }
@@ -1058,7 +1202,7 @@ public class MessengerImpl implements Me
             connection = Proton.connection();
             connection.setContainer(_name);
             connection.setHostname(host);
-            connection.setContext(service);
+            connection.setContext(new ConnectionContext(service, connector));
             connector.setConnection(connection);
             Sasl sasl = connector.sasl();
             if (sasl != null)
@@ -1077,6 +1221,7 @@ public class MessengerImpl implements Me
         Session session = connection.session();
         session.open();
         C link = finder.create(session);
+        linkAdded(link);
         link.open();
         return link;
     }
@@ -1232,4 +1377,97 @@ public class MessengerImpl implements Me
         return builder.toString();
     }
 
+    // compute the maximum amount of credit each receiving link is
+    // entitled to.  The actual credit given to the link depends on
+    // what amount of credit is actually available.
+    private int perLinkCredit()
+    {
+        if (_receivers == 0) return 0;
+        int total = _credit + _distributed;
+        return Math.max(total/_receivers, 1);
+    }
+
+    // a new link has been created, account for it.
+    private void linkAdded(Link link)
+    {
+        if (link instanceof Receiver)
+        {
+            _receivers++;
+            _blocked.add((Receiver)link);
+        }
+    }
+
+    // a link is being removed, account for it.
+    private void linkRemoved(Link _link)
+    {
+        if (_link instanceof Receiver)
+        {
+            Receiver link = (Receiver)_link;
+            assert _receivers > 0;
+            _receivers--;
+            if (link.getDrain())
+            {
+                link.setDrain(false);
+                assert _draining > 0;
+                _draining--;
+            }
+            if (_blocked.contains(link))
+                _blocked.remove(link);
+            else if (_credited.contains(link))
+                _credited.remove(link);
+            else
+                assert(false);
+        }
+    }
+
+    private class ConnectionContext
+    {
+        private String _service;
+        private Connector _connector;
+
+        public ConnectionContext(String service, Connector connector)
+        {
+            _service = service;
+            _connector = connector;
+        }
+
+        public String getService()
+        {
+            return _service;
+        }
+
+        public Connector getConnector()
+        {
+            return _connector;
+        }
+    }
+
+    private class ListenerContext
+    {
+        private String _host;
+        private String _port;
+        private String _service;  // for now. move to subscription later
+
+        public ListenerContext(String service, String host, String port)
+        {
+            _service = service;
+            _host = host;
+            _port = port;
+        }
+
+        public String getService()
+        {
+            return _service;
+        }
+
+        public String getHost()
+        {
+            return _host;
+        }
+
+        public String getPort()
+        {
+            return _port;
+        }
+    }
 }

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java Tue Dec  3 19:16:11 2013
@@ -123,6 +123,10 @@ class StoreEntry
         return _status;
     }
 
+    public void setStatus(Status status)
+    {
+        _status = status;
+    }
 
     private static Status _disp2status(DeliveryState disp)
     {

Modified: qpid/proton/trunk/tests/python/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/messenger.py?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/messenger.py Tue Dec  3 19:16:11 2013
@@ -6,9 +6,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -640,9 +640,6 @@ class MessengerTest(Test):
     """ The server is given a fixed amount of credit, and runs until that
     credit is exhausted.
     """
-    if sys.platform.startswith("java"):
-        raise Skipped("Skipping testCreditBlockingRebalance - credit scheduler TBD for Java Messenger")
-
     self.server_finite_credit = True
     self.server_credit = 11
     self.start()
@@ -794,9 +791,6 @@ class NBMessengerTest(common.Test):
     """ Verify that a fixed amount of credit will redistribute to new
     links.
     """
-    if sys.platform.startswith("java"):
-        raise Skipped("Skipping testCreditRedistribution - credit scheduler TBD for Java Messenger")
-
     self.server.recv( 5 )
 
     # first link will get all credit
@@ -829,9 +823,6 @@ class NBMessengerTest(common.Test):
     """ Verify that credit is reclaimed when a link with outstanding credit is
     torn down.
     """
-    if sys.platform.startswith("java"):
-        raise Skipped("Skipping testCreditReclaim - credit scheduler TBD for Java Messenger")
-
     self.server.recv( 9 )
 
     # first link will get all credit
@@ -893,15 +884,10 @@ class NBMessengerTest(common.Test):
     assert self.server.incoming == 9, self.server.incoming
     assert self.server.receiving == 0, self.server.receiving
 
-
-
   def testCreditReplenish(self):
     """ When extra credit is available it should be granted to the first
     link that can use it.
     """
-    if sys.platform.startswith("java"):
-        raise Skipped("Skipping testCreditReplenish - credit scheduler TBD for Java Messenger")
-
     # create three links
     msg = Message()
     for i in range(3):



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org