You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2013/11/05 15:47:07 UTC
svn commit: r1539013 - in /qpid/proton/trunk: proton-c/bindings/python/
proton-c/include/proton/ proton-c/src/messenger/ proton-c/src/posix/
proton-c/src/ssl/
proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/
proton-j/proton-api/src/main...
Author: rhs
Date: Tue Nov 5 14:47:07 2013
New Revision: 1539013
URL: http://svn.apache.org/r1539013
Log:
PROTON-302: added negative testing for messenger ssl; added proper validation of messenger credentials; fixed the java work queue and transport work queue implementation; added the missing Delivery.clear() method to proton-j
Modified:
qpid/proton/trunk/proton-c/bindings/python/proton.py
qpid/proton/trunk/proton-c/include/proton/messenger.h
qpid/proton/trunk/proton-c/src/messenger/messenger.c
qpid/proton/trunk/proton-c/src/posix/driver.c
qpid/proton/trunk/proton-c/src/ssl/openssl.c
qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java
qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportDelivery.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
qpid/proton/trunk/tests/python/proton_tests/common.py
qpid/proton/trunk/tests/python/proton_tests/ssl.py
qpid/proton/trunk/tests/tools/apps/c/msgr-recv.c
Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Tue Nov 5 14:47:07 2013
@@ -142,8 +142,10 @@ EXCEPTIONS = {
PENDING = Constant("PENDING")
ACCEPTED = Constant("ACCEPTED")
REJECTED = Constant("REJECTED")
+ABORTED = Constant("ABORTED")
STATUSES = {
+ PN_STATUS_ABORTED: ABORTED,
PN_STATUS_ACCEPTED: ACCEPTED,
PN_STATUS_REJECTED: REJECTED,
PN_STATUS_PENDING: PENDING,
@@ -3095,6 +3097,7 @@ class Driver(object):
__all__ = [
"API_LANGUAGE",
"IMPLEMENTATION_LANGUAGE",
+ "ABORTED",
"ACCEPTED",
"AUTOMATIC",
"PENDING",
Modified: qpid/proton/trunk/proton-c/include/proton/messenger.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/messenger.h?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/messenger.h Tue Nov 5 14:47:07 2013
@@ -43,7 +43,8 @@ typedef enum {
PN_STATUS_PENDING = 1,
PN_STATUS_ACCEPTED = 2,
PN_STATUS_REJECTED = 3,
- PN_STATUS_MODIFIED = 4
+ PN_STATUS_MODIFIED = 4,
+ PN_STATUS_ABORTED = 5
} pn_status_t;
/** Construct a new Messenger with the given name. The name is global.
Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Tue Nov 5 14:47:07 2013
@@ -89,12 +89,14 @@ struct pn_messenger_t {
pn_string_t *original;
pn_string_t *rewritten;
bool worked;
+ int connection_error;
};
typedef struct {
char *host;
char *port;
pn_subscription_t *subscription;
+ pn_ssl_domain_t *domain;
} pn_listener_ctx_t;
static pn_listener_ctx_t *pn_listener_ctx(pn_listener_t *lnr,
@@ -106,6 +108,24 @@ static pn_listener_ctx_t *pn_listener_ct
pn_listener_ctx_t *ctx = (pn_listener_ctx_t *) pn_listener_context(lnr);
assert(!ctx);
ctx = (pn_listener_ctx_t *) malloc(sizeof(pn_listener_ctx_t));
+ ctx->domain = pn_ssl_domain(PN_SSL_MODE_SERVER);
+ if (messenger->certificate) {
+ int err = pn_ssl_domain_set_credentials(ctx->domain, messenger->certificate,
+ messenger->private_key,
+ messenger->password);
+
+ if (err) {
+ pn_error_format(messenger->error, PN_ERR, "invalid credentials");
+ pn_ssl_domain_free(ctx->domain);
+ free(ctx);
+ return NULL;
+ }
+ }
+
+ if (!(scheme && !strcmp(scheme, "amqps"))) {
+ pn_ssl_domain_allow_unsecured_client(ctx->domain);
+ }
+
pn_subscription_t *sub = pn_subscription(messenger, scheme);
ctx->subscription = sub;
ctx->host = pn_strdup(host);
@@ -120,6 +140,7 @@ static void pn_listener_ctx_free(pn_list
// XXX: subscriptions are freed when the messenger is freed pn_subscription_free(ctx->subscription);
free(ctx->host);
free(ctx->port);
+ pn_ssl_domain_free(ctx->domain);
free(ctx);
pn_listener_set_context(lnr, NULL);
}
@@ -265,6 +286,7 @@ pn_messenger_t *pn_messenger(const char
m->address.text = pn_string(NULL);
m->original = pn_string(NULL);
m->rewritten = pn_string(NULL);
+ m->connection_error = 0;
}
return m;
@@ -506,24 +528,44 @@ bool pn_messenger_flow(pn_messenger_t *m
return updated;
}
-static void pn_transport_config(pn_messenger_t *messenger,
- pn_connector_t *connector,
- pn_connection_t *connection)
+static void pn_error_report(const char *pfx, const char *error)
+{
+ fprintf(stderr, "%s ERROR %s\n", pfx, error);
+}
+
+static int pn_transport_config(pn_messenger_t *messenger,
+ pn_connector_t *connector,
+ pn_connection_t *connection)
{
pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(connection);
pn_transport_t *transport = pn_connector_transport(connector);
if (ctx->scheme && !strcmp(ctx->scheme, "amqps")) {
- pn_ssl_domain_t *d = pn_ssl_domain( PN_SSL_MODE_CLIENT );
+ pn_ssl_domain_t *d = pn_ssl_domain(PN_SSL_MODE_CLIENT);
if (messenger->certificate && messenger->private_key) {
- pn_ssl_domain_set_credentials( d, messenger->certificate,
- messenger->private_key,
- messenger->password);
+ int err = pn_ssl_domain_set_credentials( d, messenger->certificate,
+ messenger->private_key,
+ messenger->password);
+ if (err) {
+ pn_error_report("CONNECTION", "invalid credentials");
+ return err;
+ }
}
if (messenger->trusted_certificates) {
- pn_ssl_domain_set_trusted_ca_db(d, messenger->trusted_certificates);
- pn_ssl_domain_set_peer_authentication(d, PN_SSL_VERIFY_PEER, NULL);
+ int err = pn_ssl_domain_set_trusted_ca_db(d, messenger->trusted_certificates);
+ if (err) {
+ pn_error_report("CONNECTION", "invalid certificate db");
+ return err;
+ }
+ err = pn_ssl_domain_set_peer_authentication(d, PN_SSL_VERIFY_PEER_NAME, NULL);
+ if (err) {
+ pn_error_report("CONNECTION", "error configuring ssl to verify peer");
+ }
} else {
- pn_ssl_domain_set_peer_authentication(d, PN_SSL_ANONYMOUS_PEER, NULL);
+ int err = pn_ssl_domain_set_peer_authentication(d, PN_SSL_ANONYMOUS_PEER, NULL);
+ if (err) {
+ pn_error_report("CONNECTION", "error configuring ssl for anonymous peer");
+ return err;
+ }
}
pn_ssl_t *ssl = pn_ssl(transport);
pn_ssl_init(ssl, d, NULL);
@@ -538,11 +580,8 @@ static void pn_transport_config(pn_messe
pn_sasl_mechanisms(sasl, "ANONYMOUS");
pn_sasl_client(sasl);
}
-}
-static void pn_error_report(const char *pfx, const char *error)
-{
- fprintf(stderr, "%s ERROR %s\n", pfx, error);
+ return 0;
}
static void pn_condition_report(const char *pfx, pn_condition_t *condition)
@@ -746,6 +785,9 @@ void pni_messenger_reclaim(pn_messenger_
pni_entry_t *e = (pni_entry_t *) pn_delivery_get_context(d);
if (e) {
pni_entry_set_delivery(e, NULL);
+ if (pn_delivery_buffered(d)) {
+ pni_entry_set_status(e, PN_STATUS_ABORTED);
+ }
}
d = pn_unsettled_next(d);
}
@@ -819,18 +861,8 @@ int pn_messenger_tsync(pn_messenger_t *m
pn_connector_t *c = pn_listener_accept(l);
pn_transport_t *t = pn_connector_transport(c);
- pn_ssl_domain_t *d = pn_ssl_domain( PN_SSL_MODE_SERVER );
- if (messenger->certificate) {
- pn_ssl_domain_set_credentials(d, messenger->certificate,
- messenger->private_key,
- messenger->password);
- }
- if (!(scheme && !strcmp(scheme, "amqps"))) {
- pn_ssl_domain_allow_unsecured_client(d);
- }
pn_ssl_t *ssl = pn_ssl(t);
- pn_ssl_init(ssl, d, NULL);
- pn_ssl_domain_free( d );
+ pn_ssl_init(ssl, ctx->domain, NULL);
pn_sasl_t *sasl = pn_sasl(t);
pn_sasl_mechanisms(sasl, "ANONYMOUS");
@@ -917,6 +949,7 @@ int pn_messenger_stop(pn_messenger_t *me
pn_listener_t *prev = l;
l = pn_listener_next(l);
pn_listener_ctx_free(prev);
+ pn_listener_close(prev);
pn_listener_free(prev);
}
@@ -965,6 +998,8 @@ static int pni_route(pn_messenger_t *mes
pn_connection_t *pn_messenger_resolve(pn_messenger_t *messenger, const char *address, char **name)
{
+ assert(messenger);
+ messenger->connection_error = 0;
char domain[1024];
if (address && sizeof(domain) < strlen(address) + 1) {
pn_error_format(messenger->error, PN_ERR,
@@ -995,7 +1030,11 @@ pn_connection_t *pn_messenger_resolve(pn
lnr = pn_listener(messenger->driver, host, port ? port : default_port(scheme), NULL);
if (lnr) {
- pn_listener_ctx(lnr, messenger, scheme, host, port);
+ pn_listener_ctx_t *ctx = pn_listener_ctx(lnr, messenger, scheme, host, port);
+ if (!ctx) {
+ pn_listener_close(lnr);
+ pn_listener_free(lnr);
+ }
} else {
pn_error_format(messenger->error, PN_ERR,
"unable to bind to address %s: %s:%s", address, host, port,
@@ -1045,7 +1084,15 @@ pn_connection_t *pn_messenger_resolve(pn
pn_connection_t *connection =
pn_messenger_connection(messenger, connector, scheme, user, pass, host, port);
- pn_transport_config(messenger, connector, connection);
+ err = pn_transport_config(messenger, connector, connection);
+ if (err) {
+ pni_messenger_reclaim(messenger, connection);
+ pn_connector_close(connector);
+ pn_connector_free(connector);
+ messenger->connection_error = err;
+ return NULL;
+ }
+
pn_connection_open(connection);
pn_connector_set_connection(connector, connection);
@@ -1117,7 +1164,13 @@ pn_subscription_t *pn_messenger_subscrib
port ? port : default_port(scheme), NULL);
if (lnr) {
pn_listener_ctx_t *ctx = pn_listener_ctx(lnr, messenger, scheme, host, port);
- return ctx->subscription;
+ if (ctx) {
+ return ctx->subscription;
+ } else {
+ pn_listener_close(lnr);
+ pn_listener_free(lnr);
+ return NULL;
+ }
} else {
pn_error_format(messenger->error, PN_ERR,
"unable to subscribe to address %s: %s", source,
@@ -1181,6 +1234,16 @@ static void outward_munge(pn_messenger_t
if (heapbuf) free (heapbuf);
}
+int pni_bump_out(pn_messenger_t *messenger, const char *address)
+{
+ pni_entry_t *entry = pni_store_get(messenger->outgoing, address);
+ if (!entry) return 0;
+
+ pni_entry_set_status(entry, PN_STATUS_ABORTED);
+ pni_entry_free(entry);
+ return 0;
+}
+
int pni_pump_out(pn_messenger_t *messenger, const char *address, pn_link_t *sender)
{
pni_entry_t *entry = pni_store_get(messenger->outgoing, address);
@@ -1289,8 +1352,18 @@ int pn_messenger_put(pn_messenger_t *mes
pni_restore(messenger, msg);
pn_buffer_append(buf, encoded, size); // XXX
pn_link_t *sender = pn_messenger_target(messenger, address);
- if (!sender) return 0;
- return pni_pump_out(messenger, address, sender);
+ if (!sender) {
+ int err = pn_error_code(messenger->error);
+ if (err) {
+ return err;
+ } else if (messenger->connection_error) {
+ return pni_bump_out(messenger, address);
+ } else {
+ return 0;
+ }
+ } else {
+ return pni_pump_out(messenger, address, sender);
+ }
}
}
Modified: qpid/proton/trunk/proton-c/src/posix/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/posix/driver.c?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/posix/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/posix/driver.c Tue Nov 5 14:47:07 2013
@@ -101,6 +101,7 @@ struct pn_listener_t {
int idx;
bool pending;
int fd;
+ bool closed;
void *context;
};
@@ -213,6 +214,7 @@ pn_listener_t *pn_listener_fd(pn_driver_
l->idx = 0;
l->pending = false;
l->fd = fd;
+ l->closed = false;
l->context = context;
pn_driver_add_listener(driver, l);
@@ -305,9 +307,11 @@ pn_connector_t *pn_listener_accept(pn_li
void pn_listener_close(pn_listener_t *l)
{
if (!l) return;
+ if (l->closed) return;
if (close(l->fd) == -1)
perror("close");
+ l->closed = true;
}
void pn_listener_free(pn_listener_t *l)
Modified: qpid/proton/trunk/proton-c/src/ssl/openssl.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/ssl/openssl.c?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/ssl/openssl.c (original)
+++ qpid/proton/trunk/proton-c/src/ssl/openssl.c Tue Nov 5 14:47:07 2013
@@ -182,6 +182,7 @@ static void _log_clear_data(pn_ssl_t *ss
// unrecoverable SSL failure occured, notify transport and generate error code.
static int ssl_failed(pn_ssl_t *ssl)
{
+ SSL_set_shutdown(ssl->ssl, SSL_SENT_SHUTDOWN|SSL_RECEIVED_SHUTDOWN);
ssl->ssl_closed = true;
ssl->app_input_closed = ssl->app_output_closed = PN_ERR;
// fake a shutdown so the i/o processing code will close properly
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/java/org/apache/qpid/proton/engine/Delivery.java Tue Nov 5 14:47:07 2013
@@ -98,6 +98,8 @@ public interface Delivery
*/
public boolean isUpdated();
+ public void clear();
+
public boolean isPartial();
public int pending();
Modified: qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py (original)
+++ qpid/proton/trunk/proton-j/proton-api/src/main/resources/proton.py Tue Nov 5 14:47:07 2013
@@ -1405,6 +1405,12 @@ class Messenger(object):
self.impl.setOutgoingWindow(window)
outgoing_window = property(_get_outgoing_window, _set_outgoing_window)
+ def _get_certificate(self):
+ raise Skipped()
+ def _set_certificate(self, xxx):
+ raise Skipped()
+ certificate = property(_get_certificate, _set_certificate)
+
class Message(object):
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java Tue Nov 5 14:47:07 2013
@@ -384,35 +384,52 @@ public class ConnectionImpl extends Endp
void removeWork(DeliveryImpl delivery)
{
+ if (!delivery._work) return;
+
+ DeliveryImpl next = delivery.getWorkNext();
+ DeliveryImpl prev = delivery.getWorkPrev();
+
+ if (prev != null) {
+ prev.setWorkNext(next);
+ }
+
+ if (next != null) {
+ next.setWorkPrev(prev);
+ }
+
+
if(_workHead == delivery)
{
- _workHead = delivery.getWorkNext();
+ _workHead = next;
}
+
if(_workTail == delivery)
{
- _workTail = delivery.getWorkPrev();
+ _workTail = prev;
}
+
+ delivery._work = false;
}
void addWork(DeliveryImpl delivery)
{
- if(_workHead != delivery && delivery.getWorkNext() == null && delivery.getWorkPrev() == null)
- {
- if(_workTail == null)
- {
- delivery.setWorkNext(null);
- delivery.setWorkPrev(null);
- _workHead = _workTail = delivery;
- }
- else
- {
- _workTail.setWorkNext(delivery);
- delivery.setWorkPrev(_workTail);
- _workTail = delivery;
- delivery.setWorkNext(null);
- }
+ if (delivery._work) return;
+
+ delivery.setWorkNext(null);
+ delivery.setWorkPrev(_workTail);
+
+ if (_workTail != null) {
+ _workTail.setWorkNext(delivery);
+ }
+
+ _workTail = delivery;
+
+ if (_workHead == null) {
+ _workHead = delivery;
}
+
+ delivery._work = true;
}
public Iterator<DeliveryImpl> getWorkSequence()
@@ -469,49 +486,68 @@ public class ConnectionImpl extends Endp
public void removeTransportWork(DeliveryImpl delivery)
{
- DeliveryImpl oldHead = _transportWorkHead;
- DeliveryImpl oldTail = _transportWorkTail;
+ if (!delivery._transportWork) return;
+
+ DeliveryImpl next = delivery.getTransportWorkNext();
+ DeliveryImpl prev = delivery.getTransportWorkPrev();
+
+ if (prev != null) {
+ prev.setTransportWorkNext(next);
+ }
+
+ if (next != null) {
+ next.setTransportWorkPrev(prev);
+ }
+
+
if(_transportWorkHead == delivery)
{
- _transportWorkHead = delivery.getTransportWorkNext();
+ _transportWorkHead = next;
}
+
if(_transportWorkTail == delivery)
{
- _transportWorkTail = delivery.getTransportWorkPrev();
+ _transportWorkTail = prev;
}
- }
+ delivery._transportWork = false;
+ }
void addTransportWork(DeliveryImpl delivery)
{
- if(_transportWorkTail == null)
- {
- delivery.setTransportWorkNext(null);
- delivery.setTransportWorkPrev(null);
- _transportWorkHead = _transportWorkTail = delivery;
- }
- else
- {
+ if (delivery._transportWork) return;
+
+ delivery.setTransportWorkNext(null);
+ delivery.setTransportWorkPrev(_transportWorkTail);
+
+ if (_transportWorkTail != null) {
_transportWorkTail.setTransportWorkNext(delivery);
- delivery.setTransportWorkPrev(_transportWorkTail);
- _transportWorkTail = delivery;
- delivery.setTransportWorkNext(null);
}
+
+ _transportWorkTail = delivery;
+
+ if (_transportWorkHead == null) {
+ _transportWorkHead = delivery;
+ }
+
+ delivery._transportWork = true;
}
void workUpdate(DeliveryImpl delivery)
{
if(delivery != null)
{
- LinkImpl link = delivery.getLink();
- if(link.workUpdate(delivery))
+ if(!delivery.isSettled() &&
+ (delivery.isReadable() ||
+ delivery.isWritable() ||
+ delivery.isUpdated()))
{
addWork(delivery);
}
else
{
- delivery.clearWork();
+ removeWork(delivery);
}
}
}
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java Tue Nov 5 14:47:07 2013
@@ -33,9 +33,11 @@ public class DeliveryImpl implements Del
private DeliveryImpl _workNext;
private DeliveryImpl _workPrev;
+ boolean _work;
private DeliveryImpl _transportWorkNext;
private DeliveryImpl _transportWorkPrev;
+ boolean _transportWork;
private Object _context;
@@ -119,6 +121,10 @@ public class DeliveryImpl implements Del
public void settle()
{
+ if (_settled) {
+ return;
+ }
+
_settled = true;
_link.decrementUnsettled();
if(!_remoteSettled)
@@ -143,7 +149,7 @@ public class DeliveryImpl implements Del
{
_linkNext._linkPrevious = _linkPrevious;
}
- clearWork();
+ updateWork();
}
DeliveryImpl getLinkNext()
@@ -199,102 +205,24 @@ public class DeliveryImpl implements Del
{
_dataSize = consumed = 0;
}
- if(_dataSize == 0)
- {
- clearFlag(IO_WORK);
- }
return (_complete && consumed == 0) ? Transport.END_OF_STREAM : consumed; //TODO - Implement
}
- private void clearFlag(int ioWork)
- {
- _flags = _flags & (~IO_WORK);
- if(_flags == 0)
- {
- clearWork();
- }
- }
-
- void clearWork()
- {
- getLink().getConnectionImpl().removeWork(this);
- if(_workPrev != null)
- {
- _workPrev.setWorkNext(_workNext);
- }
- if(_workNext != null)
- {
- _workNext.setWorkPrev(_workPrev);
-
- }
- _workPrev = null;
- }
-
- void addToWorkList()
- {
- getLink().getConnectionImpl().addWork(this);
- }
-
- void addIOWork()
+ void updateWork()
{
- setFlag(IO_WORK);
- }
-
- private void setFlag(int flag)
- {
- boolean addWork;
- if(flag == IO_WORK && (_flags & flag) == 0)
- {
- clearWork();
- addWork = true;
- }
- else
- {
- addWork = (_flags == 0);
- }
- _flags = _flags | flag;
- if(addWork)
- {
- addToWorkList();
- }
- }
-
-
- private void clearTransportFlag(int ioWork)
- {
- _flags = _flags & (~IO_WORK);
- if(_flags == 0)
- {
- clearTransportWork();
- }
+ getLink().getConnectionImpl().workUpdate(this);
}
DeliveryImpl clearTransportWork()
{
DeliveryImpl next = _transportWorkNext;
getLink().getConnectionImpl().removeTransportWork(this);
- if(_transportWorkPrev != null)
- {
- _transportWorkPrev.setTransportWorkNext(_transportWorkNext);
- }
- if(_transportWorkNext != null)
- {
- _transportWorkNext.setTransportWorkPrev(_transportWorkPrev);
-
- }
- _transportWorkNext = null;
- _transportWorkPrev = null;
return next;
}
void addToTransportWorkList()
{
- if(_transportWorkNext == null
- && _transportWorkPrev == null
- && getLink().getConnectionImpl().getTransportWorkHead() != this)
- {
- getLink().getConnectionImpl().addTransportWork(this);
- }
+ getLink().getConnectionImpl().addTransportWork(this);
}
@@ -393,8 +321,7 @@ public class DeliveryImpl implements Del
public boolean isReadable()
{
return getLink() instanceof ReceiverImpl
- && getLink().current() == this
- && _dataSize > 0;
+ && getLink().current() == this;
}
void setComplete()
@@ -418,6 +345,12 @@ public class DeliveryImpl implements Del
return _updated;
}
+ public void clear()
+ {
+ _updated = false;
+ getLink().getConnectionImpl().workUpdate(this);
+ }
+
void setDone()
{
@@ -435,6 +368,20 @@ public class DeliveryImpl implements Del
_updated = true;
}
+ boolean isBuffered()
+ {
+ if (_remoteSettled) return false;
+ if (getLink() instanceof SenderImpl) {
+ if (isDone()) {
+ return false;
+ } else {
+ return _complete || _dataSize > 0;
+ }
+ } else {
+ return false;
+ }
+ }
+
public Object getContext()
{
return _context;
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java Tue Nov 5 14:47:07 2013
@@ -231,12 +231,6 @@ public abstract class LinkImpl extends E
abstract TransportLink getTransportLink();
- /**
- * TODO: Confirm What does this method does. It seems to
- * merely make an observation rather than mutate state. Rename???
- */
- abstract boolean workUpdate(DeliveryImpl delivery);
-
public int getCredit()
{
return _credit;
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java Tue Nov 5 14:47:07 2013
@@ -120,12 +120,6 @@ public class ReceiverImpl extends LinkIm
return _transportReceiver;
}
- @Override
- boolean workUpdate(DeliveryImpl delivery)
- {
- return (delivery == current());
- }
-
public void drain(int credit)
{
flow(credit);
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java Tue Nov 5 14:47:07 2013
@@ -80,6 +80,9 @@ public class SenderImpl extends LinkImp
public boolean advance()
{
DeliveryImpl delivery = current();
+ if (delivery != null) {
+ delivery.setComplete();
+ }
boolean advance = super.advance();
if(advance && _offered > 0)
@@ -114,12 +117,6 @@ public class SenderImpl extends LinkImp
@Override
- boolean workUpdate(DeliveryImpl delivery)
- {
- return (delivery == current()) && hasCredit();
- }
-
- @Override
public void setCredit(int credit)
{
super.setCredit(credit);
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportDelivery.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportDelivery.java?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportDelivery.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportDelivery.java Tue Nov 5 14:47:07 2013
@@ -60,6 +60,6 @@ public class TransportDelivery
void settled()
{
_transportLink.settled(this);
- _delivery.clearWork();
+ _delivery.updateWork();
}
}
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Tue Nov 5 14:47:07 2013
@@ -420,7 +420,6 @@ public class TransportImpl extends Endpo
if(_connectionEndpoint != null)
{
DeliveryImpl delivery = _connectionEndpoint.getTransportWorkHead();
-
while(delivery != null)
{
LinkImpl link = delivery.getLink();
@@ -513,7 +512,7 @@ public class TransportImpl extends Endpo
}
}
- if(wasDone)
+ if(wasDone && delivery.getLocalState() != null)
{
TransportDelivery tpDelivery = delivery.getTransportDelivery();
Disposition disposition = new Disposition();
@@ -531,7 +530,7 @@ public class TransportImpl extends Endpo
null);
}
- return delivery.isDone();
+ return !delivery.isBuffered();
}
private boolean processTransportWorkReceiver(DeliveryImpl delivery,
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java Tue Nov 5 14:47:07 2013
@@ -286,7 +286,7 @@ class TransportSession
}
getSession().incrementIncomingBytes(payload.getLength());
}
- delivery.addIOWork();
+ delivery.updateWork();
if(!(transfer.getMore() || transfer.getAborted()))
@@ -386,7 +386,7 @@ class TransportSession
delivery.setRemoteSettled(true);
unsettledDeliveries.remove(id);
}
- delivery.addToWorkList();
+ delivery.updateWork();
}
id = id.add(UnsignedInteger.ONE);
}
Modified: qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java (original)
+++ qpid/proton/trunk/proton-j/proton/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java Tue Nov 5 14:47:07 2013
@@ -352,6 +352,7 @@ public class MessengerImpl implements Me
}
}
}
+
return null;
}
@@ -609,8 +610,9 @@ public class MessengerImpl implements Me
{
delivery.disposition(delivery.getRemoteState());
}
- //TODO: delivery.clear(); What's the equivalent in java?
- delivery = delivery.getWorkNext();
+ Delivery next = delivery.getWorkNext();
+ delivery.clear();
+ delivery = next;
}
_outgoing.slide();
Modified: qpid/proton/trunk/tests/python/proton_tests/common.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/common.py?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/common.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/common.py Tue Nov 5 14:47:07 2013
@@ -20,7 +20,7 @@
from random import randint
from threading import Thread
from socket import socket, AF_INET, SOCK_STREAM
-from subprocess import Popen,PIPE
+from subprocess import Popen,PIPE,STDOUT
import sys, os
from proton import Driver, Connection, Transport, SASL, Endpoint, Delivery
@@ -333,7 +333,7 @@ class MessengerApp(object):
print("COMMAND='%s'" % str(cmd))
#print("ENV='%s'" % str(os.environ.copy()))
try:
- self._process = Popen(cmd, stdout=PIPE, bufsize=4096)
+ self._process = Popen(cmd, stdout=PIPE, stderr=STDOUT, bufsize=4096)
except OSError, e:
assert False, "Unable to execute command '%s', is it in your PATH?" % cmd[0]
self._ready() # wait for it to initialize
Modified: qpid/proton/trunk/tests/python/proton_tests/ssl.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/ssl.py?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/ssl.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/ssl.py Tue Nov 5 14:47:07 2013
@@ -23,12 +23,17 @@ from proton import *
from common import Skipped, pump
-class SslTest(common.Test):
+def _testpath(file):
+ """ Set the full path to the certificate,keyfile, etc. for the test.
+ """
+ return os.path.join(os.path.dirname(__file__),
+ "ssl_db/%s" % file)
- _timeout = 60
+class SslTest(common.Test):
def __init__(self, *args):
common.Test.__init__(self, *args)
+ self._testpath = _testpath
def setup(self):
try:
@@ -59,12 +64,6 @@ class SslTest(common.Test):
def _pump(self, ssl_client, ssl_server, buffer_size=1024):
pump(ssl_client.transport, ssl_server.transport, buffer_size)
- def _testpath(self, file):
- """ Set the full path to the certificate,keyfile, etc. for the test.
- """
- return os.path.join(os.path.dirname(__file__),
- "ssl_db/%s" % file)
-
def _do_handshake(self, client, server):
""" Attempt to connect client to server. Will throw a TransportException if the SSL
handshake fails.
@@ -695,13 +694,13 @@ class SslTest(common.Test):
receiver = common.MessengerReceiverC()
receiver.subscriptions = ["amqps://~0.0.0.0:%s" % port]
receiver.receive_count = 1
- receiver.timeout = SslTest._timeout
+ receiver.timeout = self.timeout
receiver.start()
sender = common.MessengerSenderC()
sender.targets = ["amqps://0.0.0.0:%s/X" % port]
sender.send_count = 1
- sender.timeout = SslTest._timeout
+ sender.timeout = self.timeout
sender.start()
sender.wait()
assert sender.status() == 0, "Command '%s' failed" % str(sender.cmdline())
@@ -717,10 +716,11 @@ class SslTest(common.Test):
receiver = common.MessengerReceiverC()
receiver.subscriptions = ["amqps://~0.0.0.0:%s" % port]
receiver.receive_count = 1
- receiver.timeout = SslTest._timeout
- # Note hack - we use the client-certificate for the _server_ because
- # the client-certificate's common name field is "127.0.0.1", which will
- # match the target address used by the sender.
+ receiver.timeout = self.timeout
+ # Note hack - by default we use the client-certificate for the
+ # _server_ because the client-certificate's common name field
+ # is "127.0.0.1", which will match the target address used by
+ # the sender.
receiver.certificate = self._testpath("client-certificate.pem")
receiver.privatekey = self._testpath("client-private-key.pem")
receiver.password = "client-password"
@@ -729,7 +729,7 @@ class SslTest(common.Test):
sender = common.MessengerSenderC()
sender.targets = ["amqps://127.0.0.1:%s/X" % port]
sender.send_count = 1
- sender.timeout = SslTest._timeout
+ sender.timeout = self.timeout
sender.ca_db = self._testpath("ca-certificate.pem")
sender.start()
sender.wait()
@@ -738,7 +738,6 @@ class SslTest(common.Test):
receiver.wait()
assert receiver.status() == 0, "Command '%s' failed" % str(receiver.cmdline())
-
def DISABLED_test_defaults_valgrind(self):
""" Run valgrind over a simple SSL connection (no certificates)
"""
@@ -751,13 +750,13 @@ class SslTest(common.Test):
receiver = common.MessengerReceiverValgrind()
receiver.subscriptions = ["amqps://~127.0.0.1:%s" % port]
receiver.receive_count = 1
- receiver.timeout = SslTest._timeout
+ receiver.timeout = self.timeout
receiver.start()
sender = common.MessengerSenderValgrind()
sender.targets = ["amqps://127.0.0.1:%s/X" % port]
sender.send_count = 1
- sender.timeout = SslTest._timeout
+ sender.timeout = self.timeout
sender.start()
sender.wait()
assert sender.status() == 0, "Command '%s' failed" % str(sender.cmdline())
@@ -773,3 +772,116 @@ class SslTest(common.Test):
# self.client_domain.set_peer_authentication( SSLDomain.VERIFY_PEER )
+class MessengerSSLTests(common.Test):
+
+ def setup(self):
+ self.server = Messenger()
+ self.client = Messenger()
+ self.server.blocking = False
+ self.client.blocking = False
+
+ def teardown(self):
+ self.server.stop()
+ self.client.stop()
+ self.pump()
+ assert self.server.stopped
+ assert self.client.stopped
+
+ def pump(self, timeout=0):
+ while self.client.work(0) or self.server.work(0): pass
+ self.client.work(timeout)
+ self.server.work(timeout)
+ while self.client.work(0) or self.server.work(0): pass
+
+ def test_server_credentials(self,
+ cert="server-certificate.pem",
+ key="server-private-key.pem",
+ password="server-password",
+ exception=None):
+ self.server.certificate = _testpath(cert)
+ self.server.private_key = _testpath(key)
+ self.server.password = password
+ try:
+ self.server.start()
+ self.server.subscribe("amqps://~0.0.0.0:12345")
+ if exception is not None:
+ assert False, "expected failure did not occur"
+ except MessengerException, e:
+ if exception:
+ assert exception in str(e), str(e)
+ else:
+ raise e
+
+ def test_server_credentials_bad_cert(self):
+ self.test_server_credentials(cert="bad",
+ exception="invalid credentials")
+
+ def test_server_credentials_bad_key(self):
+ self.test_server_credentials(key="bad",
+ exception="invalid credentials")
+
+ def test_server_credentials_bad_password(self):
+ self.test_server_credentials(password="bad",
+ exception="invalid credentials")
+
+ def test_client_credentials(self,
+ trusted="ca-certificate.pem",
+ cert="client-certificate.pem",
+ key="client-private-key.pem",
+ password="client-password",
+ altserv=False,
+ fail=False):
+ if altserv:
+ self.server.certificate = _testpath("bad-server-certificate.pem")
+ self.server.private_key = _testpath("bad-server-private-key.pem")
+ self.server.password = "server-password"
+ else:
+ self.server.certificate = _testpath("client-certificate.pem")
+ self.server.private_key = _testpath("client-private-key.pem")
+ self.server.password = "client-password"
+ self.server.start()
+ self.server.subscribe("amqps://~0.0.0.0:12345")
+ self.server.incoming_window = 10
+
+ self.client.trusted_certificates = _testpath(trusted)
+ self.client.certificate = _testpath(cert)
+ self.client.private_key = _testpath(key)
+ self.client.password = password
+ self.client.outgoing_window = 10
+ self.client.start()
+
+ self.server.recv()
+
+ msg = Message()
+ msg.address = "amqps://127.0.0.1:12345"
+ msg.body = "Hello World!"
+ trk = self.client.put(msg)
+ self.client.send()
+
+ self.pump()
+
+ if fail:
+ assert self.server.incoming == 0, self.server.incoming
+ assert self.client.status(trk) == ABORTED, self.client.status(trk)
+ else:
+ assert self.server.incoming == 1, self.server.incoming
+
+ rmsg = Message()
+ self.server.get(rmsg)
+ assert rmsg.body == msg.body
+ self.server.accept()
+ self.pump()
+
+ assert self.client.status(trk) == ACCEPTED, self.client.status(trk)
+
+ def test_client_credentials_bad_cert(self):
+ self.test_client_credentials(cert="bad", fail=True)
+
+ def test_client_credentials_bad_trusted(self):
+ self.test_client_credentials(trusted="bad", fail=True)
+
+ def test_client_credentials_bad_password(self):
+ self.test_client_credentials(password="bad", fail=True)
+
+ def test_client_credentials_untrusted(self):
+ self.test_client_credentials(altserv=True, fail=True)
Modified: qpid/proton/trunk/tests/tools/apps/c/msgr-recv.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/tools/apps/c/msgr-recv.c?rev=1539013&r1=1539012&r2=1539013&view=diff
==============================================================================
--- qpid/proton/trunk/tests/tools/apps/c/msgr-recv.c (original)
+++ qpid/proton/trunk/tests/tools/apps/c/msgr-recv.c Tue Nov 5 14:47:07 2013
@@ -167,21 +167,25 @@ int main(int argc, char** argv)
/* load the various command line options if they're set */
if (opts.certificate) {
rc = pn_messenger_set_certificate(messenger, opts.certificate);
+ check_messenger(messenger);
check( rc == 0, "Failed to set certificate" );
}
if (opts.privatekey) {
rc = pn_messenger_set_private_key(messenger, opts.privatekey);
+ check_messenger(messenger);
check( rc == 0, "Failed to set private key" );
}
if (opts.password) {
rc = pn_messenger_set_password(messenger, opts.password);
+ check_messenger(messenger);
check( rc == 0, "Failed to set password" );
}
if (opts.ca_db) {
rc = pn_messenger_set_trusted_certificates(messenger, opts.ca_db);
+ check_messenger(messenger);
check( rc == 0, "Failed to set trusted CA database" );
}
@@ -213,8 +217,8 @@ int main(int argc, char** argv)
LOG("Calling pn_messenger_recv(%d)\n", opts.recv_count);
rc = pn_messenger_recv(messenger, opts.recv_count);
- check(rc == 0 || (opts.timeout == 0 && rc == PN_TIMEOUT), "pn_messenger_recv() failed");
check_messenger(messenger);
+ check(rc == 0 || (opts.timeout == 0 && rc == PN_TIMEOUT), "pn_messenger_recv() failed");
// start the timer only after receiving the first msg
if (received == 0) statistics_start( &stats );
@@ -258,6 +262,7 @@ int main(int argc, char** argv)
if (pn_messenger_outgoing(messenger) > 0) {
LOG("Calling pn_messenger_send()\n");
rc = pn_messenger_send(messenger, -1);
+ check_messenger(messenger);
check(rc == 0, "pn_messenger_send() failed");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org