You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2013/07/11 22:11:13 UTC
svn commit: r1502346 - in /qpid/proton/trunk: examples/messenger/c/
proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/
proton-c/bindings/perl/lib/qpid/proton/ proton-c/bindings/php/
proton-c/bindings/python/ proton-c/bindings/ru...
Author: rhs
Date: Thu Jul 11 20:11:12 2013
New Revision: 1502346
URL: http://svn.apache.org/r1502346
Log:
Added several related messenger features: non blocking mode, pn_messenger_work, pn_messenger_interrupt, optional arg to pn_messenger_send. Also added basic non blocking smoke test for messenger in ruby and python. These address PROTON-231.
Added:
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/InterruptException.java
qpid/proton/trunk/tests/ruby/proton_tests/smoke.rb
Modified:
qpid/proton/trunk/examples/messenger/c/send.c
qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java
qpid/proton/trunk/proton-c/bindings/perl/lib/qpid/proton/Messenger.pm
qpid/proton/trunk/proton-c/bindings/php/proton.php
qpid/proton/trunk/proton-c/bindings/python/proton.py
qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/exception_handling.rb
qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/exceptions.rb
qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb
qpid/proton/trunk/proton-c/include/proton/error.h
qpid/proton/trunk/proton-c/include/proton/messenger.h
qpid/proton/trunk/proton-c/src/messenger/messenger.c
qpid/proton/trunk/proton-c/src/posix/driver.c
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Driver.java
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
qpid/proton/trunk/tests/python/proton_tests/messenger.py
qpid/proton/trunk/tests/ruby/proton-test
qpid/proton/trunk/tests/tools/apps/c/msgr-recv.c
qpid/proton/trunk/tests/tools/apps/c/msgr-send.c
Modified: qpid/proton/trunk/examples/messenger/c/send.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/examples/messenger/c/send.c?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/examples/messenger/c/send.c (original)
+++ qpid/proton/trunk/examples/messenger/c/send.c Thu Jul 11 20:11:12 2013
@@ -101,7 +101,7 @@ int main(int argc, char** argv)
pn_data_put_string(body, pn_bytes(strlen(msgtext), msgtext));
pn_messenger_put(messenger, message);
check(messenger);
- pn_messenger_send(messenger);
+ pn_messenger_send(messenger, -1);
check(messenger);
pn_messenger_stop(messenger);
Modified: qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java (original)
+++ qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java Thu Jul 11 20:11:12 2013
@@ -81,7 +81,13 @@ class JNIMessenger implements Messenger
@Override
public void send() throws TimeoutException
{
- int err = Proton.pn_messenger_send(_impl);
+ send(-1);
+ }
+
+ @Override
+ public void send(int n) throws TimeoutException
+ {
+ int err = Proton.pn_messenger_send(_impl, n);
check(err);
}
@@ -123,6 +129,29 @@ class JNIMessenger implements Messenger
}
@Override
+ public boolean stopped()
+ {
+ return Proton.pn_messenger_stopped(_impl);
+ }
+
+ @Override
+ public boolean work(long timeout)
+ {
+ int err = Proton.pn_messenger_work(_impl, (int) timeout);
+ if (err == Proton.PN_TIMEOUT)
+ return false;
+ check(err);
+ return true;
+ }
+
+ @Override
+ public void interrupt()
+ {
+ int err = Proton.pn_messenger_interrupt(_impl);
+ check(err);
+ }
+
+ @Override
public void setTimeout(final long timeInMillis)
{
int err = Proton.pn_messenger_set_timeout(_impl, (int) timeInMillis);
@@ -136,6 +165,19 @@ class JNIMessenger implements Messenger
}
@Override
+ public boolean isBlocking()
+ {
+ return Proton.pn_messenger_is_blocking(_impl);
+ }
+
+ @Override
+ public void setBlocking(boolean b)
+ {
+ int err = Proton.pn_messenger_set_blocking(_impl, b);
+ check(err);
+ }
+
+ @Override
public int outgoing()
{
return Proton.pn_messenger_outgoing(_impl);
@@ -232,7 +274,7 @@ class JNIMessenger implements Messenger
private void check(int errorCode) throws ProtonException
{
- if(errorCode != 0)
+ if(errorCode != 0 && errorCode != Proton.PN_INPROGRESS)
{
String errorMessage = Proton.pn_messenger_error(_impl);
if(errorCode == Proton.PN_TIMEOUT)
Modified: qpid/proton/trunk/proton-c/bindings/perl/lib/qpid/proton/Messenger.pm
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/perl/lib/qpid/proton/Messenger.pm?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/perl/lib/qpid/proton/Messenger.pm (original)
+++ qpid/proton/trunk/proton-c/bindings/perl/lib/qpid/proton/Messenger.pm Thu Jul 11 20:11:12 2013
@@ -164,7 +164,7 @@ sub put {
sub send {
my ($self) = @_;
- cproton_perl::pn_messenger_send($self->{_impl});
+ cproton_perl::pn_messenger_send($self->{_impl}, $_[1]);
}
sub get {
Modified: qpid/proton/trunk/proton-c/bindings/php/proton.php
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/php/proton.php?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/php/proton.php (original)
+++ qpid/proton/trunk/proton-c/bindings/php/proton.php Thu Jul 11 20:11:12 2013
@@ -127,11 +127,11 @@ class Messenger
$this->_check(pn_messenger_put($this->impl, $message->impl));
}
- public function send() {
- $this->_check(pn_messenger_send($this->impl));
+ public function send($n = -1) {
+ $this->_check(pn_messenger_send($this->impl, $n));
}
- public function recv($n) {
+ public function recv($n = -1) {
$this->_check(pn_messenger_recv($this->impl, $n));
}
Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Thu Jul 11 20:11:12 2013
@@ -113,6 +113,12 @@ class Timeout(ProtonException):
"""
pass
+class Interrupt(ProtonException):
+ """
+ An interrupt exception indicaes that a blocking operation was interrupted.
+ """
+ pass
+
class MessengerException(ProtonException):
"""
The root of the messenger exception hierarchy. All exceptions
@@ -129,7 +135,8 @@ class MessageException(ProtonException):
pass
EXCEPTIONS = {
- PN_TIMEOUT: Timeout
+ PN_TIMEOUT: Timeout,
+ PN_INTR: Interrupt
}
PENDING = Constant("PENDING")
@@ -225,6 +232,8 @@ class Messenger(object):
def _check(self, err):
if err < 0:
+ if (err == PN_INPROGRESS):
+ return
exc = EXCEPTIONS.get(err, MessengerException)
raise exc("[%s]: %s" % (err, pn_messenger_error(self._mng)))
else:
@@ -306,6 +315,14 @@ The timeout property contains the defaul
operations performed by the L{Messenger}.
""")
+ def _is_blocking(self):
+ return pn_messenger_is_blocking(self._mng)
+
+ def _set_blocking(self, b):
+ self._check(pn_messenger_set_blocking(self._mng, b))
+
+ blocking = property(_is_blocking, _set_blocking)
+
def _get_incoming_window(self):
return pn_messenger_get_incoming_window(self._mng)
@@ -352,6 +369,10 @@ send. Defaults to zero.
"""
self._check(pn_messenger_stop(self._mng))
+ @property
+ def stopped(self):
+ return pn_messenger_stopped(self._mng)
+
def subscribe(self, source):
"""
Subscribes the L{Messenger} to messages originating from the
@@ -410,13 +431,13 @@ send. Defaults to zero.
flags = 0
self._check(pn_messenger_settle(self._mng, tracker, flags))
- def send(self):
+ def send(self, n=-1):
"""
Blocks until the outgoing queue is empty or the operation times
out. The L{timeout} property controls how long a L{Messenger} will
block before timing out.
"""
- self._check(pn_messenger_send(self._mng))
+ self._check(pn_messenger_send(self._mng, n))
def recv(self, n=None):
"""
@@ -429,6 +450,17 @@ send. Defaults to zero.
n = -1
self._check(pn_messenger_recv(self._mng, n))
+ def work(self, timeout=-1):
+ err = pn_messenger_work(self._mng, timeout)
+ if (err == PN_TIMEOUT):
+ return False
+ else:
+ self._check(err)
+ return True
+
+ def interrupt(self):
+ self._check(pn_messenger_interrupt(self._mng))
+
def get(self, message=None):
"""
Moves the message from the head of the incoming message queue into
@@ -2861,6 +2893,7 @@ __all__ = [
"SSLUnavailable",
"Terminus",
"Timeout",
+ "Interrupt",
"Transport",
"TransportException",
"char",
Modified: qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/exception_handling.rb
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/exception_handling.rb?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/exception_handling.rb (original)
+++ qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/exception_handling.rb Thu Jul 11 20:11:12 2013
@@ -54,6 +54,9 @@ module Qpid
when Qpid::Proton::Error::TIMEOUT
raise Qpid::Proton::TimeoutError.new(self.error)
+ when Qpid::Proton::Error::INPROGRESS
+ return
+
else
raise ::ArgumentError.new("Unknown error code: #{code}")
Modified: qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/exceptions.rb
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/exceptions.rb?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/exceptions.rb (original)
+++ qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/exceptions.rb Thu Jul 11 20:11:12 2013
@@ -31,6 +31,7 @@ module Qpid
STATE = Cproton::PN_STATE_ERR
ARGUMENT = Cproton::PN_ARG_ERR
TIMEOUT = Cproton::PN_TIMEOUT
+ INPROGRESS = Cproton::PN_INPROGRESS
end
Modified: qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb (original)
+++ qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb Thu Jul 11 20:11:12 2013
@@ -46,7 +46,6 @@ module Qpid
def self.finalize!(impl) # :nodoc:
proc {
- Cproton.pn_messenger_stop(impl)
Cproton.pn_messenger_free(impl)
}
end
@@ -76,6 +75,14 @@ module Qpid
Cproton.pn_messenger_get_timeout(@impl)
end
+ def blocking
+ Cproton.pn_mesenger_is_blocking(@impl)
+ end
+
+ def blocking=(blocking)
+ Cproton.pn_messenger_set_blocking(@impl, blocking)
+ end
+
# Reports whether an error occurred.
#
def error?
@@ -108,6 +115,10 @@ module Qpid
check_for_error(Cproton.pn_messenger_stop(@impl))
end
+ def stopped
+ Cproton.pn_messenger_stopped(@impl)
+ end
+
# Subscribes the +Messenger+ to a remote address.
#
def subscribe(address)
@@ -191,8 +202,8 @@ module Qpid
# Sends all outgoing messages, blocking until the outgoing queue
# is empty.
#
- def send
- check_for_error(Cproton.pn_messenger_send(@impl))
+ def send(n = -1)
+ check_for_error(Cproton.pn_messenger_send(@impl, n))
end
# Gets a single message incoming message from the local queue.
@@ -215,12 +226,24 @@ module Qpid
#
# Options ====
#
- # * max - the maximum number of messages to receive
+ # * limit - the maximum number of messages to receive
#
- def receive(max)
- raise TypeError.new("invalid max: #{max}") if max.nil? || max.to_i.zero?
- raise RangeError.new("negative max: #{max}") if max < 0
- check_for_error(Cproton.pn_messenger_recv(@impl, max))
+ def receive(limit=-1)
+ check_for_error(Cproton.pn_messenger_recv(@impl, limit))
+ end
+
+ def receiving
+ Cproton.pn_messenger_receiving(@impl)
+ end
+
+ def work(timeout=-1)
+ err = Cproton.pn_messenger_work(@impl, timeout)
+ if (err == Cproton::PN_TIMEOUT) then
+ return false
+ else
+ check_for_error(err)
+ return true
+ end
end
# Returns the number messages in the outgoing queue that have not been
Modified: qpid/proton/trunk/proton-c/include/proton/error.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/error.h?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/error.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/error.h Thu Jul 11 20:11:12 2013
@@ -39,6 +39,7 @@ typedef struct pn_error_t pn_error_t;
#define PN_ARG_ERR (-6)
#define PN_TIMEOUT (-7)
#define PN_INTR (-8)
+#define PN_INPROGRESS (-9)
PN_EXTERN const char *pn_code(int code);
Modified: qpid/proton/trunk/proton-c/include/proton/messenger.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/messenger.h?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/messenger.h Thu Jul 11 20:11:12 2013
@@ -148,6 +148,9 @@ PN_EXTERN int pn_messenger_set_timeout(p
*/
PN_EXTERN int pn_messenger_get_timeout(pn_messenger_t *messenger);
+PN_EXTERN bool pn_messenger_is_blocking(pn_messenger_t *messenger);
+PN_EXTERN int pn_messenger_set_blocking(pn_messenger_t *messenger, bool blocking);
+
/** Frees a Messenger.
*
* @param[in] messenger the messenger to free, no longer valid on
@@ -238,6 +241,8 @@ PN_EXTERN int pn_messenger_start(pn_mess
*/
PN_EXTERN int pn_messenger_stop(pn_messenger_t *messenger);
+PN_EXTERN bool pn_messenger_stopped(pn_messenger_t *messenger);
+
/** Subscribes a messenger to messages from the specified source.
*
* @param[in] messenger the messenger to subscribe
@@ -294,29 +299,60 @@ PN_EXTERN int pn_messenger_settle(pn_mes
*/
PN_EXTERN pn_tracker_t pn_messenger_outgoing_tracker(pn_messenger_t *messenger);
-/** Sends any messages in the outgoing message queue for a messenger.
- * This will block until the messages have been sent.
+/** Sends or receives any outstanding messages queued for a messenger.
+ * This will block for the indicated timeout.
+ *
+ * @param[in] messenger the Messenger
+ * @param[in] timeout the maximum time to block
+ */
+PN_EXTERN int pn_messenger_work(pn_messenger_t *messenger, int timeout);
+
+/** Interrupts a messenger that is blocking. This method may be safely
+ * called from a different thread than the one that is blocking.
+ *
+ * @param[in] messenger the Messenger
+ */
+PN_EXTERN int pn_messenger_interrupt(pn_messenger_t *messenger);
+
+/** Sends messages in the outgoing message queue for a messenger. This
+ * call will block until the indicated number of messages have been
+ * sent. If n is -1 this call will block until all outgoing messages
+ * have been sent. If n is 0 then this call won't block.
*
* @param[in] messenger the messager
+ * @param[in] n the number of messages to send
*
* @return an error code or zero on success
* @see error.h
*/
-PN_EXTERN int pn_messenger_send(pn_messenger_t *messenger);
+PN_EXTERN int pn_messenger_send(pn_messenger_t *messenger, int n);
-/** Receives up to n messages into the incoming message queue of a
- * messenger. If n is -1, Messenger will be able to receive as many
- * messages as it can buffer internally. Blocks until at least one
- * message is available in the incoming queue.
+/** Instructs the messenger to receives up to limit messages into the
+ * incoming message queue of a messenger. If limit is -1, Messenger
+ * will receive as many messages as it can buffer internally. If the
+ * messenger is in blocking mode, this call will block until at least
+ * one message is available in the incoming queue.
+ *
+ * Each call to pn_messenger_recv replaces the previos receive
+ * operation, so pn_messenger_recv(messenger, 0) will cancel any
+ * outstanding receive.
*
* @param[in] messenger the messenger
- * @param[in] n the maximum number of messages to receive or -1 to to
- * receive as many messages as it can buffer internally.
+ * @param[in] limit the maximum number of messages to receive or -1 to
+ * to receive as many messages as it can buffer
+ * internally.
*
* @return an error code or zero on success
* @see error.h
*/
-PN_EXTERN int pn_messenger_recv(pn_messenger_t *messenger, int n);
+PN_EXTERN int pn_messenger_recv(pn_messenger_t *messenger, int limit);
+
+/** Returns the number of messages currently being received by a
+ * messenger.
+ *
+ * @param[in] messenger the messenger
+ */
+PN_EXTERN int pn_messenger_receiving(pn_messenger_t *messenger);
/** Gets a message from the head of the incoming message queue of a
* messenger.
Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Thu Jul 11 20:11:12 2013
@@ -53,8 +53,10 @@ struct pn_messenger_t {
char *password;
char *trusted_certificates;
int timeout;
+ bool blocking;
pn_driver_t *driver;
- bool unlimited_credit;
+ int send_threshold;
+ int receiving;
int credit_batch;
int credit;
int distributed;
@@ -73,6 +75,7 @@ struct pn_messenger_t {
pn_tracker_t incoming_tracker;
pn_string_t *original;
pn_string_t *rewritten;
+ bool worked;
};
struct pn_subscription_t {
@@ -182,8 +185,9 @@ pn_messenger_t *pn_messenger(const char
m->password = NULL;
m->trusted_certificates = NULL;
m->timeout = -1;
+ m->blocking = true;
m->driver = pn_driver();
- m->unlimited_credit = false;
+ m->receiving = 0;
m->credit_batch = 10;
m->credit = 0;
m->distributed = 0;
@@ -272,6 +276,18 @@ int pn_messenger_get_timeout(pn_messenge
return messenger ? messenger->timeout : 0;
}
+bool pn_messenger_is_blocking(pn_messenger_t *messenger)
+{
+ assert(messenger);
+ return messenger->blocking;
+}
+
+int pn_messenger_set_blocking(pn_messenger_t *messenger, bool blocking)
+{
+ messenger->blocking = blocking;
+ return 0;
+}
+
static void pni_driver_reclaim(pn_driver_t *driver)
{
pn_listener_t *l = pn_listener_head(driver);
@@ -342,8 +358,13 @@ void pn_messenger_flow(pn_messenger_t *m
if (link_ct == 0) return;
- if (messenger->unlimited_credit)
+ if (messenger->receiving == -1) {
messenger->credit = link_ct * messenger->credit_batch;
+ } else {
+ int total = messenger->credit + messenger->distributed;
+ if (messenger->receiving > total)
+ messenger->credit += (messenger->receiving - total);
+ }
int batch = (messenger->credit < link_ct) ? 1
: (messenger->credit/link_ct);
@@ -619,11 +640,11 @@ int pn_messenger_tsync(pn_messenger_t *m
if (pred || (timeout >= 0 && remaining < 0)) break;
int error = pn_driver_wait(messenger->driver, remaining);
- if (error)
- return error;
+ if (error && error != PN_INTR) return error;
pn_listener_t *l;
while ((l = pn_driver_listener(messenger->driver))) {
+ messenger->worked = true;
pn_listener_ctx_t *ctx = (pn_listener_ctx_t *) pn_listener_context(l);
pn_subscription_t *sub = ctx->subscription;
char *scheme = sub->scheme;
@@ -654,6 +675,7 @@ int pn_messenger_tsync(pn_messenger_t *m
pn_connector_t *c;
while ((c = pn_driver_connector(messenger->driver))) {
+ messenger->worked = true;
pn_connector_process(c);
pn_connection_t *conn = pn_connector_connection(c);
pn_messenger_endpoints(messenger, conn, c);
@@ -673,6 +695,10 @@ int pn_messenger_tsync(pn_messenger_t *m
if (timeout >= 0) {
now = pn_i_now();
}
+
+ if (error == PN_INTR) {
+ return pred ? 0 : PN_INTR;
+ }
}
return pred ? 0 : PN_TIMEOUT;
@@ -680,7 +706,16 @@ int pn_messenger_tsync(pn_messenger_t *m
int pn_messenger_sync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *))
{
- return pn_messenger_tsync(messenger, predicate, messenger->timeout);
+ if (messenger->blocking) {
+ return pn_messenger_tsync(messenger, predicate, messenger->timeout);
+ } else {
+ int err = pn_messenger_tsync(messenger, predicate, 0);
+ if (err == PN_TIMEOUT) {
+ return PN_INPROGRESS;
+ } else {
+ return err;
+ }
+ }
}
int pn_messenger_start(pn_messenger_t *messenger)
@@ -1000,8 +1035,6 @@ static void outward_munge(pn_messenger_t
if (heapbuf) free (heapbuf);
}
-// static bool false_pred(pn_messenger_t *messenger) { return false; }
-
int pni_pump_out(pn_messenger_t *messenger, const char *address, pn_link_t *sender)
{
pni_entry_t *entry = pni_store_get(messenger->outgoing, address);
@@ -1026,8 +1059,6 @@ int pni_pump_out(pn_messenger_t *messeng
} else {
pn_link_advance(sender);
pni_entry_free(entry);
- // XXX: doing this every time is slow, need to be smarter
- //pn_messenger_tsync(messenger, false_pred, 0);
return 0;
}
}
@@ -1150,7 +1181,7 @@ int pn_messenger_settle(pn_messenger_t *
// true if all pending output has been sent to peer
bool pn_messenger_sent(pn_messenger_t *messenger)
{
- if (pni_store_size(messenger->outgoing) > 0) return false;
+ int total = pni_store_size(messenger->outgoing);
pn_connector_t *ctor = pn_connector_head(messenger->driver);
while (ctor) {
@@ -1169,14 +1200,12 @@ bool pn_messenger_sent(pn_messenger_t *m
pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
while (link) {
if (pn_link_is_sender(link)) {
- if (pn_link_queued(link)) {
- return false;
- }
+ total += pn_link_queued(link);
pn_delivery_t *d = pn_unsettled_head(link);
while (d) {
if (!pn_delivery_remote_state(d) && !pn_delivery_settled(d)) {
- return false;
+ total++;
}
d = pn_unsettled_next(d);
}
@@ -1187,7 +1216,7 @@ bool pn_messenger_sent(pn_messenger_t *m
ctor = pn_connector_next(ctor);
}
- return true;
+ return total <= messenger->send_threshold;
}
bool pn_messenger_rcvd(pn_messenger_t *messenger)
@@ -1215,8 +1244,35 @@ bool pn_messenger_rcvd(pn_messenger_t *m
}
}
-int pn_messenger_send(pn_messenger_t *messenger)
+static bool work_pred(pn_messenger_t *messenger) {
+ return messenger->worked;
+}
+
+int pn_messenger_work(pn_messenger_t *messenger, int timeout)
{
+ messenger->worked = false;
+ return pn_messenger_tsync(messenger, work_pred, timeout);
+}
+
+int pn_messenger_interrupt(pn_messenger_t *messenger)
+{
+ assert(messenger);
+ if (messenger->driver) {
+ return pn_driver_wakeup(messenger->driver);
+ } else {
+ return 0;
+ }
+}
+
+int pn_messenger_send(pn_messenger_t *messenger, int n)
+{
+ if (n == -1) {
+ messenger->send_threshold = 0;
+ } else {
+ messenger->send_threshold = pn_messenger_outgoing(messenger) - n;
+ if (messenger->send_threshold < 0)
+ messenger->send_threshold = 0;
+ }
return pn_messenger_sync(messenger, pn_messenger_sent);
}
@@ -1225,14 +1281,7 @@ int pn_messenger_recv(pn_messenger_t *me
if (!messenger) return PN_ARG_ERR;
if (!pn_listener_head(messenger->driver) && !pn_connector_head(messenger->driver))
return pn_error_format(messenger->error, PN_STATE_ERR, "no valid sources");
- int total = messenger->credit + messenger->distributed;
- if (n == -1) {
- messenger->unlimited_credit = true;
- } else {
- messenger->unlimited_credit = false;
- if (n > total)
- messenger->credit += (n - total);
- }
+ messenger->receiving = n;
pn_messenger_flow(messenger);
int err = pn_messenger_sync(messenger, pn_messenger_rcvd);
if (err) return err;
@@ -1245,6 +1294,12 @@ int pn_messenger_recv(pn_messenger_t *me
}
}
+int pn_messenger_receiving(pn_messenger_t *messenger)
+{
+ assert(messenger);
+ return messenger->receiving;
+}
+
int pn_messenger_get(pn_messenger_t *messenger, pn_message_t *msg)
{
if (!messenger) return PN_ARG_ERR;
Modified: qpid/proton/trunk/proton-c/src/posix/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/posix/driver.c?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/posix/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/posix/driver.c Thu Jul 11 20:11:12 2013
@@ -766,9 +766,11 @@ int pn_driver_wait_2(pn_driver_t *d, int
return result;
}
-void pn_driver_wait_3(pn_driver_t *d)
+int pn_driver_wait_3(pn_driver_t *d)
{
+ bool woken = false;
if (d->fds[0].revents & POLLIN) {
+ woken = true;
//clear the pipe
char buffer[512];
while (read(d->ctrl[0], buffer, 512) == 512);
@@ -800,6 +802,8 @@ void pn_driver_wait_3(pn_driver_t *d)
d->listener_next = d->listener_head;
d->connector_next = d->connector_head;
+
+ return woken ? PN_INTR : 0;
}
//
@@ -818,8 +822,7 @@ int pn_driver_wait(pn_driver_t *d, int t
int result = pn_driver_wait_2(d, timeout);
if (result == -1)
return pn_error_code(d->error);
- pn_driver_wait_3(d);
- return 0;
+ return pn_driver_wait_3(d);
}
pn_listener_t *pn_driver_listener(pn_driver_t *d) {
Added: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/InterruptException.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/InterruptException.java?rev=1502346&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/InterruptException.java (added)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/InterruptException.java Thu Jul 11 20:11:12 2013
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton;
+
+public class InterruptException extends ProtonException
+{
+ public InterruptException()
+ {
+ }
+
+ public InterruptException(String message)
+ {
+ super(message);
+ }
+
+ public InterruptException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ public InterruptException(Throwable cause)
+ {
+ super(cause);
+ }
+
+}
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Driver.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Driver.java?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Driver.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Driver.java Thu Jul 11 20:11:12 2013
@@ -56,8 +56,10 @@ public interface Driver
* Thread-safe.
*
* @param timeout maximum time in milliseconds to wait. 0 means wait indefinitely.
+ *
+ * @param returns true if woken up
*/
- void doWait(long timeout);
+ boolean doWait(long timeout);
/**
* Get the next listener with pending data in the driver.
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java Thu Jul 11 20:11:12 2013
@@ -82,6 +82,7 @@ public interface Messenger
* to cause the messages to actually be sent out.
*/
void put(Message message) throws MessengerException;
+
/**
* Blocks until the outgoing queue is empty and, in the event that
* an outgoing window has been set, until the messages in that
@@ -91,6 +92,8 @@ public interface Messenger
*/
void send() throws TimeoutException;
+ void send(int n) throws TimeoutException;
+
/**
* Subscribes the Messenger to messages originating from the
* specified source. The source is an address as specified in the
@@ -131,9 +134,18 @@ public interface Messenger
*/
void stop();
+ boolean stopped();
+
+ boolean work(long timeout);
+
+ void interrupt();
+
void setTimeout(long timeInMillis);
long getTimeout();
+ boolean isBlocking();
+ void setBlocking(boolean b);
+
/**
* Returns a count of the messages currently on the outgoing queue
* (i.e. those that have been put() but not yet actually sent
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py Thu Jul 11 20:11:12 2013
@@ -24,6 +24,7 @@ except NameError:
bytes = str
from org.apache.qpid.proton import ProtonFactoryLoader, ProtonUnsupportedOperationException
+from org.apache.qpid.proton import InterruptException as Interrupt
from org.apache.qpid.proton.engine import \
EngineFactory, Transport as JTransport, Sender as JSender, Receiver as JReceiver, \
Sasl, SslDomain as JSslDomain, \
@@ -44,6 +45,7 @@ from java.util import EnumSet, UUID as J
from java.util.concurrent import TimeoutException as Timeout
from java.nio import ByteBuffer
from java.lang import Character as JCharacter, String as JString, Integer as JInteger
+from java.lang import NoClassDefFoundError
@@ -1265,6 +1267,10 @@ class Messenger(object):
def stop(self):
self.impl.stop()
+ @property
+ def stopped(self):
+ return self.impl.stopped()
+
def subscribe(self, source):
self.impl.subscribe(source)
@@ -1272,14 +1278,21 @@ class Messenger(object):
self.impl.put(message.impl)
return self.impl.outgoingTracker()
- def send(self):
- self.impl.send()
+ def send(self, n=-1):
+ self.impl.send(n)
- def recv(self, n):
+ def recv(self, n=-1):
self.impl.recv(n)
+ def work(self, t):
+ return self.impl.work(t)
+
+ def interrupt(self):
+ self.impl.interrupt()
+
def get(self, message=None):
result = self.impl.get()
+ if not result: print "HA"
if message and result:
message.impl = result
return self.impl.incomingTracker()
@@ -1298,6 +1311,13 @@ class Messenger(object):
self.impl.setTimeout(t)
timeout = property(_get_timeout, _set_timeout)
+ def _is_blocking(self):
+ return self.impl.isBlocking()
+
+ def _set_blocking(self, b):
+ self.impl.setBlocking(b)
+
+ blocking = property(_is_blocking, _set_blocking)
def accept(self, tracker=None):
if tracker is None:
tracker = self.impl.incomingTracker()
@@ -1578,7 +1598,10 @@ class SSLDomain(object):
ANONYMOUS_PEER = JSslDomain.VerifyMode.ANONYMOUS_PEER
def __init__(self, mode):
- self._domain = engineFactory.createSslDomain()
+ try:
+ self._domain = engineFactory.createSslDomain()
+ except NoClassDefFoundError, e:
+ raise SSLUnavailable()
self._domain.init(mode)
def set_credentials(self, cert_file, key_file, password):
@@ -1724,6 +1747,7 @@ __all__ = [
"timestamp",
"Terminus",
"Timeout",
+ "Interrupt",
"Transport",
"TransportException",
"ulong",
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java Thu Jul 11 20:11:12 2013
@@ -48,6 +48,8 @@ public class DriverImpl implements Drive
private Collection<Listener> _listeners = new LinkedList();
private Collection<Connector> _connectors = new LinkedList();
private Logger _logger = Logger.getLogger("proton.driver");
+ private Object _wakeupLock = new Object();
+ private boolean _woken = false;
DriverImpl() throws IOException
{
@@ -56,15 +58,34 @@ public class DriverImpl implements Drive
public void wakeup()
{
+ synchronized (_wakeupLock) {
+ _woken = true;
+ }
_selector.wakeup();
}
- public void doWait(long timeout)
+ public boolean doWait(long timeout)
{
try
{
- _selector.select(timeout);
+ boolean woken;
+ synchronized (_wakeupLock) {
+ woken = _woken;
+ }
+
+ if (woken) {
+ _selector.selectNow();
+ } else {
+ _selector.select(timeout);
+ }
_selectedKeys = _selector.selectedKeys();
+
+ synchronized (_wakeupLock) {
+ woken = woken || _woken;
+ _woken = false;
+ }
+
+ return woken;
}
catch (IOException e)
{
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java Thu Jul 11 20:11:12 2013
@@ -29,6 +29,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.qpid.proton.ProtonFactoryLoader;
+import org.apache.qpid.proton.InterruptException;
import org.apache.qpid.proton.TimeoutException;
import org.apache.qpid.proton.driver.Connector;
import org.apache.qpid.proton.driver.Driver;
@@ -53,6 +54,8 @@ import org.apache.qpid.proton.messenger.
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.Binary;
+
public class MessengerImpl implements Messenger
{
@SuppressWarnings("rawtypes")
@@ -69,6 +72,7 @@ public class MessengerImpl implements Me
private final DriverFactory _driverFactory;
private final MessageFactory _messageFactory;
private long _timeout = -1;
+ private boolean _blocking = true;
private long _nextTag = 1;
private byte[] _buffer = new byte[5*1024];
private Driver _driver;
@@ -119,6 +123,16 @@ public class MessengerImpl implements Me
return _timeout;
}
+ public boolean isBlocking()
+ {
+ return _blocking;
+ }
+
+ public void setBlocking(boolean b)
+ {
+ _blocking = b;
+ }
+
public void start() throws IOException
{
_driver = _driverFactory.createDriver();
@@ -156,15 +170,24 @@ public class MessengerImpl implements Me
_logger.log(Level.WARNING, "Error while closing listener", e);
}
}
- try
- {
- waitUntil(_allClosed);
- }
- catch(TimeoutException e)
- {
- _logger.log(Level.WARNING, "Timed out while waiting for close", e);
- }
- _driver.destroy();
+ waitUntil(_allClosed);
+ //_driver.destroy();
+ }
+
+ public boolean stopped()
+ {
+ return _allClosed.test();
+ }
+
+ public boolean work(long timeout)
+ {
+ _worked = false;
+ return waitUntil(_workPred, timeout);
+ }
+
+ public void interrupt()
+ {
+ _driver.wakeup();
}
public void put(Message m) throws MessengerException
@@ -210,6 +233,11 @@ public class MessengerImpl implements Me
public void send() throws TimeoutException
{
+ send(-1);
+ }
+
+ public void send(int n) throws TimeoutException
+ {
if(_logger.isLoggable(Level.FINE))
{
_logger.fine(this + " about to send");
@@ -249,8 +277,8 @@ public class MessengerImpl implements Me
int size = read((Receiver) delivery.getLink());
Message message = _messageFactory.createMessage();
message.decode(_buffer, 0, size);
- _incoming.add(delivery);
delivery.getLink().advance();
+ _incoming.add(delivery);
return message;
}
else
@@ -383,27 +411,24 @@ public class MessengerImpl implements Me
private int read(Receiver receiver)
{
- //TODO: add pending count to Delivery?
- int total = 0;
- int start = 0;
- while (true)
- {
- int read = receiver.recv(_buffer, start, _buffer.length - start);
- total += read;
- if (read == (_buffer.length - start))
- {
- //may need to expand the buffer (is there a better test?)
- byte[] old = _buffer;
- _buffer = new byte[_buffer.length*2];
- System.arraycopy(old, 0, _buffer, 0, old.length);
- start += read;
- }
- else
- {
- break;
- }
+ Delivery dlv = receiver.current();
+
+ if (dlv.isPartial()) {
+ throw new IllegalStateException();
+ }
+
+ int size = dlv.pending();
+
+ while (_buffer.length < size) {
+ _buffer = new byte[_buffer.length * 2];
+ }
+
+ int read = receiver.recv(_buffer, 0, _buffer.length);
+ if (read != size) {
+ throw new IllegalStateException();
}
- return total;
+
+ return size;
}
private void processAllConnectors()
@@ -426,6 +451,7 @@ public class MessengerImpl implements Me
//process active listeners
for (Listener<?> l = _driver.listener(); l != null; l = _driver.listener())
{
+ _worked = true;
Connector<?> c = l.accept();
Connection connection = _engineFactory.createConnection();
connection.setContainer(_name);
@@ -443,6 +469,7 @@ public class MessengerImpl implements Me
//process active connectors, handling opened & closed connections as needed
for (Connector<?> c = _driver.connector(); c != null; c = _driver.connector())
{
+ _worked = true;
_logger.log(Level.FINE, "Processing active connector " + c);
try
{
@@ -524,12 +551,23 @@ public class MessengerImpl implements Me
}
}
- private void waitUntil(Predicate condition) throws TimeoutException
+ private boolean waitUntil(Predicate condition) throws TimeoutException
{
- waitUntil(condition, _timeout);
+ if (_blocking) {
+ boolean done = waitUntil(condition, _timeout);
+ if (!done) {
+ _logger.log(Level.SEVERE, String.format
+ ("Timeout when waiting for condition %s after %s ms",
+ condition, _timeout));
+ throw new TimeoutException();
+ }
+ return done;
+ } else {
+ return waitUntil(condition, 0);
+ }
}
- private void waitUntil(Predicate condition, long timeout) throws TimeoutException
+ private boolean waitUntil(Predicate condition, long timeout)
{
processAllConnectors();
@@ -539,23 +577,22 @@ public class MessengerImpl implements Me
boolean wait = deadline > System.currentTimeMillis();
boolean first = true;
boolean done = false;
+ boolean woken = false;
while (first || (!done && wait))
{
if (wait && !done && !first) {
- _driver.doWait(timeout < 0 ? 0 : deadline - System.currentTimeMillis());
+ woken = _driver.doWait(timeout < 0 ? 0 : deadline - System.currentTimeMillis());
}
processActive();
wait = deadline > System.currentTimeMillis();
- done = done || condition.test();
+ done = done || woken || condition.test();
first = false;
}
- if (!done)
- {
- _logger.log(Level.SEVERE, String.format(
- "Timeout when waiting for condition %s after %s ms", condition, timeout));
- throw new TimeoutException();
+ if (woken) {
+ throw new InterruptException();
}
+ return done;
}
private Connection lookup(String host, String service)
@@ -724,12 +761,22 @@ public class MessengerImpl implements Me
if (_driver.connectors().iterator().hasNext()) return false;
else return true;
}
+ }
+ private boolean _worked = false;
+
+ private class WorkPred implements Predicate
+ {
+ public boolean test()
+ {
+ return _worked;
+ }
}
private final SentSettled _sentSettled = new SentSettled();
private final MessageAvailable _messageAvailable = new MessageAvailable();
private final AllClosed _allClosed = new AllClosed();
+ private final WorkPred _workPred = new WorkPred();
private interface LinkFinder<C extends Link>
{
Modified: qpid/proton/trunk/tests/python/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/messenger.py?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/messenger.py Thu Jul 11 20:11:12 2013
@@ -49,27 +49,16 @@ class Test(common.Test):
def _safelyStopClient(self):
existing_exception = None
+ self.server.interrupt()
try:
- # send a message to cause the server to promptly exit
- msg = Message()
- msg.address="amqp://0.0.0.0:12345"
- self.client.put(msg)
- try:
- self.client.send()
- except:
- print "Client failed to send shutdown message due to: %s" % sys.exc_info()[1]
- existing_exception = sys.exc_info()[1]
+ self.client.stop()
+ self.client = None
+ except:
+ print "Client failed to stop due to: %s" % sys.exc_info()[1]
+ if existing_exception:
+ raise existing_exception
+ else:
raise
- finally:
- try:
- self.client.stop()
- self.client = None
- except:
- print "Client failed to stop due to: %s" % sys.exc_info()[1]
- if existing_exception:
- raise existing_exception
- else:
- raise
def teardown(self):
try:
@@ -91,8 +80,11 @@ class MessengerTest(Test):
try:
while self.running:
self.server_is_running_event.set()
- self.server.recv(self.server_credit)
- self.process_incoming(msg)
+ try:
+ self.server.recv(self.server_credit)
+ self.process_incoming(msg)
+ except Interrupt:
+ pass
finally:
self.server.stop()
self.running = False
@@ -113,7 +105,7 @@ class MessengerTest(Test):
self.server.put(msg)
self.server.settle()
- def _testSendReceive(self, size=None):
+ def testSendReceive(self, size=None):
self.start()
msg = Message()
msg.address="amqp://0.0.0.0:12345"
@@ -129,33 +121,30 @@ class MessengerTest(Test):
reply = Message()
self.client.recv(1)
- assert self.client.incoming == 1
+ assert self.client.incoming == 1, self.client.incoming
self.client.get(reply)
assert reply.subject == "Hello World!"
rbod = reply.body
assert rbod == body, (rbod, body)
- def testSendReceive(self):
- self._testSendReceive()
-
def testSendReceive1K(self):
- self._testSendReceive(1024)
+ self.testSendReceive(1024)
def testSendReceive2K(self):
- self._testSendReceive(2*1024)
+ self.testSendReceive(2*1024)
def testSendReceive4K(self):
- self._testSendReceive(4*1024)
+ self.testSendReceive(4*1024)
def testSendReceive10K(self):
- self._testSendReceive(10*1024)
+ self.testSendReceive(10*1024)
def testSendReceive100K(self):
- self._testSendReceive(100*1024)
+ self.testSendReceive(100*1024)
def testSendReceive1M(self):
- self._testSendReceive(1024*1024)
+ self.testSendReceive(1024*1024)
# PROTON-285 - prevent continually failing test
def xtestSendBogus(self):
@@ -589,3 +578,44 @@ class MessengerTest(Test):
def testRewriteOverrideDefault(self):
self.client.rewrite("*", "$1")
self._testRewrite("amqp://user:pass@host", "amqp://user:pass@host")
+
+class NBMessengerTest(common.Test):
+
+ def setup(self):
+ self.client = Messenger()
+ self.server = Messenger()
+ self.client.blocking = False
+ self.server.blocking = False
+ self.server.start()
+ self.client.start()
+ self.server.subscribe("amqp://~0.0.0.0:12345")
+
+ def pump(self):
+ while self.client.work(0) or self.server.work(0): pass
+
+ def teardown(self):
+ self.server.stop()
+ self.client.stop()
+ self.pump()
+ assert self.server.stopped
+ assert self.client.stopped
+
+ def test(self, count=1):
+ self.server.recv()
+
+ msg = Message()
+ msg.address = "amqp://0.0.0.0:12345"
+ for i in range(count):
+ msg.body = "Hello %s" % i
+ self.client.put(msg)
+
+ self.pump()
+ assert self.client.outgoing == 0, self.client.outgoing
+
+ msg2 = Message()
+ for i in range(count):
+ self.server.get(msg2)
+ assert msg2.body == "Hello %s" % i, (msg2.body, i)
+
+ def test1024(self):
+ self.test(1024)
Modified: qpid/proton/trunk/tests/ruby/proton-test
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/ruby/proton-test?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/tests/ruby/proton-test (original)
+++ qpid/proton/trunk/tests/ruby/proton-test Thu Jul 11 20:11:12 2013
@@ -2,3 +2,4 @@
require 'test/unit'
require 'proton_tests/interop.rb'
+require 'proton_tests/smoke.rb'
Added: qpid/proton/trunk/tests/ruby/proton_tests/smoke.rb
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/ruby/proton_tests/smoke.rb?rev=1502346&view=auto
==============================================================================
--- qpid/proton/trunk/tests/ruby/proton_tests/smoke.rb (added)
+++ qpid/proton/trunk/tests/ruby/proton_tests/smoke.rb Thu Jul 11 20:11:12 2013
@@ -0,0 +1,57 @@
+#!/usr/bin/env ruby
+
+require 'test/unit'
+require 'qpid_proton'
+
+class SmokeTest < Test::Unit::TestCase
+
+ Messenger = Qpid::Proton::Messenger
+ Message = Qpid::Proton::Message
+
+ def setup
+ @server = Messenger.new()
+ @client = Messenger.new()
+ @server.blocking = false
+ @client.blocking = false
+ @server.subscribe("~0.0.0.0:12345")
+ @server.start()
+ @client.start()
+ pump()
+ end
+
+ def pump
+ while (@server.work(0) or @client.work(0)) do end
+ end
+
+ def teardown
+ @server.stop()
+ @client.stop()
+
+ pump()
+
+ assert @client.stopped
+ assert @server.stopped
+ end
+
+ def testSmoke(count=10)
+ msg = Message.new()
+ msg.address = "0.0.0.0:12345"
+
+ @server.receive()
+
+ count.times {|i|
+ msg.content = "Hello World! #{i}"
+ @client.put(msg)
+ }
+
+ pump()
+
+ msg2 = Message.new()
+
+ count.times {|i|
+ @server.get(msg2)
+ assert msg2.content == "Hello World! #{i}"
+ }
+ end
+
+end
Modified: qpid/proton/trunk/tests/tools/apps/c/msgr-recv.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/tools/apps/c/msgr-recv.c?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/tests/tools/apps/c/msgr-recv.c (original)
+++ qpid/proton/trunk/tests/tools/apps/c/msgr-recv.c Thu Jul 11 20:11:12 2013
@@ -257,7 +257,7 @@ int main(int argc, char** argv)
// this will flush any pending sends
if (pn_messenger_outgoing(messenger) > 0) {
LOG("Calling pn_messenger_send()\n");
- rc = pn_messenger_send(messenger);
+ rc = pn_messenger_send(messenger, -1);
check(rc == 0, "pn_messenger_send() failed");
}
Modified: qpid/proton/trunk/tests/tools/apps/c/msgr-send.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/tools/apps/c/msgr-send.c?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/tests/tools/apps/c/msgr-send.c (original)
+++ qpid/proton/trunk/tests/tools/apps/c/msgr-send.c Thu Jul 11 20:11:12 2013
@@ -279,7 +279,7 @@ int main(int argc, char** argv)
}
} else {
LOG("Calling pn_messenger_send()\n");
- rc = pn_messenger_send(messenger);
+ rc = pn_messenger_send(messenger, -1);
check((rc == 0 || rc == PN_TIMEOUT), "pn_messenger_send() failed");
}
}
@@ -300,7 +300,7 @@ int main(int argc, char** argv)
}
} else if (pn_messenger_outgoing(messenger) > 0) {
LOG("Calling pn_messenger_send()\n");
- rc = pn_messenger_send(messenger);
+ rc = pn_messenger_send(messenger, -1);
check(rc == 0, "pn_messenger_send() failed");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org