You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2013/12/03 20:16:12 UTC
svn commit: r1547534 - in /qpid/proton/trunk:
proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/
proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/
proton-c/include/proton/ proton-c/src/messenger/ proton-j/p...
Author: kgiusti
Date: Tue Dec 3 19:16:11 2013
New Revision: 1547534
URL: http://svn.apache.org/r1547534
Log:
PROTON-200: Java implementation of credit scheduler
Modified:
qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java
qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNILink.java
qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIReceiver.java
qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java
qpid/proton/trunk/proton-c/include/proton/messenger.h
qpid/proton/trunk/proton-c/src/messenger/messenger.c
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Driver.java
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Link.java
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Receiver.java
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Status.java
qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java
qpid/proton/trunk/tests/python/proton_tests/messenger.py
Modified: qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java (original)
+++ qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIDelivery.java Tue Dec 3 19:16:11 2013
@@ -266,4 +266,9 @@ public class JNIDelivery implements Deli
return (int) Proton.pn_delivery_pending(_impl);
}
+ @ProtonCEquivalent("pn_delivery_buffered")
+ public boolean isBuffered()
+ {
+ return Proton.pn_delivery_buffered(_impl);
+ }
}
Modified: qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNILink.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNILink.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNILink.java (original)
+++ qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNILink.java Tue Dec 3 19:16:11 2013
@@ -538,4 +538,17 @@ abstract class JNILink implements Link
return Proton.pn_link_drained(getImpl());
}
+ @Override
+ @ProtonCEquivalent("pn_link_remote_credit")
+ public int getRemoteCredit()
+ {
+ return Proton.pn_link_remote_credit(getImpl());
+ }
+
+ @Override
+ @ProtonCEquivalent("pn_link_get_drain")
+ public boolean getDrain()
+ {
+ return Proton.pn_link_get_drain(getImpl());
+ }
}
Modified: qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIReceiver.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIReceiver.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIReceiver.java (original)
+++ qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/engine/jni/JNIReceiver.java Tue Dec 3 19:16:11 2013
@@ -62,4 +62,10 @@ public class JNIReceiver extends JNILink
return Proton.pn_link_draining(getImpl());
}
+ @Override
+ @ProtonCEquivalent("pn_link_set_drain")
+ public void setDrain(boolean drain)
+ {
+ Proton.pn_link_set_drain(getImpl(), drain);
+ }
}
Modified: qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java (original)
+++ qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java Tue Dec 3 19:16:11 2013
@@ -113,6 +113,12 @@ class JNIMessenger implements Messenger
}
@Override
+ public int receiving()
+ {
+ return Proton.pn_messenger_receiving(_impl);
+ }
+
+ @Override
public Message get()
{
SWIGTYPE_p_pn_message_t msg = Proton.pn_message();
@@ -295,7 +301,7 @@ class JNIMessenger implements Messenger
private void check(int errorCode) throws ProtonException
{
- if(errorCode != 0 && errorCode != Proton.PN_INPROGRESS)
+ if(errorCode < 0 && errorCode != Proton.PN_INPROGRESS)
{
String errorMessage = Proton.pn_error_text(Proton.pn_messenger_error(_impl));
if(errorCode == Proton.PN_TIMEOUT)
Modified: qpid/proton/trunk/proton-c/include/proton/messenger.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/messenger.h?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/messenger.h Tue Dec 3 19:16:11 2013
@@ -365,7 +365,10 @@ PN_EXTERN pn_tracker_t pn_messenger_outg
* This will block for the indicated timeout.
*
* @param[in] messenger the Messenger
- * @param[in] timeout the maximum time to block
+ * @param[in] timeout the maximum time to block in milliseconds, -1 ==
+ * forever, 0 == do not block
+ *
+ * @return 0 if no work to do, < 0 if error, or 1 if work was done.
*/
PN_EXTERN int pn_messenger_work(pn_messenger_t *messenger, int timeout);
Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Tue Dec 3 19:16:11 2013
@@ -1512,7 +1512,9 @@ static bool work_pred(pn_messenger_t *me
int pn_messenger_work(pn_messenger_t *messenger, int timeout)
{
messenger->worked = false;
- return pn_messenger_tsync(messenger, work_pred, timeout);
+ int err = pn_messenger_tsync(messenger, work_pred, timeout);
+ if (err) return err;
+ return (int) (messenger->worked ? 1 : 0);
}
int pni_messenger_work(pn_messenger_t *messenger)
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Driver.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Driver.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Driver.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Driver.java Tue Dec 3 19:16:11 2013
@@ -55,7 +55,7 @@ public interface Driver
*
* Thread-safe.
*
- * @param timeout maximum time in milliseconds to wait. 0 means wait indefinitely.
+ * @param timeout maximum time in milliseconds to wait. -1 means wait indefinitely.
*
* @param returns true if woken up
*/
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java Tue Dec 3 19:16:11 2013
@@ -104,4 +104,6 @@ public interface Delivery
public int pending();
+ public boolean isBuffered();
+
}
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Link.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Link.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Link.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Link.java Tue Dec 3 19:16:11 2013
@@ -182,4 +182,6 @@ public interface Link extends Endpoint
public int drained();
+ public int getRemoteCredit();
+ public boolean getDrain();
}
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Receiver.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Receiver.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Receiver.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Receiver.java Tue Dec 3 19:16:11 2013
@@ -71,4 +71,6 @@ public interface Receiver extends Link
public boolean draining();
+ public void setDrain(boolean drain);
+
}
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java Tue Dec 3 19:16:11 2013
@@ -118,6 +118,14 @@ public interface Messenger
*/
void recv(int count) throws TimeoutException;
/**
+ * Returns the capacity of the incoming message queue of this
+ * messenger. Note that this count does not include messages
+ * already available on the incoming queue (see
+ * incoming()). Rather, it returns the number of incoming
+ * queue entries that are still available for receiving
+ * messages.
+ int receiving();
+ /**
* Returns the message from the head of the incoming message
* queue.
*/
@@ -142,7 +150,12 @@ public interface Messenger
boolean stopped();
- boolean work(long timeout);
+ /** Sends or receives any outstanding messages queued for a
+ * messenger. If timeout is zero, no blocking is done. A timeout
+ * of -1 blocks forever; otherwise timeout is the maximum time (in
+ * milliseconds) to block. Returns true if work was performed.
+ */
+ boolean work(long timeout) throws TimeoutException;
void interrupt();
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Status.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Status.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Status.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Status.java Tue Dec 3 19:16:11 2013
@@ -26,5 +26,6 @@ public enum Status
PENDING,
ACCEPTED,
REJECTED,
- MODIFIED
+ MODIFIED,
+ ABORTED
}
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py Tue Dec 3 19:16:11 2013
@@ -25,6 +25,7 @@ except NameError:
from org.apache.qpid.proton import Proton, ProtonUnsupportedOperationException
from org.apache.qpid.proton import InterruptException as Interrupt
+from org.apache.qpid.proton import TimeoutException as Timeout
from org.apache.qpid.proton.engine import \
Transport as JTransport, Sender as JSender, Receiver as JReceiver, \
Sasl, SslDomain as JSslDomain, \
@@ -42,7 +43,6 @@ from org.apache.qpid.proton.amqp import
Decimal32, Decimal64, Decimal128
from jarray import zeros, array
from java.util import EnumSet, UUID as JUUID, Date as JDate, HashMap
-from java.util.concurrent import TimeoutException as Timeout
from java.nio import ByteBuffer
from java.lang import Character as JCharacter, String as JString, Integer as JInteger
from java.lang import NoClassDefFoundError
@@ -1321,12 +1321,20 @@ class Messenger(object):
def recv(self, n=-1):
self.impl.recv(n)
+ @property
+ def receiving(self):
+ return self.impl.receiving()
+
def work(self, timeout=None):
if timeout is None:
t = -1
else:
t = long(1000*timeout)
- return self.impl.work(t)
+ try:
+ err = self.impl.work(t)
+ except Timeout, e:
+ return False
+ return err
def interrupt(self):
self.impl.interrupt()
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java Tue Dec 3 19:16:11 2013
@@ -23,6 +23,7 @@ package org.apache.qpid.proton.driver.im
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
+import java.net.StandardSocketOptions;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
@@ -192,6 +193,8 @@ public class DriverImpl implements Drive
{
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
+ // Disable the Nagle algorithm on TCP connections.
+ channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.connect(new InetSocketAddress(host, port));
return createConnector(channel, context);
}
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java Tue Dec 3 19:16:11 2013
@@ -373,7 +373,7 @@ public class DeliveryImpl implements Del
_updated = true;
}
- boolean isBuffered()
+ public boolean isBuffered()
{
if (_remoteSettled) return false;
if (getLink() instanceof SenderImpl) {
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java Tue Dec 3 19:16:11 2013
@@ -55,7 +55,6 @@ public abstract class LinkImpl extends E
private final LinkNode<LinkImpl> _node;
private boolean _drain;
-
LinkImpl(SessionImpl session, String name)
{
_session = session;
@@ -296,7 +295,7 @@ public abstract class LinkImpl extends E
_drain = drain;
}
- boolean getDrain()
+ public boolean getDrain()
{
return _drain;
}
@@ -378,4 +377,8 @@ public abstract class LinkImpl extends E
_drained = value;
}
+ public int getRemoteCredit()
+ {
+ return _credit - _queued;
+ }
}
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java Tue Dec 3 19:16:11 2013
@@ -27,6 +27,7 @@ import org.apache.qpid.proton.engine.Rec
public class ReceiverImpl extends LinkImpl implements Receiver
{
+ private boolean _drainFlagMode = true;
@Override
public boolean advance()
@@ -61,10 +62,14 @@ public class ReceiverImpl extends LinkIm
public void flow(final int credits)
{
- modified();
addCredit(credits);
- setDrain(false);
_unsentCredits += credits;
+ modified();
+ if (!_drainFlagMode)
+ {
+ setDrain(false);
+ _drainFlagMode = false;
+ }
}
int clearUnsentCredits()
@@ -122,8 +127,9 @@ public class ReceiverImpl extends LinkIm
public void drain(int credit)
{
- flow(credit);
setDrain(true);
+ flow(credit);
+ _drainFlagMode = false;
}
public boolean draining()
@@ -131,4 +137,11 @@ public class ReceiverImpl extends LinkIm
return getDrain() && (getCredit() > getQueued());
}
+ @Override
+ public void setDrain(boolean drain)
+ {
+ super.setDrain(drain);
+ modified();
+ _drainFlagMode = true;
+ }
}
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java Tue Dec 3 19:16:11 2013
@@ -61,6 +61,12 @@ import org.apache.qpid.proton.amqp.Binar
public class MessengerImpl implements Messenger
{
+ private enum LinkCreditMode
+ {
+ // method for replenishing credit
+ LINK_CREDIT_EXPLICIT, // recv(N)
+ LINK_CREDIT_AUTO; // recv()
+ }
private static final EnumSet<EndpointState> UNINIT = EnumSet.of(EndpointState.UNINITIALIZED);
private static final EnumSet<EndpointState> ACTIVE = EnumSet.of(EndpointState.ACTIVE);
@@ -73,10 +79,15 @@ public class MessengerImpl implements Me
private boolean _blocking = true;
private long _nextTag = 1;
private Driver _driver;
- private int _receiving = 0;
- private static final int _creditBatch = 1024;
- private int _credit;
- private int _distributed;
+ private LinkCreditMode _credit_mode = LinkCreditMode.LINK_CREDIT_EXPLICIT;
+ private final int _credit_batch = 1024; // credit_mode == LINK_CREDIT_AUTO
+ private int _credit; // available
+ private int _distributed; // outstanding credit
+ private int _receivers; // total # receiver Links
+ private int _draining; // # Links in drain state
+ private List<Receiver> _credited = new ArrayList<Receiver>();
+ private List<Receiver> _blocked = new ArrayList<Receiver>();
+ private long _next_drain;
private TrackerImpl _incomingTracker;
private TrackerImpl _outgoingTracker;
private Store _incomingStore = new Store();
@@ -165,7 +176,7 @@ public class MessengerImpl implements Me
return _allClosed.test();
}
- public boolean work(long timeout)
+ public boolean work(long timeout) throws TimeoutException
{
if (_driver == null) { return false; }
_worked = false;
@@ -285,10 +296,38 @@ public class MessengerImpl implements Me
pumpOut(m.getAddress(), sender);
}
+ private void reclaimLink(Link link)
+ {
+ if (link instanceof Receiver)
+ {
+ int credit = link.getCredit();
+ if (credit > 0)
+ {
+ _credit += credit;
+ _distributed -= credit;
+ }
+ }
+
+ Iterator<Delivery> dIter = link.unsettled();
+ while (dIter != null && dIter.hasNext())
+ {
+ Delivery delivery = (Delivery) dIter.next();
+ StoreEntry entry = (StoreEntry) delivery.getContext();
+ if (entry != null)
+ {
+ entry.setDelivery(null);
+ if (delivery.isBuffered())
+ entry.setStatus(Status.ABORTED);
+ }
+ }
+ linkRemoved(link);
+ }
+
private int pumpOut( String address, Sender sender )
{
StoreEntry entry = _outgoingStore.get( address );
if (entry == null) {
+ sender.drained();
return 0;
}
@@ -342,12 +381,24 @@ public class MessengerImpl implements Me
throw new IllegalStateException("cannot recv while messenger is stopped");
}
- if(_logger.isLoggable(Level.FINE))
+ if (_logger.isLoggable(Level.FINE) && n != -1)
{
_logger.fine(this + " about to wait for up to " + n + " messages to be received");
}
- _receiving = n;
+ if (n == -1)
+ {
+ _credit_mode = LinkCreditMode.LINK_CREDIT_AUTO;
+ }
+ else
+ {
+ _credit_mode = LinkCreditMode.LINK_CREDIT_EXPLICIT;
+ if (n > _distributed)
+ _credit = n - _distributed;
+ else // cancel unallocated
+ _credit = 0;
+ }
+
distributeCredit();
waitUntil(_messageAvailable);
@@ -360,7 +411,7 @@ public class MessengerImpl implements Me
public int receiving()
{
- return _receiving;
+ return _credit + _distributed;
}
public Message get()
@@ -375,10 +426,8 @@ public class MessengerImpl implements Me
_incomingStore.trackEntry(entry));
_incomingStore.freeEntry( entry );
- _distributed--;
return message;
}
-
return null;
}
@@ -400,6 +449,36 @@ public class MessengerImpl implements Me
}
entry.setEncodedMsg( buffer, size );
receiver.advance();
+
+ // account for the used credit, replenish if
+ // low (< 20% maximum per-link batch) and
+ // extra credit available
+ assert(_distributed > 0);
+ _distributed--;
+ if (!receiver.getDrain() && _blocked.isEmpty() && _credit > 0)
+ {
+ final int max = perLinkCredit();
+ final int lo_thresh = (int)(max * 0.2 + 0.5);
+ if (receiver.getRemoteCredit() < lo_thresh)
+ {
+ final int more = Math.min(_credit, max - receiver.getRemoteCredit());
+ _credit -= more;
+ _distributed += more;
+ receiver.flow(more);
+ }
+ }
+ // check if blocked
+ if (receiver.getRemoteCredit() == 0 && _credited.contains(receiver))
+ {
+ _credited.remove(receiver);
+ if (receiver.getDrain())
+ {
+ receiver.setDrain(false);
+ assert( _draining > 0 );
+ _draining--;
+ }
+ _blocked.add(receiver);
+ }
}
return 0;
}
@@ -423,7 +502,8 @@ public class MessengerImpl implements Me
{
_logger.fine(this + " about to subscribe to source " + source + " using address " + hostName + ":" + port);
}
- _driver.createListener(hostName, port, null);
+ ListenerContext ctx = new ListenerContext( address.getScheme(), hostName, ports );
+ _driver.createListener(hostName, port, ctx);
}
else
{
@@ -585,6 +665,8 @@ public class MessengerImpl implements Me
Connector<?> c = l.accept();
Connection connection = Proton.connection();
connection.setContainer(_name);
+ ListenerContext ctx = (ListenerContext) l.getContext();
+ connection.setContext(new ConnectionContext(ctx.getService(), c));
c.setConnection(connection);
//TODO: SSL and full SASL
Sasl sasl = c.sasl();
@@ -596,18 +678,10 @@ public class MessengerImpl implements Me
}
connection.open();
}
- //process active connectors, handling opened & closed connections as needed
+ // process connectors, reclaiming credit on closed connectors
for (Connector<?> c = _driver.connector(); c != null; c = _driver.connector())
{
_worked = true;
- _logger.log(Level.FINE, "Processing active connector " + c);
- try
- {
- c.process();
- } catch (IOException e) {
- _logger.log(Level.SEVERE, "Error processing connection", e);
- }
- processEndpoints(c);
if (c.isClosed())
{
_awaitingDestruction.add(c);
@@ -615,9 +689,12 @@ public class MessengerImpl implements Me
}
else
{
+ _logger.log(Level.FINE, "Processing active connector " + c);
try
{
c.process();
+ processEndpoints(c);
+ c.process();
}
catch (IOException e)
{
@@ -672,6 +749,7 @@ public class MessengerImpl implements Me
//TODO: the following is not correct; should only copy those properties that we understand
link.setSource(link.getRemoteSource());
link.setTarget(link.getRemoteTarget());
+ linkAdded(link);
link.open();
_logger.log(Level.FINE, "Opened link " + link);
}
@@ -686,14 +764,23 @@ public class MessengerImpl implements Me
}
}
- for (Link link : new Links(connection, ACTIVE, CLOSED))
- {
- link.close();
- }
for (Session session : new Sessions(connection, ACTIVE, CLOSED))
{
session.close();
}
+
+ for (Link link : new Links(connection, ANY, CLOSED))
+ {
+ if (link.getLocalState() == EndpointState.ACTIVE)
+ {
+ link.close();
+ }
+ else
+ {
+ reclaimLink(link);
+ }
+ }
+
if (connection.getRemoteState() == EndpointState.CLOSED)
{
if (connection.getLocalState() == EndpointState.ACTIVE)
@@ -729,7 +816,7 @@ public class MessengerImpl implements Me
// wait until timeout expires or until test is true
long now = System.currentTimeMillis();
- long deadline = timeout < 0 ? Long.MAX_VALUE : now + timeout;
+ final long deadline = timeout < 0 ? Long.MAX_VALUE : now + timeout;
boolean done = false;
while (true)
@@ -737,21 +824,31 @@ public class MessengerImpl implements Me
done = condition.test();
if (done) break;
- boolean woken;
- if ( timeout >= 0 ) {
- long remaining = deadline - now;
+ long remaining;
+ if (timeout < 0)
+ remaining = -1;
+ else {
+ remaining = deadline - now;
if (remaining < 0) break;
- woken = _driver.doWait(remaining);
- } else {
- woken = _driver.doWait(-1);
}
+
+ // Update the credit scheduler. If the scheduler detects
+ // credit imbalance on the links, wake up in time to
+ // service credit drain
+ distributeCredit();
+ if (_next_drain != 0)
+ {
+ long wakeup = (_next_drain > now) ? _next_drain - now : 0;
+ remaining = (remaining == -1) ? wakeup : Math.min(remaining, wakeup);
+ }
+
+ boolean woken;
+ woken = _driver.doWait(remaining);
processActive();
if (woken) {
throw new InterruptException();
}
- if (timeout >= 0) {
- now = System.currentTimeMillis();
- }
+ now = System.currentTimeMillis();
}
return done;
@@ -762,7 +859,8 @@ public class MessengerImpl implements Me
for (Connector<?> c : _driver.connectors())
{
Connection connection = c.getConnection();
- if (host.equals(connection.getRemoteContainer()) || service.equals(connection.getContext()))
+ ConnectionContext ctx = (ConnectionContext) connection.getContext();
+ if (host.equals(connection.getRemoteContainer()) || service.equals(ctx.getService()))
{
return connection;
}
@@ -774,62 +872,108 @@ public class MessengerImpl implements Me
{
for (Link link : new Links(connection, ANY, ANY))
{
- if (link instanceof Receiver && link.getCredit() > 0)
- {
- reclaimCredit(link.getCredit());
- }
+ reclaimLink(link);
}
}
- private void reclaimCredit(int credit)
- {
- _credit += credit;
- _distributed -= credit;
- }
-
private void distributeCredit()
{
- int linkCt = 0;
- // @todo track the number of opened receive links
- for (Connector<?> c : _driver.connectors())
+ if (_receivers == 0) return;
+
+ if (_credit_mode == LinkCreditMode.LINK_CREDIT_AUTO)
{
- if (c.isClosed()) continue;
- Connection connection = c.getConnection();
- for (Link link : new Links(connection, ACTIVE, ANY))
+ // replenish, but limit the max total messages buffered
+ final int max = _receivers * _credit_batch;
+ final int used = _distributed + incoming();
+ if (max > used)
+ _credit = max - used;
+ }
+
+ // reclaim any credit left over after draining links has completed
+ if (_draining > 0)
+ {
+ Iterator<Receiver> itr = _credited.iterator();
+ while (itr.hasNext())
{
- if (link instanceof Receiver) linkCt++;
+ Receiver link = (Receiver) itr.next();
+ if (link.getDrain())
+ {
+ if (!link.draining())
+ {
+ // drain completed for this link
+ int drained = link.drained();
+ assert(_distributed >= drained);
+ _distributed -= drained;
+ _credit += drained;
+ link.setDrain(false);
+ _draining--;
+ itr.remove();
+ _blocked.add(link);
+ }
+ }
}
}
- if (linkCt == 0) return;
+ // distribute available credit to blocked links
+ final int batch = perLinkCredit();
+ while (_credit > 0 && !_blocked.isEmpty())
+ {
+ Receiver link = _blocked.get(0);
+ _blocked.remove(0);
+
+ final int more = Math.min(_credit, batch);
+ _distributed += more;
+ _credit -= more;
- if (_receiving < 0)
- {
- _credit = linkCt * _creditBatch - incoming();
- } else {
- int total = _credit + _distributed;
- if (_receiving > total)
- _credit += _receiving - total;
+ link.flow(more);
+ _credited.add(link);
+
+ // flow changed, must process it
+ ConnectionContext ctx = (ConnectionContext) link.getSession().getConnection().getContext();
+ try
+ {
+ ctx.getConnector().process();
+ } catch (IOException e) {
+ _logger.log(Level.SEVERE, "Error processing connection", e);
+ }
}
- int batch = (_credit < linkCt) ? 1 : (_credit/linkCt);
- for (Connector<?> c : _driver.connectors())
+ if (_blocked.isEmpty())
{
- if (c.isClosed()) continue;
- Connection connection = c.getConnection();
- for (Link link : new Links(connection, ACTIVE, ANY))
+ _next_drain = 0;
+ }
+ else
+ {
+ // not enough credit for all links - start draining granted credit
+ if (_draining == 0)
{
- if (link instanceof Receiver)
+ // don't do it too often - pace ourselves (it's expensive)
+ if (_next_drain == 0)
{
- int have = ((Receiver) link).getCredit();
- if (have < batch)
+ _next_drain = System.currentTimeMillis() + 250;
+ }
+ else if (_next_drain <= System.currentTimeMillis())
+ {
+ // initiate drain, free up at most enough to satisfy blocked
+ _next_drain = 0;
+ int needed = _blocked.size() * batch;
+
+ for (Receiver link : _credited)
{
- int need = batch - have;
- int amount = (_credit < need) ? _credit : need;
- ((Receiver) link).flow(amount);
- _credit -= amount;
- _distributed += amount;
- if (_credit == 0) return;
+ if (!link.getDrain()) {
+ link.setDrain(true);
+ needed -= link.getRemoteCredit();
+ _draining++;
+ // drain requested on link, must process it
+ ConnectionContext ctx = (ConnectionContext) link.getSession().getConnection().getContext();
+ try
+ {
+ ctx.getConnector().process();
+ } catch (IOException e) {
+ _logger.log(Level.SEVERE, "Error processing connection", e);
+ }
+ if (needed <= 0) break;
+ }
}
}
}
@@ -1058,7 +1202,7 @@ public class MessengerImpl implements Me
connection = Proton.connection();
connection.setContainer(_name);
connection.setHostname(host);
- connection.setContext(service);
+ connection.setContext(new ConnectionContext(service, connector));
connector.setConnection(connection);
Sasl sasl = connector.sasl();
if (sasl != null)
@@ -1077,6 +1221,7 @@ public class MessengerImpl implements Me
Session session = connection.session();
session.open();
C link = finder.create(session);
+ linkAdded(link);
link.open();
return link;
}
@@ -1232,4 +1377,97 @@ public class MessengerImpl implements Me
return builder.toString();
}
+ // Compute the maximum amount of credit each receiving link is
+ // entitled to: an even share of (_credit + _distributed), never less than 1.
+ // The credit actually granted to a link depends on how much is available.
+ private int perLinkCredit()
+ {
+ if (_receivers == 0) return 0; // no receiving links: nothing to share, and avoids dividing by zero
+ int total = _credit + _distributed; // undistributed credit plus credit already handed out
+ return Math.max(total/_receivers, 1); // integer division, with a floor of 1 per link
+ }
+
+ // A new link has been created; account for it. Links that are not Receivers are ignored.
+ private void linkAdded(Link link)
+ {
+ if (link instanceof Receiver)
+ {
+ _receivers++; // one more link sharing the credit computed by perLinkCredit()
+ _blocked.add((Receiver)link); // a new receiver starts out in _blocked
+ }
+ }
+
+ // A link is being removed; undo the bookkeeping for it. Links that are not Receivers are ignored.
+ private void linkRemoved(Link _link)
+ {
+ if (_link instanceof Receiver)
+ {
+ Receiver link = (Receiver)_link;
+ assert _receivers > 0;
+ _receivers--;
+ if (link.getDrain()) // link still has drain set: clear it and drop it from the draining count
+ {
+ link.setDrain(false);
+ assert _draining > 0;
+ _draining--;
+ }
+ if (_blocked.contains(link)) // remove the receiver from whichever collection currently holds it
+ _blocked.remove(link);
+ else if (_credited.contains(link))
+ _credited.remove(link);
+ else
+ assert(false); // a receiver in neither _blocked nor _credited is unexpected (checked only when assertions are enabled)
+ }
+ }
+
+ private class ConnectionContext // pairs a service string with a Connector; no setters are defined here
+ {
+ private String _service;
+ private Connector _connector;
+
+ public ConnectionContext(String service, Connector connector) // stores both arguments as given (no validation)
+ {
+ _service = service;
+ _connector = connector;
+ }
+
+ public String getService() // returns the service passed to the constructor
+ {
+ return _service;
+ }
+
+ public Connector getConnector() // returns the connector passed to the constructor
+ {
+ return _connector;
+ }
+ }
+
+ private class ListenerContext // holds service, host and port strings; no setters are defined here
+ {
+ private String _host;
+ private String _port;
+ private String _service; // kept here for now; intended to move to the subscription later
+
+ public ListenerContext(String service, String host, String port) // note the argument order: service first, then host, then port
+ {
+ _service = service;
+ _host = host;
+ _port = port;
+ }
+
+ public String getService() // returns the service passed to the constructor
+ {
+ return _service;
+ }
+
+ public String getHost() // returns the host passed to the constructor
+ {
+ return _host;
+ }
+
+ public String getPort() // returns the port passed to the constructor (kept as a String)
+ {
+ return _port;
+ }
+ }
}
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/StoreEntry.java Tue Dec 3 19:16:11 2013
@@ -123,6 +123,10 @@ class StoreEntry
return _status;
}
+ public void setStatus(Status status) // overwrites the stored _status with the given value (no validation)
+ {
+ _status = status;
+ }
private static Status _disp2status(DeliveryState disp)
{
Modified: qpid/proton/trunk/tests/python/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/messenger.py?rev=1547534&r1=1547533&r2=1547534&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/messenger.py Tue Dec 3 19:16:11 2013
@@ -6,9 +6,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -640,9 +640,6 @@ class MessengerTest(Test):
""" The server is given a fixed amount of credit, and runs until that
credit is exhausted.
"""
- if sys.platform.startswith("java"):
- raise Skipped("Skipping testCreditBlockingRebalance - credit scheduler TBD for Java Messenger")
-
self.server_finite_credit = True
self.server_credit = 11
self.start()
@@ -794,9 +791,6 @@ class NBMessengerTest(common.Test):
""" Verify that a fixed amount of credit will redistribute to new
links.
"""
- if sys.platform.startswith("java"):
- raise Skipped("Skipping testCreditRedistribution - credit scheduler TBD for Java Messenger")
-
self.server.recv( 5 )
# first link will get all credit
@@ -829,9 +823,6 @@ class NBMessengerTest(common.Test):
""" Verify that credit is reclaimed when a link with outstanding credit is
torn down.
"""
- if sys.platform.startswith("java"):
- raise Skipped("Skipping testCreditReclaim - credit scheduler TBD for Java Messenger")
-
self.server.recv( 9 )
# first link will get all credit
@@ -893,15 +884,10 @@ class NBMessengerTest(common.Test):
assert self.server.incoming == 9, self.server.incoming
assert self.server.receiving == 0, self.server.receiving
-
-
def testCreditReplenish(self):
""" When extra credit is available it should be granted to the first
link that can use it.
"""
- if sys.platform.startswith("java"):
- raise Skipped("Skipping testCreditReplenish - credit scheduler TBD for Java Messenger")
-
# create three links
msg = Message()
for i in range(3):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org