You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2013/11/05 15:47:07 UTC

svn commit: r1539013 - in /qpid/proton/trunk: proton-c/bindings/python/ proton-c/include/proton/ proton-c/src/messenger/ proton-c/src/posix/ proton-c/src/ssl/ proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/ proton-j/proton-api/src/main...

Author: rhs
Date: Tue Nov  5 14:47:07 2013
New Revision: 1539013

URL: http://svn.apache.org/r1539013
Log:
PROTON-302: added negative testing for messenger ssl; added proper validation of messenger credentials; fixed the java work queue and transport work queue implementation; added the missing Delivery.clear() method to proton-j

Modified:
    qpid/proton/trunk/proton-c/bindings/python/proton.py
    qpid/proton/trunk/proton-c/include/proton/messenger.h
    qpid/proton/trunk/proton-c/src/messenger/messenger.c
    qpid/proton/trunk/proton-c/src/posix/driver.c
    qpid/proton/trunk/proton-c/src/ssl/openssl.c
    qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java
    qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportDelivery.java
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
    qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
    qpid/proton/trunk/tests/python/proton_tests/common.py
    qpid/proton/trunk/tests/python/proton_tests/ssl.py
    qpid/proton/trunk/tests/tools/apps/c/msgr-recv.c

Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Tue Nov  5 14:47:07 2013
@@ -142,8 +142,10 @@ EXCEPTIONS = {
 PENDING = Constant("PENDING")
 ACCEPTED = Constant("ACCEPTED")
 REJECTED = Constant("REJECTED")
+ABORTED = Constant("ABORTED")
 
 STATUSES = {
+  PN_STATUS_ABORTED: ABORTED,
   PN_STATUS_ACCEPTED: ACCEPTED,
   PN_STATUS_REJECTED: REJECTED,
   PN_STATUS_PENDING: PENDING,
@@ -3095,6 +3097,7 @@ class Driver(object):
 __all__ = [
            "API_LANGUAGE",
            "IMPLEMENTATION_LANGUAGE",
+           "ABORTED",
            "ACCEPTED",
            "AUTOMATIC",
            "PENDING",

Modified: qpid/proton/trunk/proton-c/include/proton/messenger.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/messenger.h?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/messenger.h Tue Nov  5 14:47:07 2013
@@ -43,7 +43,8 @@ typedef enum {
   PN_STATUS_PENDING = 1,
   PN_STATUS_ACCEPTED = 2,
   PN_STATUS_REJECTED = 3,
-  PN_STATUS_MODIFIED = 4
+  PN_STATUS_MODIFIED = 4,
+  PN_STATUS_ABORTED = 5
 } pn_status_t;
 
 /** Construct a new Messenger with the given name. The name is global.

Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Tue Nov  5 14:47:07 2013
@@ -89,12 +89,14 @@ struct pn_messenger_t {
   pn_string_t *original;
   pn_string_t *rewritten;
   bool worked;
+  int connection_error;
 };
 
 typedef struct {
   char *host;
   char *port;
   pn_subscription_t *subscription;
+  pn_ssl_domain_t *domain;
 } pn_listener_ctx_t;
 
 static pn_listener_ctx_t *pn_listener_ctx(pn_listener_t *lnr,
@@ -106,6 +108,24 @@ static pn_listener_ctx_t *pn_listener_ct
   pn_listener_ctx_t *ctx = (pn_listener_ctx_t *) pn_listener_context(lnr);
   assert(!ctx);
   ctx = (pn_listener_ctx_t *) malloc(sizeof(pn_listener_ctx_t));
+  ctx->domain = pn_ssl_domain(PN_SSL_MODE_SERVER);
+  if (messenger->certificate) {
+    int err = pn_ssl_domain_set_credentials(ctx->domain, messenger->certificate,
+                                            messenger->private_key,
+                                            messenger->password);
+
+    if (err) {
+      pn_error_format(messenger->error, PN_ERR, "invalid credentials");
+      pn_ssl_domain_free(ctx->domain);
+      free(ctx);
+      return NULL;
+    }
+  }
+
+  if (!(scheme && !strcmp(scheme, "amqps"))) {
+    pn_ssl_domain_allow_unsecured_client(ctx->domain);
+  }
+
   pn_subscription_t *sub = pn_subscription(messenger, scheme);
   ctx->subscription = sub;
   ctx->host = pn_strdup(host);
@@ -120,6 +140,7 @@ static void pn_listener_ctx_free(pn_list
   // XXX: subscriptions are freed when the messenger is freed pn_subscription_free(ctx->subscription);
   free(ctx->host);
   free(ctx->port);
+  pn_ssl_domain_free(ctx->domain);
   free(ctx);
   pn_listener_set_context(lnr, NULL);
 }
@@ -265,6 +286,7 @@ pn_messenger_t *pn_messenger(const char 
     m->address.text = pn_string(NULL);
     m->original = pn_string(NULL);
     m->rewritten = pn_string(NULL);
+    m->connection_error = 0;
   }
 
   return m;
@@ -506,24 +528,44 @@ bool pn_messenger_flow(pn_messenger_t *m
   return updated;
 }
 
-static void pn_transport_config(pn_messenger_t *messenger,
-                                pn_connector_t *connector,
-                                pn_connection_t *connection)
+static void pn_error_report(const char *pfx, const char *error)
+{
+  fprintf(stderr, "%s ERROR %s\n", pfx, error);
+}
+
+static int pn_transport_config(pn_messenger_t *messenger,
+                               pn_connector_t *connector,
+                               pn_connection_t *connection)
 {
   pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(connection);
   pn_transport_t *transport = pn_connector_transport(connector);
   if (ctx->scheme && !strcmp(ctx->scheme, "amqps")) {
-    pn_ssl_domain_t *d = pn_ssl_domain( PN_SSL_MODE_CLIENT );
+    pn_ssl_domain_t *d = pn_ssl_domain(PN_SSL_MODE_CLIENT);
     if (messenger->certificate && messenger->private_key) {
-      pn_ssl_domain_set_credentials( d, messenger->certificate,
-                                     messenger->private_key,
-                                     messenger->password);
+      int err = pn_ssl_domain_set_credentials( d, messenger->certificate,
+                                               messenger->private_key,
+                                               messenger->password);
+      if (err) {
+        pn_error_report("CONNECTION", "invalid credentials");
+        return err;
+      }
     }
     if (messenger->trusted_certificates) {
-      pn_ssl_domain_set_trusted_ca_db(d, messenger->trusted_certificates);
-      pn_ssl_domain_set_peer_authentication(d, PN_SSL_VERIFY_PEER, NULL);
+      int err = pn_ssl_domain_set_trusted_ca_db(d, messenger->trusted_certificates);
+      if (err) {
+        pn_error_report("CONNECTION", "invalid certificate db");
+        return err;
+      }
+      err = pn_ssl_domain_set_peer_authentication(d, PN_SSL_VERIFY_PEER_NAME, NULL);
+      if (err) {
+        pn_error_report("CONNECTION", "error configuring ssl to verify peer");
+      }
     } else {
-      pn_ssl_domain_set_peer_authentication(d, PN_SSL_ANONYMOUS_PEER, NULL);
+      int err = pn_ssl_domain_set_peer_authentication(d, PN_SSL_ANONYMOUS_PEER, NULL);
+      if (err) {
+        pn_error_report("CONNECTION", "error configuring ssl for anonymous peer");
+        return err;
+      }
     }
     pn_ssl_t *ssl = pn_ssl(transport);
     pn_ssl_init(ssl, d, NULL);
@@ -538,11 +580,8 @@ static void pn_transport_config(pn_messe
     pn_sasl_mechanisms(sasl, "ANONYMOUS");
     pn_sasl_client(sasl);
   }
-}
 
-static void pn_error_report(const char *pfx, const char *error)
-{
-  fprintf(stderr, "%s ERROR %s\n", pfx, error);
+  return 0;
 }
 
 static void pn_condition_report(const char *pfx, pn_condition_t *condition)
@@ -746,6 +785,9 @@ void pni_messenger_reclaim(pn_messenger_
       pni_entry_t *e = (pni_entry_t *) pn_delivery_get_context(d);
       if (e) {
         pni_entry_set_delivery(e, NULL);
+        if (pn_delivery_buffered(d)) {
+          pni_entry_set_status(e, PN_STATUS_ABORTED);
+        }
       }
       d = pn_unsettled_next(d);
     }
@@ -819,18 +861,8 @@ int pn_messenger_tsync(pn_messenger_t *m
       pn_connector_t *c = pn_listener_accept(l);
       pn_transport_t *t = pn_connector_transport(c);
 
-      pn_ssl_domain_t *d = pn_ssl_domain( PN_SSL_MODE_SERVER );
-      if (messenger->certificate) {
-        pn_ssl_domain_set_credentials(d, messenger->certificate,
-                                      messenger->private_key,
-                                      messenger->password);
-      }
-      if (!(scheme && !strcmp(scheme, "amqps"))) {
-        pn_ssl_domain_allow_unsecured_client(d);
-      }
       pn_ssl_t *ssl = pn_ssl(t);
-      pn_ssl_init(ssl, d, NULL);
-      pn_ssl_domain_free( d );
+      pn_ssl_init(ssl, ctx->domain, NULL);
 
       pn_sasl_t *sasl = pn_sasl(t);
       pn_sasl_mechanisms(sasl, "ANONYMOUS");
@@ -917,6 +949,7 @@ int pn_messenger_stop(pn_messenger_t *me
     pn_listener_t *prev = l;
     l = pn_listener_next(l);
     pn_listener_ctx_free(prev);
+    pn_listener_close(prev);
     pn_listener_free(prev);
   }
 
@@ -965,6 +998,8 @@ static int pni_route(pn_messenger_t *mes
 
 pn_connection_t *pn_messenger_resolve(pn_messenger_t *messenger, const char *address, char **name)
 {
+  assert(messenger);
+  messenger->connection_error = 0;
   char domain[1024];
   if (address && sizeof(domain) < strlen(address) + 1) {
     pn_error_format(messenger->error, PN_ERR,
@@ -995,7 +1030,11 @@ pn_connection_t *pn_messenger_resolve(pn
 
     lnr = pn_listener(messenger->driver, host, port ? port : default_port(scheme), NULL);
     if (lnr) {
-      pn_listener_ctx(lnr, messenger, scheme, host, port);
+      pn_listener_ctx_t *ctx = pn_listener_ctx(lnr, messenger, scheme, host, port);
+      if (!ctx) {
+        pn_listener_close(lnr);
+        pn_listener_free(lnr);
+      }
     } else {
       pn_error_format(messenger->error, PN_ERR,
                       "unable to bind to address %s: %s:%s", address, host, port,
@@ -1045,7 +1084,15 @@ pn_connection_t *pn_messenger_resolve(pn
 
   pn_connection_t *connection =
     pn_messenger_connection(messenger, connector, scheme, user, pass, host, port);
-  pn_transport_config(messenger, connector, connection);
+  err = pn_transport_config(messenger, connector, connection);
+  if (err) {
+    pni_messenger_reclaim(messenger, connection);
+    pn_connector_close(connector);
+    pn_connector_free(connector);
+    messenger->connection_error = err;
+    return NULL;
+  }
+
   pn_connection_open(connection);
   pn_connector_set_connection(connector, connection);
 
@@ -1117,7 +1164,13 @@ pn_subscription_t *pn_messenger_subscrib
                                      port ? port : default_port(scheme), NULL);
     if (lnr) {
       pn_listener_ctx_t *ctx = pn_listener_ctx(lnr, messenger, scheme, host, port);
-      return ctx->subscription;
+      if (ctx) {
+        return ctx->subscription;
+      } else {
+        pn_listener_close(lnr);
+        pn_listener_free(lnr);
+        return NULL;
+      }
     } else {
       pn_error_format(messenger->error, PN_ERR,
                       "unable to subscribe to address %s: %s", source,
@@ -1181,6 +1234,16 @@ static void outward_munge(pn_messenger_t
   if (heapbuf) free (heapbuf);
 }
 
+int pni_bump_out(pn_messenger_t *messenger, const char *address)
+{
+  pni_entry_t *entry = pni_store_get(messenger->outgoing, address);
+  if (!entry) return 0;
+
+  pni_entry_set_status(entry, PN_STATUS_ABORTED);
+  pni_entry_free(entry);
+  return 0;
+}
+
 int pni_pump_out(pn_messenger_t *messenger, const char *address, pn_link_t *sender)
 {
   pni_entry_t *entry = pni_store_get(messenger->outgoing, address);
@@ -1289,8 +1352,18 @@ int pn_messenger_put(pn_messenger_t *mes
       pni_restore(messenger, msg);
       pn_buffer_append(buf, encoded, size); // XXX
       pn_link_t *sender = pn_messenger_target(messenger, address);
-      if (!sender) return 0;
-      return pni_pump_out(messenger, address, sender);
+      if (!sender) {
+        int err = pn_error_code(messenger->error);
+        if (err) {
+          return err;
+        } else if (messenger->connection_error) {
+          return pni_bump_out(messenger, address);
+        } else {
+          return 0;
+        }
+      } else {
+        return pni_pump_out(messenger, address, sender);
+      }
     }
   }
 

Modified: qpid/proton/trunk/proton-c/src/posix/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/posix/driver.c?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/posix/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/posix/driver.c Tue Nov  5 14:47:07 2013
@@ -101,6 +101,7 @@ struct pn_listener_t {
   int idx;
   bool pending;
   int fd;
+  bool closed;
   void *context;
 };
 
@@ -213,6 +214,7 @@ pn_listener_t *pn_listener_fd(pn_driver_
   l->idx = 0;
   l->pending = false;
   l->fd = fd;
+  l->closed = false;
   l->context = context;
 
   pn_driver_add_listener(driver, l);
@@ -305,9 +307,11 @@ pn_connector_t *pn_listener_accept(pn_li
 void pn_listener_close(pn_listener_t *l)
 {
   if (!l) return;
+  if (l->closed) return;
 
   if (close(l->fd) == -1)
     perror("close");
+  l->closed = true;
 }
 
 void pn_listener_free(pn_listener_t *l)

Modified: qpid/proton/trunk/proton-c/src/ssl/openssl.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/ssl/openssl.c?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/ssl/openssl.c (original)
+++ qpid/proton/trunk/proton-c/src/ssl/openssl.c Tue Nov  5 14:47:07 2013
@@ -182,6 +182,7 @@ static void _log_clear_data(pn_ssl_t *ss
 // unrecoverable SSL failure occured, notify transport and generate error code.
 static int ssl_failed(pn_ssl_t *ssl)
 {
+    SSL_set_shutdown(ssl->ssl, SSL_SENT_SHUTDOWN|SSL_RECEIVED_SHUTDOWN);
   ssl->ssl_closed = true;
   ssl->app_input_closed = ssl->app_output_closed = PN_ERR;
   // fake a shutdown so the i/o processing code will close properly

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java Tue Nov  5 14:47:07 2013
@@ -98,6 +98,8 @@ public interface Delivery
      */
     public boolean isUpdated();
 
+    public void clear();
+
     public boolean isPartial();
 
     public int pending();

Modified: qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py Tue Nov  5 14:47:07 2013
@@ -1405,6 +1405,12 @@ class Messenger(object):
     self.impl.setOutgoingWindow(window)
   outgoing_window = property(_get_outgoing_window, _set_outgoing_window)
 
+  def _get_certificate(self):
+    raise Skipped()
+  def _set_certificate(self, xxx):
+    raise Skipped()
+  certificate = property(_get_certificate, _set_certificate)
+
 
 class Message(object):
 

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java Tue Nov  5 14:47:07 2013
@@ -384,35 +384,52 @@ public class ConnectionImpl extends Endp
 
     void removeWork(DeliveryImpl delivery)
     {
+        if (!delivery._work) return;
+
+        DeliveryImpl next = delivery.getWorkNext();
+        DeliveryImpl prev = delivery.getWorkPrev();
+
+        if (prev != null) {
+            prev.setWorkNext(next);
+        }
+
+        if (next != null) {
+            next.setWorkPrev(prev);
+        }
+
+
         if(_workHead == delivery)
         {
-            _workHead = delivery.getWorkNext();
+            _workHead = next;
 
         }
+
         if(_workTail == delivery)
         {
-            _workTail = delivery.getWorkPrev();
+            _workTail = prev;
         }
+
+        delivery._work = false;
     }
 
     void addWork(DeliveryImpl delivery)
     {
-        if(_workHead != delivery && delivery.getWorkNext() == null && delivery.getWorkPrev() == null)
-        {
-            if(_workTail == null)
-            {
-                delivery.setWorkNext(null);
-                delivery.setWorkPrev(null);
-                _workHead = _workTail = delivery;
-            }
-            else
-            {
-                _workTail.setWorkNext(delivery);
-                delivery.setWorkPrev(_workTail);
-                _workTail = delivery;
-                delivery.setWorkNext(null);
-            }
+        if (delivery._work) return;
+
+        delivery.setWorkNext(null);
+        delivery.setWorkPrev(_workTail);
+
+        if (_workTail != null) {
+            _workTail.setWorkNext(delivery);
+        }
+
+        _workTail = delivery;
+
+        if (_workHead == null) {
+            _workHead = delivery;
         }
+
+        delivery._work = true;
     }
 
     public Iterator<DeliveryImpl> getWorkSequence()
@@ -469,49 +486,68 @@ public class ConnectionImpl extends Endp
 
     public void removeTransportWork(DeliveryImpl delivery)
     {
-        DeliveryImpl oldHead = _transportWorkHead;
-        DeliveryImpl oldTail = _transportWorkTail;
+        if (!delivery._transportWork) return;
+
+        DeliveryImpl next = delivery.getTransportWorkNext();
+        DeliveryImpl prev = delivery.getTransportWorkPrev();
+
+        if (prev != null) {
+            prev.setTransportWorkNext(next);
+        }
+
+        if (next != null) {
+            next.setTransportWorkPrev(prev);
+        }
+
+
         if(_transportWorkHead == delivery)
         {
-            _transportWorkHead = delivery.getTransportWorkNext();
+            _transportWorkHead = next;
 
         }
+
         if(_transportWorkTail == delivery)
         {
-            _transportWorkTail = delivery.getTransportWorkPrev();
+            _transportWorkTail = prev;
         }
-    }
 
+        delivery._transportWork = false;
+    }
 
     void addTransportWork(DeliveryImpl delivery)
     {
-        if(_transportWorkTail == null)
-        {
-            delivery.setTransportWorkNext(null);
-            delivery.setTransportWorkPrev(null);
-            _transportWorkHead = _transportWorkTail = delivery;
-        }
-        else
-        {
+        if (delivery._transportWork) return;
+
+        delivery.setTransportWorkNext(null);
+        delivery.setTransportWorkPrev(_transportWorkTail);
+
+        if (_transportWorkTail != null) {
             _transportWorkTail.setTransportWorkNext(delivery);
-            delivery.setTransportWorkPrev(_transportWorkTail);
-            _transportWorkTail = delivery;
-            delivery.setTransportWorkNext(null);
         }
+
+        _transportWorkTail = delivery;
+
+        if (_transportWorkHead == null) {
+            _transportWorkHead = delivery;
+        }
+
+        delivery._transportWork = true;
     }
 
     void workUpdate(DeliveryImpl delivery)
     {
         if(delivery != null)
         {
-            LinkImpl link = delivery.getLink();
-            if(link.workUpdate(delivery))
+            if(!delivery.isSettled() &&
+               (delivery.isReadable() ||
+                delivery.isWritable() ||
+                delivery.isUpdated()))
             {
                 addWork(delivery);
             }
             else
             {
-                delivery.clearWork();
+                removeWork(delivery);
             }
         }
     }

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java Tue Nov  5 14:47:07 2013
@@ -33,9 +33,11 @@ public class DeliveryImpl implements Del
 
     private DeliveryImpl _workNext;
     private DeliveryImpl _workPrev;
+    boolean _work;
 
     private DeliveryImpl _transportWorkNext;
     private DeliveryImpl _transportWorkPrev;
+    boolean _transportWork;
 
     private Object _context;
 
@@ -119,6 +121,10 @@ public class DeliveryImpl implements Del
 
     public void settle()
     {
+        if (_settled) {
+            return;
+        }
+
         _settled = true;
         _link.decrementUnsettled();
         if(!_remoteSettled)
@@ -143,7 +149,7 @@ public class DeliveryImpl implements Del
         {
             _linkNext._linkPrevious = _linkPrevious;
         }
-        clearWork();
+        updateWork();
     }
 
     DeliveryImpl getLinkNext()
@@ -199,102 +205,24 @@ public class DeliveryImpl implements Del
         {
             _dataSize =  consumed = 0;
         }
-        if(_dataSize == 0)
-        {
-            clearFlag(IO_WORK);
-        }
         return (_complete && consumed == 0) ? Transport.END_OF_STREAM : consumed;  //TODO - Implement
     }
 
-    private void clearFlag(int ioWork)
-    {
-        _flags = _flags & (~IO_WORK);
-        if(_flags == 0)
-        {
-            clearWork();
-        }
-    }
-
-    void clearWork()
-    {
-        getLink().getConnectionImpl().removeWork(this);
-        if(_workPrev != null)
-        {
-            _workPrev.setWorkNext(_workNext);
-        }
-        if(_workNext != null)
-        {
-            _workNext.setWorkPrev(_workPrev);
-
-        }
-        _workPrev = null;
-    }
-
-    void addToWorkList()
-    {
-        getLink().getConnectionImpl().addWork(this);
-    }
-
-    void addIOWork()
+    void updateWork()
     {
-        setFlag(IO_WORK);
-    }
-
-    private void setFlag(int flag)
-    {
-        boolean addWork;
-        if(flag == IO_WORK && (_flags & flag) == 0)
-        {
-            clearWork();
-            addWork = true;
-        }
-        else
-        {
-            addWork = (_flags == 0);
-        }
-        _flags = _flags | flag;
-        if(addWork)
-        {
-            addToWorkList();
-        }
-    }
-
-
-    private void clearTransportFlag(int ioWork)
-    {
-        _flags = _flags & (~IO_WORK);
-        if(_flags == 0)
-        {
-            clearTransportWork();
-        }
+        getLink().getConnectionImpl().workUpdate(this);
     }
 
     DeliveryImpl clearTransportWork()
     {
         DeliveryImpl next = _transportWorkNext;
         getLink().getConnectionImpl().removeTransportWork(this);
-        if(_transportWorkPrev != null)
-        {
-            _transportWorkPrev.setTransportWorkNext(_transportWorkNext);
-        }
-        if(_transportWorkNext != null)
-        {
-            _transportWorkNext.setTransportWorkPrev(_transportWorkPrev);
-
-        }
-        _transportWorkNext = null;
-        _transportWorkPrev = null;
         return next;
     }
 
     void addToTransportWorkList()
     {
-        if(_transportWorkNext == null
-           && _transportWorkPrev == null
-           && getLink().getConnectionImpl().getTransportWorkHead() != this)
-        {
-            getLink().getConnectionImpl().addTransportWork(this);
-        }
+        getLink().getConnectionImpl().addTransportWork(this);
     }
 
 
@@ -393,8 +321,7 @@ public class DeliveryImpl implements Del
     public boolean isReadable()
     {
         return getLink() instanceof ReceiverImpl
-                && getLink().current() == this
-                && _dataSize > 0;
+            && getLink().current() == this;
     }
 
     void setComplete()
@@ -418,6 +345,12 @@ public class DeliveryImpl implements Del
         return _updated;
     }
 
+    public void clear()
+    {
+        _updated = false;
+        getLink().getConnectionImpl().workUpdate(this);
+    }
+
 
     void setDone()
     {
@@ -435,6 +368,20 @@ public class DeliveryImpl implements Del
         _updated = true;
     }
 
+    boolean isBuffered()
+    {
+        if (_remoteSettled) return false;
+        if (getLink() instanceof SenderImpl) {
+            if (isDone()) {
+                return false;
+            } else {
+                return _complete || _dataSize > 0;
+            }
+        } else {
+            return false;
+        }
+    }
+
     public Object getContext()
     {
         return _context;

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java Tue Nov  5 14:47:07 2013
@@ -231,12 +231,6 @@ public abstract class LinkImpl extends E
 
     abstract TransportLink getTransportLink();
 
-    /**
-     * TODO:  Confirm What does this method does.  It seems to
-     * merely make an observation rather than mutate state.  Rename???
-     */
-    abstract boolean workUpdate(DeliveryImpl delivery);
-
     public int getCredit()
     {
         return _credit;

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java Tue Nov  5 14:47:07 2013
@@ -120,12 +120,6 @@ public class ReceiverImpl extends LinkIm
         return _transportReceiver;
     }
 
-    @Override
-    boolean workUpdate(DeliveryImpl delivery)
-    {
-        return (delivery == current());
-    }
-
     public void drain(int credit)
     {
         flow(credit);

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java Tue Nov  5 14:47:07 2013
@@ -80,6 +80,9 @@ public class SenderImpl  extends LinkImp
     public boolean advance()
     {
         DeliveryImpl delivery = current();
+        if (delivery != null) {
+            delivery.setComplete();
+        }
 
         boolean advance = super.advance();
         if(advance && _offered > 0)
@@ -114,12 +117,6 @@ public class SenderImpl  extends LinkImp
 
 
     @Override
-    boolean workUpdate(DeliveryImpl delivery)
-    {
-        return (delivery == current()) && hasCredit();
-    }
-
-    @Override
     public void setCredit(int credit)
     {
         super.setCredit(credit);

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportDelivery.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportDelivery.java?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportDelivery.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportDelivery.java Tue Nov  5 14:47:07 2013
@@ -60,6 +60,6 @@ public class TransportDelivery
     void settled()
     {
         _transportLink.settled(this);
-        _delivery.clearWork();
+        _delivery.updateWork();
     }
 }

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Tue Nov  5 14:47:07 2013
@@ -420,7 +420,6 @@ public class TransportImpl extends Endpo
         if(_connectionEndpoint != null)
         {
             DeliveryImpl delivery = _connectionEndpoint.getTransportWorkHead();
-
             while(delivery != null)
             {
                 LinkImpl link = delivery.getLink();
@@ -513,7 +512,7 @@ public class TransportImpl extends Endpo
             }
         }
 
-        if(wasDone)
+        if(wasDone && delivery.getLocalState() != null)
         {
             TransportDelivery tpDelivery = delivery.getTransportDelivery();
             Disposition disposition = new Disposition();
@@ -531,7 +530,7 @@ public class TransportImpl extends Endpo
                        null);
         }
 
-        return delivery.isDone();
+        return !delivery.isBuffered();
     }
 
     private boolean processTransportWorkReceiver(DeliveryImpl delivery,

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java Tue Nov  5 14:47:07 2013
@@ -286,7 +286,7 @@ class TransportSession
             }
             getSession().incrementIncomingBytes(payload.getLength());
         }
-        delivery.addIOWork();
+        delivery.updateWork();
 
 
         if(!(transfer.getMore() || transfer.getAborted()))
@@ -386,7 +386,7 @@ class TransportSession
                     delivery.setRemoteSettled(true);
                     unsettledDeliveries.remove(id);
                 }
-                delivery.addToWorkList();
+                delivery.updateWork();
             }
             id = id.add(UnsignedInteger.ONE);
         }

Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java Tue Nov  5 14:47:07 2013
@@ -352,6 +352,7 @@ public class MessengerImpl implements Me
                 }
             }
         }
