You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2013/07/11 22:11:13 UTC

svn commit: r1502346 - in /qpid/proton/trunk: examples/messenger/c/ proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/ proton-c/bindings/perl/lib/qpid/proton/ proton-c/bindings/php/ proton-c/bindings/python/ proton-c/bindings/ru...

Author: rhs
Date: Thu Jul 11 20:11:12 2013
New Revision: 1502346

URL: http://svn.apache.org/r1502346
Log:
Added several related messenger features: non blocking mode, pn_messenger_work, pn_messenger_interrupt, optional arg to pn_messenger_send. Also added basic non blocking smoke test for messenger in ruby and python. These address PROTON-231.

Added:
    qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/InterruptException.java
    qpid/proton/trunk/tests/ruby/proton_tests/smoke.rb
Modified:
    qpid/proton/trunk/examples/messenger/c/send.c
    qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java
    qpid/proton/trunk/proton-c/bindings/perl/lib/qpid/proton/Messenger.pm
    qpid/proton/trunk/proton-c/bindings/php/proton.php
    qpid/proton/trunk/proton-c/bindings/python/proton.py
    qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/exception_handling.rb
    qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/exceptions.rb
    qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb
    qpid/proton/trunk/proton-c/include/proton/error.h
    qpid/proton/trunk/proton-c/include/proton/messenger.h
    qpid/proton/trunk/proton-c/src/messenger/messenger.c
    qpid/proton/trunk/proton-c/src/posix/driver.c
    qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Driver.java
    qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
    qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
    qpid/proton/trunk/tests/python/proton_tests/messenger.py
    qpid/proton/trunk/tests/ruby/proton-test
    qpid/proton/trunk/tests/tools/apps/c/msgr-recv.c
    qpid/proton/trunk/tests/tools/apps/c/msgr-send.c

Modified: qpid/proton/trunk/examples/messenger/c/send.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/examples/messenger/c/send.c?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/examples/messenger/c/send.c (original)
+++ qpid/proton/trunk/examples/messenger/c/send.c Thu Jul 11 20:11:12 2013
@@ -101,7 +101,7 @@ int main(int argc, char** argv)
   pn_data_put_string(body, pn_bytes(strlen(msgtext), msgtext));
   pn_messenger_put(messenger, message);
   check(messenger);
-  pn_messenger_send(messenger);
+  pn_messenger_send(messenger, -1);
   check(messenger);
 
   pn_messenger_stop(messenger);

Modified: qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java (original)
+++ qpid/proton/trunk/proton-c/bindings/java/src/main/java/org/apache/qpid/proton/messenger/jni/JNIMessenger.java Thu Jul 11 20:11:12 2013
@@ -81,7 +81,13 @@ class JNIMessenger implements Messenger
     @Override
     public void send() throws TimeoutException
     {
-        int err = Proton.pn_messenger_send(_impl);
+        send(-1);
+    }
+
+    @Override
+    public void send(int n) throws TimeoutException
+    {
+        int err = Proton.pn_messenger_send(_impl, n);
         check(err);
     }
 
@@ -123,6 +129,29 @@ class JNIMessenger implements Messenger
     }
 
     @Override
+    public boolean stopped()
+    {
+        return Proton.pn_messenger_stopped(_impl);
+    }
+
+    @Override
+    public boolean work(long timeout)
+    {
+        int err = Proton.pn_messenger_work(_impl, (int) timeout);
+        if (err == Proton.PN_TIMEOUT)
+            return false;
+        check(err);
+        return true;
+    }
+
+    @Override
+    public void interrupt()
+    {
+        int err = Proton.pn_messenger_interrupt(_impl);
+        check(err);
+    }
+
+    @Override
     public void setTimeout(final long timeInMillis)
     {
         int err = Proton.pn_messenger_set_timeout(_impl, (int) timeInMillis);
@@ -136,6 +165,19 @@ class JNIMessenger implements Messenger
     }
 
     @Override