+
         return null;
     }
 
@@ -609,8 +610,9 @@ public class MessengerImpl implements Me
             {
                 delivery.disposition(delivery.getRemoteState());
             }
-            //TODO: delivery.clear(); What's the equivalent in java?
-            delivery = delivery.getWorkNext();
+            Delivery next = delivery.getWorkNext();
+            delivery.clear();
+            delivery = next;
         }
         _outgoing.slide();
 

Modified: qpid/proton/trunk/tests/python/proton_tests/common.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/common.py?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/common.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/common.py Tue Nov  5 14:47:07 2013
@@ -20,7 +20,7 @@
 from random import randint
 from threading import Thread
 from socket import socket, AF_INET, SOCK_STREAM
-from subprocess import Popen,PIPE
+from subprocess import Popen,PIPE,STDOUT
 import sys, os
 from proton import Driver, Connection, Transport, SASL, Endpoint, Delivery
 
@@ -333,7 +333,7 @@ class MessengerApp(object):
             print("COMMAND='%s'" % str(cmd))
         #print("ENV='%s'" % str(os.environ.copy()))
         try:
-            self._process = Popen(cmd, stdout=PIPE, bufsize=4096)
+            self._process = Popen(cmd, stdout=PIPE, stderr=STDOUT, bufsize=4096)
         except OSError, e:
             assert False, "Unable to execute command '%s', is it in your PATH?" % cmd[0]
         self._ready()  # wait for it to initialize