+    public boolean isBlocking()
+    {
+        return Proton.pn_messenger_is_blocking(_impl);
+    }
+
+    @Override
+    public void setBlocking(boolean b)
+    {
+        int err = Proton.pn_messenger_set_blocking(_impl, b);
+        check(err);
+    }
+
+    @Override
     public int outgoing()
     {
         return Proton.pn_messenger_outgoing(_impl);
@@ -232,7 +274,7 @@ class JNIMessenger implements Messenger
 
     private void check(int errorCode) throws ProtonException
     {
-        if(errorCode != 0)
+        if(errorCode != 0 && errorCode != Proton.PN_INPROGRESS)
         {
             String errorMessage = Proton.pn_messenger_error(_impl);
             if(errorCode == Proton.PN_TIMEOUT)

Modified: qpid/proton/trunk/proton-c/bindings/perl/lib/qpid/proton/Messenger.pm
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/perl/lib/qpid/proton/Messenger.pm?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/perl/lib/qpid/proton/Messenger.pm (original)
+++ qpid/proton/trunk/proton-c/bindings/perl/lib/qpid/proton/Messenger.pm Thu Jul 11 20:11:12 2013
@@ -164,7 +164,7 @@ sub put {
 
 sub send {
     my ($self) = @_;
-    cproton_perl::pn_messenger_send($self->{_impl});
+    cproton_perl::pn_messenger_send($self->{_impl}, $_[1]);
 }
 
 sub get {

Modified: qpid/proton/trunk/proton-c/bindings/php/proton.php
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/php/proton.php?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/php/proton.php (original)
+++ qpid/proton/trunk/proton-c/bindings/php/proton.php Thu Jul 11 20:11:12 2013
@@ -127,11 +127,11 @@ class Messenger
     $this->_check(pn_messenger_put($this->impl, $message->impl));
   }
 
-  public function send() {
-    $this->_check(pn_messenger_send($this->impl));
+  public function send($n = -1) {
+    $this->_check(pn_messenger_send($this->impl, $n));
   }
 
-  public function recv($n) {
+  public function recv($n = -1) {
     $this->_check(pn_messenger_recv($this->impl, $n));
   }
 

Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Thu Jul 11 20:11:12 2013
@@ -113,6 +113,12 @@ class Timeout(ProtonException):
   """
   pass
 
+class Interrupt(ProtonException):
+  """
+  An interrupt exception indicates that a blocking operation was interrupted.
+  """
+  pass
+
 class MessengerException(ProtonException):
   """
   The root of the messenger exception hierarchy. All exceptions
@@ -129,7 +135,8 @@ class MessageException(ProtonException):
   pass
 
 EXCEPTIONS = {
-  PN_TIMEOUT: Timeout
+  PN_TIMEOUT: Timeout,
+  PN_INTR: Interrupt
   }
 
 PENDING = Constant("PENDING")
@@ -225,6 +232,8 @@ class Messenger(object):
 
   def _check(self, err):
     if err < 0:
+      if (err == PN_INPROGRESS):
+        return
       exc = EXCEPTIONS.get(err, MessengerException)
       raise exc("[%s]: %s" % (err, pn_messenger_error(self._mng)))
     else:
@@ -306,6 +315,14 @@ The timeout property contains the defaul
 operations performed by the L{Messenger}.
 """)
 
+  def _is_blocking(self):
+    return pn_messenger_is_blocking(self._mng)
+
+  def _set_blocking(self, b):
+    self._check(pn_messenger_set_blocking(self._mng, b))
+
+  blocking = property(_is_blocking, _set_blocking)
+
   def _get_incoming_window(self):
     return pn_messenger_get_incoming_window(self._mng)
 
@@ -352,6 +369,10 @@ send. Defaults to zero.
     """
     self._check(pn_messenger_stop(self._mng))
 
+  @property
+  def stopped(self):
+    return pn_messenger_stopped(self._mng)
+
   def subscribe(self, source):
     """
     Subscribes the L{Messenger} to messages originating from the
@@ -410,13 +431,13 @@ send. Defaults to zero.
       flags = 0
     self._check(pn_messenger_settle(self._mng, tracker, flags))
 
-  def send(self):
+  def send(self, n=-1):
     """
     Blocks until the outgoing queue is empty or the operation times
     out. The L{timeout} property controls how long a L{Messenger} will
     block before timing out.
     """
-    self._check(pn_messenger_send(self._mng))
+    self._check(pn_messenger_send(self._mng, n))
 
   def recv(self, n=None):
     """
@@ -429,6 +450,17 @@ send. Defaults to zero.
       n = -1
     self._check(pn_messenger_recv(self._mng, n))
 
+  def work(self, timeout=-1):
+    err = pn_messenger_work(self._mng, timeout)
+    if (err == PN_TIMEOUT):
+      return False
+    else:
+      self._check(err)
+      return True
+
+  def interrupt(self):
+    self._check(pn_messenger_interrupt(self._mng))
+
   def get(self, message=None):
     """
     Moves the message from the head of the incoming message queue into
@@ -2861,6 +2893,7 @@ __all__ = [
            "SSLUnavailable",
            "Terminus",
            "Timeout",
+           "Interrupt",
            "Transport",
            "TransportException",
            "char",

Modified: qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/exception_handling.rb
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/exception_handling.rb?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/exception_handling.rb (original)
+++ qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/exception_handling.rb Thu Jul 11 20:11:12 2013
@@ -54,6 +54,9 @@ module Qpid
         when Qpid::Proton::Error::TIMEOUT
           raise Qpid::Proton::TimeoutError.new(self.error)
 
+        when Qpid::Proton::Error::INPROGRESS
+          return
+
         else
 
           raise ::ArgumentError.new("Unknown error code: #{code}")

Modified: qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/exceptions.rb
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/exceptions.rb?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/exceptions.rb (original)
+++ qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/exceptions.rb Thu Jul 11 20:11:12 2013
@@ -31,6 +31,7 @@ module Qpid
       STATE = Cproton::PN_STATE_ERR
       ARGUMENT = Cproton::PN_ARG_ERR
       TIMEOUT = Cproton::PN_TIMEOUT
+      INPROGRESS = Cproton::PN_INPROGRESS
 
     end
 

Modified: qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb (original)
+++ qpid/proton/trunk/proton-c/bindings/ruby/lib/qpid_proton/messenger.rb Thu Jul 11 20:11:12 2013
@@ -46,7 +46,6 @@ module Qpid
 
       def self.finalize!(impl) # :nodoc:
         proc {
-          Cproton.pn_messenger_stop(impl)
           Cproton.pn_messenger_free(impl)
         }
       end
@@ -76,6 +75,14 @@ module Qpid
         Cproton.pn_messenger_get_timeout(@impl)
       end
 
+      def blocking
+        Cproton.pn_messenger_is_blocking(@impl)
+      end
+
+      def blocking=(blocking)
+        Cproton.pn_messenger_set_blocking(@impl, blocking)
+      end
+
       # Reports whether an error occurred.
       #
       def error?
@@ -108,6 +115,10 @@ module Qpid
         check_for_error(Cproton.pn_messenger_stop(@impl))
       end
 
+      def stopped
+        Cproton.pn_messenger_stopped(@impl)
+      end
+
       # Subscribes the +Messenger+ to a remote address.
       #
       def subscribe(address)
@@ -191,8 +202,8 @@ module Qpid
       # Sends all outgoing messages, blocking until the outgoing queue
       # is empty.
       #
-      def send
-        check_for_error(Cproton.pn_messenger_send(@impl))
+      def send(n = -1)
+        check_for_error(Cproton.pn_messenger_send(@impl, n))
       end
 
       # Gets a single incoming message from the local queue.
@@ -215,12 +226,24 @@ module Qpid
       #
       # Options ====
       #
-      # * max - the maximum number of messages to receive
+      # * limit - the maximum number of messages to receive
       #
-      def receive(max)
-        raise TypeError.new("invalid max: #{max}") if max.nil? || max.to_i.zero?
-        raise RangeError.new("negative max: #{max}") if max < 0
-        check_for_error(Cproton.pn_messenger_recv(@impl, max))
+      def receive(limit=-1)
+        check_for_error(Cproton.pn_messenger_recv(@impl, limit))
+      end
+
+      def receiving
+        Cproton.pn_messenger_receiving(@impl)
+      end
+
+      def work(timeout=-1)
+        err = Cproton.pn_messenger_work(@impl, timeout)
+        if (err == Cproton::PN_TIMEOUT) then
+          return false
+        else
+          check_for_error(err)
+          return true
+        end
       end
 
       # Returns the number of messages in the outgoing queue that have not been

Modified: qpid/proton/trunk/proton-c/include/proton/error.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/error.h?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/error.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/error.h Thu Jul 11 20:11:12 2013
@@ -39,6 +39,7 @@ typedef struct pn_error_t pn_error_t;
 #define PN_ARG_ERR (-6)
 #define PN_TIMEOUT (-7)
 #define PN_INTR (-8)
+#define PN_INPROGRESS (-9)
 
 PN_EXTERN const char *pn_code(int code);
 

Modified: qpid/proton/trunk/proton-c/include/proton/messenger.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/messenger.h?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/messenger.h Thu Jul 11 20:11:12 2013
@@ -148,6 +148,9 @@ PN_EXTERN int pn_messenger_set_timeout(p
  */
 PN_EXTERN int pn_messenger_get_timeout(pn_messenger_t *messenger);
 
+PN_EXTERN bool pn_messenger_is_blocking(pn_messenger_t *messenger);
+PN_EXTERN int pn_messenger_set_blocking(pn_messenger_t *messenger, bool blocking);
+
 /** Frees a Messenger.
  *
  * @param[in] messenger the messenger to free, no longer valid on
@@ -238,6 +241,8 @@ PN_EXTERN int pn_messenger_start(pn_mess
  */
 PN_EXTERN int pn_messenger_stop(pn_messenger_t *messenger);
 
+PN_EXTERN bool pn_messenger_stopped(pn_messenger_t *messenger);
+
 /** Subscribes a messenger to messages from the specified source.
  *
  * @param[in] messenger the messenger to subscribe
@@ -294,29 +299,60 @@ PN_EXTERN int pn_messenger_settle(pn_mes
  */
 PN_EXTERN pn_tracker_t pn_messenger_outgoing_tracker(pn_messenger_t *messenger);
 
-/** Sends any messages in the outgoing message queue for a messenger.
- * This will block until the messages have been sent.
+/** Sends or receives any outstanding messages queued for a messenger.
+ * This will block for the indicated timeout.
+ *
+ * @param[in] messenger the Messenger
+ * @param[in] timeout the maximum time to block
+ */
+PN_EXTERN int pn_messenger_work(pn_messenger_t *messenger, int timeout);
+
+/** Interrupts a messenger that is blocking. This method may be safely
+ * called from a different thread than the one that is blocking.
+ *
+ * @param[in] messenger the Messenger
+ */
+PN_EXTERN int pn_messenger_interrupt(pn_messenger_t *messenger);
+
+/** Sends messages in the outgoing message queue for a messenger. This
+ * call will block until the indicated number of messages have been
+ * sent. If n is -1 this call will block until all outgoing messages
+ * have been sent. If n is 0 then this call won't block.
  *
  * @param[in] messenger the messenger
+ * @param[in] n the number of messages to send
  *
  * @return an error code or zero on success
  * @see error.h
  */
-PN_EXTERN int pn_messenger_send(pn_messenger_t *messenger);
+PN_EXTERN int pn_messenger_send(pn_messenger_t *messenger, int n);
 
-/** Receives up to n messages into the incoming message queue of a
- * messenger. If n is -1, Messenger will be able to receive as many
- * messages as it can buffer internally.  Blocks until at least one
- * message is available in the incoming queue.
+/** Instructs the messenger to receive up to limit messages into the
+ * incoming message queue of a messenger. If limit is -1, Messenger
+ * will receive as many messages as it can buffer internally. If the
+ * messenger is in blocking mode, this call will block until at least
+ * one message is available in the incoming queue.
+ *
+ * Each call to pn_messenger_recv replaces the previous receive
+ * operation, so pn_messenger_recv(messenger, 0) will cancel any
+ * outstanding receive.
  *
  * @param[in] messenger the messenger
- * @param[in] n the maximum number of messages to receive or -1 to to
- * receive as many messages as it can buffer internally.
+ * @param[in] limit the maximum number of messages to receive or -1 to
+ *                  receive as many messages as it can buffer
+ *                  internally.
  *
  * @return an error code or zero on success
  * @see error.h
  */
-PN_EXTERN int pn_messenger_recv(pn_messenger_t *messenger, int n);
+PN_EXTERN int pn_messenger_recv(pn_messenger_t *messenger, int limit);
+
+/** Returns the number of messages currently being received by a
+ * messenger.
+ *
+ * @param[in] messenger the messenger
+ */
+PN_EXTERN int pn_messenger_receiving(pn_messenger_t *messenger);
 
 /** Gets a message from the head of the incoming message queue of a
  * messenger.

Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Thu Jul 11 20:11:12 2013
@@ -53,8 +53,10 @@ struct pn_messenger_t {
   char *password;
   char *trusted_certificates;
   int timeout;
+  bool blocking;
   pn_driver_t *driver;
-  bool unlimited_credit;
+  int send_threshold;
+  int receiving;
   int credit_batch;
   int credit;
   int distributed;
@@ -73,6 +75,7 @@ struct pn_messenger_t {
   pn_tracker_t incoming_tracker;
   pn_string_t *original;
   pn_string_t *rewritten;
+  bool worked;
 };
 
 struct pn_subscription_t {
@@ -182,8 +185,9 @@ pn_messenger_t *pn_messenger(const char 
     m->password = NULL;
     m->trusted_certificates = NULL;
     m->timeout = -1;
+    m->blocking = true;
     m->driver = pn_driver();
-    m->unlimited_credit = false;
+    m->receiving = 0;
     m->credit_batch = 10;
     m->credit = 0;
     m->distributed = 0;
@@ -272,6 +276,18 @@ int pn_messenger_get_timeout(pn_messenge
   return messenger ? messenger->timeout : 0;
 }
 
+bool pn_messenger_is_blocking(pn_messenger_t *messenger)
+{
+  assert(messenger);
+  return messenger->blocking;
+}
+
+int pn_messenger_set_blocking(pn_messenger_t *messenger, bool blocking)
+{
+  messenger->blocking = blocking;
+  return 0;
+}
+
 static void pni_driver_reclaim(pn_driver_t *driver)
 {
   pn_listener_t *l = pn_listener_head(driver);
@@ -342,8 +358,13 @@ void pn_messenger_flow(pn_messenger_t *m
 
   if (link_ct == 0) return;
 
-  if (messenger->unlimited_credit)
+  if (messenger->receiving == -1) {
     messenger->credit = link_ct * messenger->credit_batch;
+  } else {
+    int total = messenger->credit + messenger->distributed;
+    if (messenger->receiving > total)
+      messenger->credit += (messenger->receiving - total);
+  }
 
   int batch = (messenger->credit < link_ct) ? 1
     : (messenger->credit/link_ct);
@@ -619,11 +640,11 @@ int pn_messenger_tsync(pn_messenger_t *m
     if (pred || (timeout >= 0 && remaining < 0)) break;
 
     int error = pn_driver_wait(messenger->driver, remaining);
-    if (error)
-        return error;
+    if (error && error != PN_INTR) return error;
 
     pn_listener_t *l;
     while ((l = pn_driver_listener(messenger->driver))) {
+      messenger->worked = true;
       pn_listener_ctx_t *ctx = (pn_listener_ctx_t *) pn_listener_context(l);
       pn_subscription_t *sub = ctx->subscription;
       char *scheme = sub->scheme;
@@ -654,6 +675,7 @@ int pn_messenger_tsync(pn_messenger_t *m
 
     pn_connector_t *c;
     while ((c = pn_driver_connector(messenger->driver))) {
+      messenger->worked = true;
       pn_connector_process(c);
       pn_connection_t *conn = pn_connector_connection(c);
       pn_messenger_endpoints(messenger, conn, c);
@@ -673,6 +695,10 @@ int pn_messenger_tsync(pn_messenger_t *m
     if (timeout >= 0) {
       now = pn_i_now();
     }
+
+    if (error == PN_INTR) {
+      return pred ? 0 : PN_INTR;
+    }
   }
 
   return pred ? 0 : PN_TIMEOUT;
@@ -680,7 +706,16 @@ int pn_messenger_tsync(pn_messenger_t *m
 
 int pn_messenger_sync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *))
 {
-  return pn_messenger_tsync(messenger, predicate, messenger->timeout);
+  if (messenger->blocking) {
+    return pn_messenger_tsync(messenger, predicate, messenger->timeout);
+  } else {
+    int err = pn_messenger_tsync(messenger, predicate, 0);
+    if (err == PN_TIMEOUT) {
+      return PN_INPROGRESS;
+    } else {
+      return err;
+    }
+  }
 }
 
 int pn_messenger_start(pn_messenger_t *messenger)
@@ -1000,8 +1035,6 @@ static void outward_munge(pn_messenger_t
   if (heapbuf) free (heapbuf);
 }
 
-// static bool false_pred(pn_messenger_t *messenger) { return false; }
-
 int pni_pump_out(pn_messenger_t *messenger, const char *address, pn_link_t *sender)
 {
   pni_entry_t *entry = pni_store_get(messenger->outgoing, address);
@@ -1026,8 +1059,6 @@ int pni_pump_out(pn_messenger_t *messeng
   } else {
     pn_link_advance(sender);
     pni_entry_free(entry);
-    // XXX: doing this every time is slow, need to be smarter
-    //pn_messenger_tsync(messenger, false_pred, 0);
     return 0;
   }
 }
@@ -1150,7 +1181,7 @@ int pn_messenger_settle(pn_messenger_t *
 // true if all pending output has been sent to peer
 bool pn_messenger_sent(pn_messenger_t *messenger)
 {
-  if (pni_store_size(messenger->outgoing) > 0) return false;
+  int total = pni_store_size(messenger->outgoing);
 
   pn_connector_t *ctor = pn_connector_head(messenger->driver);
   while (ctor) {
@@ -1169,14 +1200,12 @@ bool pn_messenger_sent(pn_messenger_t *m
     pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
     while (link) {
       if (pn_link_is_sender(link)) {
-        if (pn_link_queued(link)) {
-          return false;
-        }
+        total += pn_link_queued(link);
 
         pn_delivery_t *d = pn_unsettled_head(link);
         while (d) {
           if (!pn_delivery_remote_state(d) && !pn_delivery_settled(d)) {
-            return false;
+            total++;
           }
           d = pn_unsettled_next(d);
         }
@@ -1187,7 +1216,7 @@ bool pn_messenger_sent(pn_messenger_t *m
     ctor = pn_connector_next(ctor);
   }
 
-  return true;
+  return total <= messenger->send_threshold;
 }
 
 bool pn_messenger_rcvd(pn_messenger_t *messenger)
@@ -1215,8 +1244,35 @@ bool pn_messenger_rcvd(pn_messenger_t *m
   }
 }
 
-int pn_messenger_send(pn_messenger_t *messenger)
+static bool work_pred(pn_messenger_t *messenger) {
+  return messenger->worked;
+}
+
+int pn_messenger_work(pn_messenger_t *messenger, int timeout)
 {
+  messenger->worked = false;
+  return pn_messenger_tsync(messenger, work_pred, timeout);
+}
+
+int pn_messenger_interrupt(pn_messenger_t *messenger)
+{
+  assert(messenger);
+  if (messenger->driver) {
+    return pn_driver_wakeup(messenger->driver);
+  } else {
+    return 0;
+  }
+}
+
+int pn_messenger_send(pn_messenger_t *messenger, int n)
+{
+  if (n == -1) {
+    messenger->send_threshold = 0;
+  } else {
+    messenger->send_threshold = pn_messenger_outgoing(messenger) - n;
+    if (messenger->send_threshold < 0)
+      messenger->send_threshold = 0;
+  }
   return pn_messenger_sync(messenger, pn_messenger_sent);
 }
 
@@ -1225,14 +1281,7 @@ int pn_messenger_recv(pn_messenger_t *me
   if (!messenger) return PN_ARG_ERR;
   if (!pn_listener_head(messenger->driver) && !pn_connector_head(messenger->driver))
     return pn_error_format(messenger->error, PN_STATE_ERR, "no valid sources");
-  int total = messenger->credit + messenger->distributed;
-  if (n == -1) {
-    messenger->unlimited_credit = true;
-  } else  {
-    messenger->unlimited_credit = false;
-    if (n > total)
-      messenger->credit += (n - total);
-  }
+  messenger->receiving = n;
   pn_messenger_flow(messenger);
   int err = pn_messenger_sync(messenger, pn_messenger_rcvd);
   if (err) return err;
@@ -1245,6 +1294,12 @@ int pn_messenger_recv(pn_messenger_t *me
   }
 }
 
+int pn_messenger_receiving(pn_messenger_t *messenger)
+{
+  assert(messenger);
+  return messenger->receiving;
+}
+
 int pn_messenger_get(pn_messenger_t *messenger, pn_message_t *msg)
 {
   if (!messenger) return PN_ARG_ERR;

Modified: qpid/proton/trunk/proton-c/src/posix/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/posix/driver.c?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/posix/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/posix/driver.c Thu Jul 11 20:11:12 2013
@@ -766,9 +766,11 @@ int pn_driver_wait_2(pn_driver_t *d, int
   return result;
 }
 
-void pn_driver_wait_3(pn_driver_t *d)
+int pn_driver_wait_3(pn_driver_t *d)
 {
+  bool woken = false;
   if (d->fds[0].revents & POLLIN) {
+    woken = true;
     //clear the pipe
     char buffer[512];
     while (read(d->ctrl[0], buffer, 512) == 512);
@@ -800,6 +802,8 @@ void pn_driver_wait_3(pn_driver_t *d)
 
   d->listener_next = d->listener_head;
   d->connector_next = d->connector_head;
+
+  return woken ? PN_INTR : 0;
 }
 
 //
@@ -818,8 +822,7 @@ int pn_driver_wait(pn_driver_t *d, int t
     int result = pn_driver_wait_2(d, timeout);
     if (result == -1)
         return pn_error_code(d->error);
-    pn_driver_wait_3(d);
-    return 0;
+    return pn_driver_wait_3(d);
 }
 
 pn_listener_t *pn_driver_listener(pn_driver_t *d) {

Added: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/InterruptException.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/InterruptException.java?rev=1502346&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/InterruptException.java (added)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/InterruptException.java Thu Jul 11 20:11:12 2013
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.proton;
+
+public class InterruptException extends ProtonException
+{
+    public InterruptException()
+    {
+    }
+
+    public InterruptException(String message)
+    {
+        super(message);
+    }
+
+    public InterruptException(String message, Throwable cause)
+    {
+        super(message, cause);
+    }
+
+    public InterruptException(Throwable cause)
+    {
+        super(cause);
+    }
+
+}

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Driver.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Driver.java?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Driver.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/driver/Driver.java Thu Jul 11 20:11:12 2013
@@ -56,8 +56,10 @@ public interface Driver
      * Thread-safe.
      *
      * @param timeout maximum time in milliseconds to wait. 0 means wait indefinitely.
+     *
+     * @return true if woken up
      */
-    void doWait(long timeout);
+    boolean doWait(long timeout);
 
     /**
      * Get the next listener with pending data in the driver.

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/messenger/Messenger.java Thu Jul 11 20:11:12 2013
@@ -82,6 +82,7 @@ public interface Messenger
      * to cause the messages to actually be sent out.
      */
     void put(Message message) throws MessengerException;
+
     /**
      * Blocks until the outgoing queue is empty and, in the event that
      * an outgoing window has been set, until the messages in that
@@ -91,6 +92,8 @@ public interface Messenger
      */
     void send() throws TimeoutException;
 
+    void send(int n) throws TimeoutException;
+
     /**
      * Subscribes the Messenger to messages originating from the
      * specified source. The source is an address as specified in the
@@ -131,9 +134,18 @@ public interface Messenger
      */
     void stop();
 
+    boolean stopped();
+
+    boolean work(long timeout);
+
+    void interrupt();
+
     void setTimeout(long timeInMillis);
     long getTimeout();
 
+    boolean isBlocking();
+    void setBlocking(boolean b);
+
     /**
      * Returns a count of the messages currently on the outgoing queue
      * (i.e. those that have been put() but not yet actually sent

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py Thu Jul 11 20:11:12 2013
@@ -24,6 +24,7 @@ except NameError:
   bytes = str
 
 from org.apache.qpid.proton import ProtonFactoryLoader, ProtonUnsupportedOperationException
+from org.apache.qpid.proton import InterruptException as Interrupt
 from org.apache.qpid.proton.engine import \
     EngineFactory, Transport as JTransport, Sender as JSender, Receiver as JReceiver, \
     Sasl, SslDomain as JSslDomain, \
@@ -44,6 +45,7 @@ from java.util import EnumSet, UUID as J
 from java.util.concurrent import TimeoutException as Timeout
 from java.nio import ByteBuffer
 from java.lang import Character as JCharacter, String as JString, Integer as JInteger
+from java.lang import NoClassDefFoundError
 
 
 
@@ -1265,6 +1267,10 @@ class Messenger(object):
   def stop(self):
     self.impl.stop()
 
+  @property
+  def stopped(self):
+    return self.impl.stopped()
+
   def subscribe(self, source):
     self.impl.subscribe(source)
 
@@ -1272,14 +1278,21 @@ class Messenger(object):
     self.impl.put(message.impl)
     return self.impl.outgoingTracker()
 
-  def send(self):
-    self.impl.send()
+  def send(self, n=-1):
+    self.impl.send(n)
 
-  def recv(self, n):
+  def recv(self, n=-1):
     self.impl.recv(n)
 
+  def work(self, t):
+    return self.impl.work(t)
+
+  def interrupt(self):
+    self.impl.interrupt()
+
   def get(self, message=None):
     result = self.impl.get()
+    if not result: print "HA"
     if message and result:
       message.impl = result
     return self.impl.incomingTracker()
@@ -1298,6 +1311,13 @@ class Messenger(object):
     self.impl.setTimeout(t)
   timeout = property(_get_timeout, _set_timeout)
 
+  def _is_blocking(self):
+    return self.impl.isBlocking()
+
+  def _set_blocking(self, b):
+    self.impl.setBlocking(b)
+
+  blocking = property(_is_blocking, _set_blocking)
   def accept(self, tracker=None):
     if tracker is None:
       tracker = self.impl.incomingTracker()
@@ -1578,7 +1598,10 @@ class SSLDomain(object):
   ANONYMOUS_PEER = JSslDomain.VerifyMode.ANONYMOUS_PEER
 
   def __init__(self, mode):
-    self._domain = engineFactory.createSslDomain()
+    try:
+      self._domain = engineFactory.createSslDomain()
+    except NoClassDefFoundError, e:
+      raise SSLUnavailable()
     self._domain.init(mode)
 
   def set_credentials(self, cert_file, key_file, password):
@@ -1724,6 +1747,7 @@ __all__ = [
            "timestamp",
            "Terminus",
            "Timeout",
+           "Interrupt",
            "Transport",
            "TransportException",
            "ulong",

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/driver/impl/DriverImpl.java Thu Jul 11 20:11:12 2013
@@ -48,6 +48,8 @@ public class DriverImpl implements Drive
     private Collection<Listener> _listeners = new LinkedList();
     private Collection<Connector> _connectors = new LinkedList();
     private Logger _logger = Logger.getLogger("proton.driver");
+    private Object _wakeupLock = new Object();
+    private boolean _woken = false;
 
     DriverImpl() throws IOException
     {
@@ -56,15 +58,34 @@ public class DriverImpl implements Drive
 
     public void wakeup()
     {
+        synchronized (_wakeupLock) {
+            _woken = true;
+        }
         _selector.wakeup();
     }
 
-    public void doWait(long timeout)
+    public boolean doWait(long timeout)
     {
         try
         {
-            _selector.select(timeout);
+            boolean woken;
+            synchronized (_wakeupLock) {
+                woken = _woken;
+            }
+
+            if (woken) {
+                _selector.selectNow();
+            } else {
+                _selector.select(timeout);
+            }
             _selectedKeys = _selector.selectedKeys();
+
+            synchronized (_wakeupLock) {
+                woken = woken || _woken;
+                _woken = false;
+            }
+
+            return woken;
         }
         catch (IOException e)
         {

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java Thu Jul 11 20:11:12 2013
@@ -29,6 +29,7 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.qpid.proton.ProtonFactoryLoader;
+import org.apache.qpid.proton.InterruptException;
 import org.apache.qpid.proton.TimeoutException;
 import org.apache.qpid.proton.driver.Connector;
 import org.apache.qpid.proton.driver.Driver;
@@ -53,6 +54,8 @@ import org.apache.qpid.proton.messenger.
 import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.Target;
 
+import org.apache.qpid.proton.amqp.Binary;
+
 public class MessengerImpl implements Messenger
 {
     @SuppressWarnings("rawtypes")
@@ -69,6 +72,7 @@ public class MessengerImpl implements Me
     private final DriverFactory _driverFactory;
     private final MessageFactory _messageFactory;
     private long _timeout = -1;
+    private boolean _blocking = true;
     private long _nextTag = 1;
     private byte[] _buffer = new byte[5*1024];
     private Driver _driver;
@@ -119,6 +123,16 @@ public class MessengerImpl implements Me
         return _timeout;
     }
 
+    public boolean isBlocking()
+    {
+        return _blocking;
+    }
+
+    public void setBlocking(boolean b)
+    {
+        _blocking = b;
+    }
+
     public void start() throws IOException
     {
         _driver = _driverFactory.createDriver();
@@ -156,15 +170,24 @@ public class MessengerImpl implements Me
                 _logger.log(Level.WARNING, "Error while closing listener", e);
             }
         }
-        try
-        {
-            waitUntil(_allClosed);
-        }
-        catch(TimeoutException e)
-        {
-            _logger.log(Level.WARNING, "Timed out while waiting for close", e);
-        }
-        _driver.destroy();
+        waitUntil(_allClosed);
+        //_driver.destroy();
+    }
+
+    public boolean stopped()
+    {
+        return _allClosed.test();
+    }
+
+    public boolean work(long timeout)
+    {
+        _worked = false;
+        return waitUntil(_workPred, timeout);
+    }
+
+    public void interrupt()
+    {
+        _driver.wakeup();
     }
 
     public void put(Message m) throws MessengerException
@@ -210,6 +233,11 @@ public class MessengerImpl implements Me
 
     public void send() throws TimeoutException
     {
+        send(-1);
+    }
+
+    public void send(int n) throws TimeoutException
+    {
         if(_logger.isLoggable(Level.FINE))
         {
             _logger.fine(this + " about to send");
@@ -249,8 +277,8 @@ public class MessengerImpl implements Me
                     int size = read((Receiver) delivery.getLink());
                     Message message = _messageFactory.createMessage();
                     message.decode(_buffer, 0, size);
-                    _incoming.add(delivery);
                     delivery.getLink().advance();
+                    _incoming.add(delivery);
                     return message;
                 }
                 else
@@ -383,27 +411,24 @@ public class MessengerImpl implements Me
 
     private int read(Receiver receiver)
     {
-        //TODO: add pending count to Delivery?
-        int total = 0;
-        int start = 0;
-        while (true)
-        {
-            int read = receiver.recv(_buffer, start, _buffer.length - start);
-            total += read;
-            if (read == (_buffer.length - start))
-            {
-                //may need to expand the buffer (is there a better test?)
-                byte[] old = _buffer;
-                _buffer = new byte[_buffer.length*2];
-                System.arraycopy(old, 0, _buffer, 0, old.length);
-                start += read;
-            }
-            else
-            {
-                break;
-            }
+        Delivery dlv = receiver.current();
+
+        if (dlv.isPartial()) {
+            throw new IllegalStateException();
+        }
+
+        int size = dlv.pending();
+
+        while (_buffer.length < size) {
+            _buffer = new byte[_buffer.length * 2];
+        }
+
+        int read = receiver.recv(_buffer, 0, _buffer.length);
+        if (read != size) {
+            throw new IllegalStateException();
         }
-        return total;
+
+        return size;
     }
 
     private void processAllConnectors()
@@ -426,6 +451,7 @@ public class MessengerImpl implements Me
         //process active listeners
         for (Listener<?> l = _driver.listener(); l != null; l = _driver.listener())
         {
+            _worked = true;
             Connector<?> c = l.accept();
             Connection connection = _engineFactory.createConnection();
             connection.setContainer(_name);
@@ -443,6 +469,7 @@ public class MessengerImpl implements Me
         //process active connectors, handling opened & closed connections as needed
         for (Connector<?> c = _driver.connector(); c != null; c = _driver.connector())
         {
+            _worked = true;
             _logger.log(Level.FINE, "Processing active connector " + c);
             try
             {
@@ -524,12 +551,23 @@ public class MessengerImpl implements Me
         }
     }
 
-    private void waitUntil(Predicate condition) throws TimeoutException
+    private boolean waitUntil(Predicate condition) throws TimeoutException
     {
-        waitUntil(condition, _timeout);
+        if (_blocking) {
+            boolean done = waitUntil(condition, _timeout);
+            if (!done) {
+                _logger.log(Level.SEVERE, String.format
+                            ("Timeout when waiting for condition %s after %s ms",
+                             condition, _timeout));
+                throw new TimeoutException();
+            }
+            return done;
+        } else {
+            return waitUntil(condition, 0);
+        }
     }
 
-    private void waitUntil(Predicate condition, long timeout) throws TimeoutException
+    private boolean waitUntil(Predicate condition, long timeout)
     {
         processAllConnectors();
 
@@ -539,23 +577,22 @@ public class MessengerImpl implements Me
         boolean wait = deadline > System.currentTimeMillis();
         boolean first = true;
         boolean done = false;
+        boolean woken = false;
 
         while (first || (!done && wait))
         {
             if (wait && !done && !first) {
-                _driver.doWait(timeout < 0 ? 0 : deadline - System.currentTimeMillis());
+                woken = _driver.doWait(timeout < 0 ? 0 : deadline - System.currentTimeMillis());
             }
             processActive();
             wait = deadline > System.currentTimeMillis();
-            done = done || condition.test();
+            done = done || woken || condition.test();
             first = false;
         }
-        if (!done)
-        {
-            _logger.log(Level.SEVERE, String.format(
-                    "Timeout when waiting for condition %s after %s ms", condition, timeout));
-            throw new TimeoutException();
+        if (woken) {
+            throw new InterruptException();
         }
+        return done;
     }
 
     private Connection lookup(String host, String service)
@@ -724,12 +761,22 @@ public class MessengerImpl implements Me
             if (_driver.connectors().iterator().hasNext()) return false;
             else return true;
         }
+    }
 
+    private boolean _worked = false;
+
+    private class WorkPred implements Predicate
+    {
+        public boolean test()
+        {
+            return _worked;
+        }
     }
 
     private final SentSettled _sentSettled = new SentSettled();
     private final MessageAvailable _messageAvailable = new MessageAvailable();
     private final AllClosed _allClosed = new AllClosed();
+    private final WorkPred _workPred = new WorkPred();
 
     private interface LinkFinder<C extends Link>
     {

Modified: qpid/proton/trunk/tests/python/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/messenger.py?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/messenger.py Thu Jul 11 20:11:12 2013
@@ -49,27 +49,16 @@ class Test(common.Test):
 
   def _safelyStopClient(self):
     existing_exception = None
+    self.server.interrupt()
     try:
-      # send a message to cause the server to promptly exit
-      msg = Message()
-      msg.address="amqp://0.0.0.0:12345"
-      self.client.put(msg)
-      try:
-        self.client.send()
-      except:
-        print "Client failed to send shutdown message due to: %s" % sys.exc_info()[1]
-        existing_exception = sys.exc_info()[1]
+      self.client.stop()
+      self.client = None
+    except:
+      print "Client failed to stop due to: %s" % sys.exc_info()[1]
+      if existing_exception:
+        raise existing_exception
+      else:
         raise
-    finally:
-      try:
-        self.client.stop()
-        self.client = None
-      except:
-        print "Client failed to stop due to: %s" % sys.exc_info()[1]
-        if existing_exception:
-          raise existing_exception
-        else:
-          raise
 
   def teardown(self):
     try:
@@ -91,8 +80,11 @@ class MessengerTest(Test):
     try:
       while self.running:
         self.server_is_running_event.set()
-        self.server.recv(self.server_credit)
-        self.process_incoming(msg)
+        try:
+          self.server.recv(self.server_credit)
+          self.process_incoming(msg)
+        except Interrupt:
+          pass
     finally:
       self.server.stop()
       self.running = False
@@ -113,7 +105,7 @@ class MessengerTest(Test):
       self.server.put(msg)
       self.server.settle()
 
-  def _testSendReceive(self, size=None):
+  def testSendReceive(self, size=None):
     self.start()
     msg = Message()
     msg.address="amqp://0.0.0.0:12345"
@@ -129,33 +121,30 @@ class MessengerTest(Test):
 
     reply = Message()
     self.client.recv(1)
-    assert self.client.incoming == 1
+    assert self.client.incoming == 1, self.client.incoming
     self.client.get(reply)
 
     assert reply.subject == "Hello World!"
     rbod = reply.body
     assert rbod == body, (rbod, body)
 
-  def testSendReceive(self):
-    self._testSendReceive()
-
   def testSendReceive1K(self):
-    self._testSendReceive(1024)
+    self.testSendReceive(1024)
 
   def testSendReceive2K(self):
-    self._testSendReceive(2*1024)
+    self.testSendReceive(2*1024)
 
   def testSendReceive4K(self):
-    self._testSendReceive(4*1024)
+    self.testSendReceive(4*1024)
 
   def testSendReceive10K(self):
-    self._testSendReceive(10*1024)
+    self.testSendReceive(10*1024)
 
   def testSendReceive100K(self):
-    self._testSendReceive(100*1024)
+    self.testSendReceive(100*1024)
 
   def testSendReceive1M(self):
-    self._testSendReceive(1024*1024)
+    self.testSendReceive(1024*1024)
 
   # PROTON-285 - prevent continually failing test
   def xtestSendBogus(self):
@@ -589,3 +578,44 @@ class MessengerTest(Test):
   def testRewriteOverrideDefault(self):
     self.client.rewrite("*", "$1")
     self._testRewrite("amqp://user:pass@host", "amqp://user:pass@host")
+
+class NBMessengerTest(common.Test):
+
+  def setup(self):
+    self.client = Messenger()
+    self.server = Messenger()
+    self.client.blocking = False
+    self.server.blocking = False
+    self.server.start()
+    self.client.start()
+    self.server.subscribe("amqp://~0.0.0.0:12345")
+
+  def pump(self):
+    while self.client.work(0) or self.server.work(0): pass
+
+  def teardown(self):
+    self.server.stop()
+    self.client.stop()
+    self.pump()
+    assert self.server.stopped
+    assert self.client.stopped
+
+  def test(self, count=1):
+    self.server.recv()
+
+    msg = Message()
+    msg.address = "amqp://0.0.0.0:12345"
+    for i in range(count):
+      msg.body = "Hello %s" % i
+      self.client.put(msg)
+
+    self.pump()
+    assert self.client.outgoing == 0, self.client.outgoing
+
+    msg2 = Message()
+    for i in range(count):
+      self.server.get(msg2)
+      assert msg2.body == "Hello %s" % i, (msg2.body, i)
+
+  def test1024(self):
+    self.test(1024)

Modified: qpid/proton/trunk/tests/ruby/proton-test
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/ruby/proton-test?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/tests/ruby/proton-test (original)
+++ qpid/proton/trunk/tests/ruby/proton-test Thu Jul 11 20:11:12 2013
@@ -2,3 +2,4 @@
 
 require 'test/unit'
 require 'proton_tests/interop.rb'
+require 'proton_tests/smoke.rb'

Added: qpid/proton/trunk/tests/ruby/proton_tests/smoke.rb
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/ruby/proton_tests/smoke.rb?rev=1502346&view=auto
==============================================================================
--- qpid/proton/trunk/tests/ruby/proton_tests/smoke.rb (added)
+++ qpid/proton/trunk/tests/ruby/proton_tests/smoke.rb Thu Jul 11 20:11:12 2013
@@ -0,0 +1,57 @@
+#!/usr/bin/env ruby
+
+require 'test/unit'
+require 'qpid_proton'
+
+class SmokeTest < Test::Unit::TestCase
+
+  Messenger = Qpid::Proton::Messenger
+  Message = Qpid::Proton::Message
+
+  def setup
+    @server = Messenger.new()
+    @client = Messenger.new()
+    @server.blocking = false
+    @client.blocking = false
+    @server.subscribe("~0.0.0.0:12345")
+    @server.start()
+    @client.start()
+    pump()
+  end
+
+  def pump
+    while (@server.work(0) or @client.work(0)) do end
+  end
+
+  def teardown
+    @server.stop()
+    @client.stop()
+
+    pump()
+
+    assert @client.stopped
+    assert @server.stopped
+  end
+
+  def testSmoke(count=10)
+    msg = Message.new()
+    msg.address = "0.0.0.0:12345"
+
+    @server.receive()
+
+    count.times {|i|
+      msg.content = "Hello World! #{i}"
+      @client.put(msg)
+    }
+
+    pump()
+
+    msg2 = Message.new()
+
+    count.times {|i|
+      @server.get(msg2)
+      assert msg2.content == "Hello World! #{i}"
+    }
+  end
+
+end

Modified: qpid/proton/trunk/tests/tools/apps/c/msgr-recv.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/tools/apps/c/msgr-recv.c?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/tests/tools/apps/c/msgr-recv.c (original)
+++ qpid/proton/trunk/tests/tools/apps/c/msgr-recv.c Thu Jul 11 20:11:12 2013
@@ -257,7 +257,7 @@ int main(int argc, char** argv)
     // this will flush any pending sends
     if (pn_messenger_outgoing(messenger) > 0) {
         LOG("Calling pn_messenger_send()\n");
-        rc = pn_messenger_send(messenger);
+        rc = pn_messenger_send(messenger, -1);
         check(rc == 0, "pn_messenger_send() failed");
     }
 

Modified: qpid/proton/trunk/tests/tools/apps/c/msgr-send.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/tools/apps/c/msgr-send.c?rev=1502346&r1=1502345&r2=1502346&view=diff
==============================================================================
--- qpid/proton/trunk/tests/tools/apps/c/msgr-send.c (original)
+++ qpid/proton/trunk/tests/tools/apps/c/msgr-send.c Thu Jul 11 20:11:12 2013
@@ -279,7 +279,7 @@ int main(int argc, char** argv)
                 }
             } else {
                 LOG("Calling pn_messenger_send()\n");
-                rc = pn_messenger_send(messenger);
+                rc = pn_messenger_send(messenger, -1);
                 check((rc == 0 || rc == PN_TIMEOUT), "pn_messenger_send() failed");
             }
         }
@@ -300,7 +300,7 @@ int main(int argc, char** argv)
         }
     } else if (pn_messenger_outgoing(messenger) > 0) {
         LOG("Calling pn_messenger_send()\n");
-        rc = pn_messenger_send(messenger);
+        rc = pn_messenger_send(messenger, -1);
         check(rc == 0, "pn_messenger_send() failed");
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org