Modified: qpid/proton/trunk/tests/python/proton_tests/ssl.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/ssl.py?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/ssl.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/ssl.py Tue Nov  5 14:47:07 2013
@@ -23,12 +23,17 @@ from proton import *
 from common import Skipped, pump
 
 
-class SslTest(common.Test):
+def _testpath(file):
+    """ Set the full path to the certificate,keyfile, etc. for the test.
+    """
+    return os.path.join(os.path.dirname(__file__),
+                        "ssl_db/%s" % file)
 
-    _timeout = 60
+class SslTest(common.Test):
 
     def __init__(self, *args):
         common.Test.__init__(self, *args)
+        self._testpath = _testpath
 
     def setup(self):
         try:
@@ -59,12 +64,6 @@ class SslTest(common.Test):
     def _pump(self, ssl_client, ssl_server, buffer_size=1024):
         pump(ssl_client.transport, ssl_server.transport, buffer_size)
 
-    def _testpath(self, file):
-        """ Set the full path to the certificate,keyfile, etc. for the test.
-        """
-        return os.path.join(os.path.dirname(__file__),
-                            "ssl_db/%s" % file)
-
     def _do_handshake(self, client, server):
         """ Attempt to connect client to server. Will throw a TransportException if the SSL
         handshake fails.
@@ -695,13 +694,13 @@ class SslTest(common.Test):
         receiver = common.MessengerReceiverC()
         receiver.subscriptions = ["amqps://~0.0.0.0:%s" % port]
         receiver.receive_count = 1
-        receiver.timeout = SslTest._timeout
+        receiver.timeout = self.timeout
         receiver.start()
 
         sender = common.MessengerSenderC()
         sender.targets = ["amqps://0.0.0.0:%s/X" % port]
         sender.send_count = 1
-        sender.timeout = SslTest._timeout
+        sender.timeout = self.timeout
         sender.start()
         sender.wait()
         assert sender.status() == 0, "Command '%s' failed" % str(sender.cmdline())
@@ -717,10 +716,11 @@ class SslTest(common.Test):
         receiver = common.MessengerReceiverC()
         receiver.subscriptions = ["amqps://~0.0.0.0:%s" % port]
         receiver.receive_count = 1
-        receiver.timeout = SslTest._timeout
-        # Note hack - we use the client-certificate for the _server_ because
-        # the client-certificate's common name field is "127.0.0.1", which will
-        # match the target address used by the sender.
+        receiver.timeout = self.timeout
+        # Note hack - by default we use the client-certificate for the
+        # _server_ because the client-certificate's common name field
+        # is "127.0.0.1", which will match the target address used by
+        # the sender.
         receiver.certificate = self._testpath("client-certificate.pem")
         receiver.privatekey = self._testpath("client-private-key.pem")
         receiver.password = "client-password"
@@ -729,7 +729,7 @@ class SslTest(common.Test):
         sender = common.MessengerSenderC()
         sender.targets = ["amqps://127.0.0.1:%s/X" % port]
         sender.send_count = 1
-        sender.timeout = SslTest._timeout
+        sender.timeout = self.timeout
         sender.ca_db = self._testpath("ca-certificate.pem")
         sender.start()
         sender.wait()
@@ -738,7 +738,6 @@ class SslTest(common.Test):
         receiver.wait()
         assert receiver.status() == 0, "Command '%s' failed" % str(receiver.cmdline())
 
-
     def DISABLED_test_defaults_valgrind(self):
         """ Run valgrind over a simple SSL connection (no certificates)
         """
@@ -751,13 +750,13 @@ class SslTest(common.Test):
         receiver = common.MessengerReceiverValgrind()
         receiver.subscriptions = ["amqps://~127.0.0.1:%s" % port]
         receiver.receive_count = 1
-        receiver.timeout = SslTest._timeout
+        receiver.timeout = self.timeout
         receiver.start()
 
         sender = common.MessengerSenderValgrind()
         sender.targets = ["amqps://127.0.0.1:%s/X" % port]
         sender.send_count = 1
-        sender.timeout = SslTest._timeout
+        sender.timeout = self.timeout
         sender.start()
         sender.wait()
         assert sender.status() == 0, "Command '%s' failed" % str(sender.cmdline())
@@ -773,3 +772,116 @@ class SslTest(common.Test):
         # self.client_domain.set_peer_authentication( SSLDomain.VERIFY_PEER )
 
 
+class MessengerSSLTests(common.Test):
+
+    def setup(self):
+        self.server = Messenger()
+        self.client = Messenger()
+        self.server.blocking = False
+        self.client.blocking = False
+
+    def teardown(self):
+        self.server.stop()
+        self.client.stop()
+        self.pump()
+        assert self.server.stopped
+        assert self.client.stopped
+
+    def pump(self, timeout=0):
+        while self.client.work(0) or self.server.work(0): pass
+        self.client.work(timeout)
+        self.server.work(timeout)
+        while self.client.work(0) or self.server.work(0): pass
+
+    def test_server_credentials(self,
+                                cert="server-certificate.pem",
+                                key="server-private-key.pem",
+                                password="server-password",
+                                exception=None):
+        self.server.certificate = _testpath(cert)
+        self.server.private_key = _testpath(key)
+        self.server.password = password
+        try:
+            self.server.start()
+            self.server.subscribe("amqps://~0.0.0.0:12345")
+            if exception is not None:
+                assert False, "expected failure did not occur"
+        except MessengerException, e:
+            if exception:
+                assert exception in str(e), str(e)
+            else:
+                raise e
+
+    def test_server_credentials_bad_cert(self):
+        self.test_server_credentials(cert="bad",
+                                     exception="invalid credentials")
+
+    def test_server_credentials_bad_key(self):
+        self.test_server_credentials(key="bad",
+                                     exception="invalid credentials")
+
+    def test_server_credentials_bad_password(self):
+        self.test_server_credentials(password="bad",
+                                     exception="invalid credentials")
+
+    def test_client_credentials(self,
+                                trusted="ca-certificate.pem",
+                                cert="client-certificate.pem",
+                                key="client-private-key.pem",
+                                password="client-password",
+                                altserv=False,
+                                fail=False):
+        if altserv:
+            self.server.certificate = _testpath("bad-server-certificate.pem")
+            self.server.private_key = _testpath("bad-server-private-key.pem")
+            self.server.password = "server-password"
+        else:
+            self.server.certificate = _testpath("client-certificate.pem")
+            self.server.private_key = _testpath("client-private-key.pem")
+            self.server.password = "client-password"
+        self.server.start()
+        self.server.subscribe("amqps://~0.0.0.0:12345")
+        self.server.incoming_window = 10
+
+        self.client.trusted_certificates = _testpath(trusted)
+        self.client.certificate = _testpath(cert)
+        self.client.private_key = _testpath(key)
+        self.client.password = password
+        self.client.outgoing_window = 10
+        self.client.start()
+
+        self.server.recv()
+
+        msg = Message()
+        msg.address = "amqps://127.0.0.1:12345"
+        msg.body = "Hello World!"
+        trk = self.client.put(msg)
+        self.client.send()
+
+        self.pump()
+
+        if fail:
+            assert self.server.incoming == 0, self.server.incoming
+            assert self.client.status(trk) == ABORTED, self.client.status(trk)
+        else:
+            assert self.server.incoming == 1, self.server.incoming
+
+            rmsg = Message()
+            self.server.get(rmsg)
+            assert rmsg.body == msg.body
+            self.server.accept()
+            self.pump()
+
+            assert self.client.status(trk) == ACCEPTED, self.client.status(trk)
+
+    def test_client_credentials_bad_cert(self):
+        self.test_client_credentials(cert="bad", fail=True)
+
+    def test_client_credentials_bad_trusted(self):
+        self.test_client_credentials(trusted="bad", fail=True)
+
+    def test_client_credentials_bad_password(self):
+        self.test_client_credentials(password="bad", fail=True)
+
+    def test_client_credentials_untrusted(self):
+        self.test_client_credentials(altserv=True, fail=True)

Modified: qpid/proton/trunk/tests/tools/apps/c/msgr-recv.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/tools/apps/c/msgr-recv.c?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/tests/tools/apps/c/msgr-recv.c (original)
+++ qpid/proton/trunk/tests/tools/apps/c/msgr-recv.c Tue Nov  5 14:47:07 2013
@@ -167,21 +167,25 @@ int main(int argc, char** argv)
     /* load the various command line options if they're set */
     if (opts.certificate) {
         rc = pn_messenger_set_certificate(messenger, opts.certificate);
+        check_messenger(messenger);
         check( rc == 0, "Failed to set certificate" );
     }
 
     if (opts.privatekey) {
         rc = pn_messenger_set_private_key(messenger, opts.privatekey);
+        check_messenger(messenger);
         check( rc == 0, "Failed to set private key" );
     }
 
     if (opts.password) {
         rc = pn_messenger_set_password(messenger, opts.password);
+        check_messenger(messenger);
         check( rc == 0, "Failed to set password" );
     }
 
     if (opts.ca_db) {
         rc = pn_messenger_set_trusted_certificates(messenger, opts.ca_db);
+        check_messenger(messenger);
         check( rc == 0, "Failed to set trusted CA database" );
     }
 
@@ -213,8 +217,8 @@ int main(int argc, char** argv)
 
         LOG("Calling pn_messenger_recv(%d)\n", opts.recv_count);
         rc = pn_messenger_recv(messenger, opts.recv_count);
-        check(rc == 0 || (opts.timeout == 0 && rc == PN_TIMEOUT), "pn_messenger_recv() failed");
         check_messenger(messenger);
+        check(rc == 0 || (opts.timeout == 0 && rc == PN_TIMEOUT), "pn_messenger_recv() failed");
 
         // start the timer only after receiving the first msg
         if (received == 0) statistics_start( &stats );
@@ -258,6 +262,7 @@ int main(int argc, char** argv)
     if (pn_messenger_outgoing(messenger) > 0) {
         LOG("Calling pn_messenger_send()\n");
         rc = pn_messenger_send(messenger, -1);
+        check_messenger(messenger);
         check(rc == 0, "pn_messenger_send() failed");
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org