You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2014/07/17 21:53:15 UTC
svn commit: r1611461 - in /qpid/proton/trunk: proton-c/bindings/python/
proton-c/src/engine/ proton-c/src/sasl/ proton-c/src/ssl/
proton-c/src/transport/ proton-j/src/main/java/org/apache/qpid/proton/engine/
proton-j/src/main/java/org/apache/qpid/proto...
Author: rhs
Date: Thu Jul 17 19:53:14 2014
New Revision: 1611461
URL: http://svn.apache.org/r1611461
Log:
streamlined error semantics of transport interface
Modified:
qpid/proton/trunk/proton-c/bindings/python/proton.py
qpid/proton/trunk/proton-c/src/engine/engine-internal.h
qpid/proton/trunk/proton-c/src/sasl/sasl.c
qpid/proton/trunk/proton-c/src/ssl/openssl.c
qpid/proton/trunk/proton-c/src/transport/transport.c
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java
qpid/proton/trunk/proton-j/src/main/resources/cengine.py
qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java
qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java
qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/ConnectionTest.java
qpid/proton/trunk/tests/python/proton_tests/common.py
qpid/proton/trunk/tests/python/proton_tests/engine.py
qpid/proton/trunk/tests/python/proton_tests/ssl.py
Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Thu Jul 17 19:53:14 2014
@@ -3051,6 +3051,10 @@ class Transport(object):
def close_head(self):
self._check(pn_transport_close_head(self._trans))
+ @property
+ def closed(self):
+ return pn_transport_closed(self._trans)
+
# AMQP 1.0 max-frame-size
def _get_max_frame_size(self):
return pn_transport_get_max_frame(self._trans)
Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Thu Jul 17 19:53:14 2014
@@ -144,7 +144,6 @@ struct pn_transport_t {
pn_timestamp_t keepalive_deadline;
uint64_t last_bytes_output;
- pn_error_t *error;
pn_hash_t *local_channels;
pn_hash_t *remote_channels;
pn_string_t *scratch;
@@ -174,6 +173,7 @@ struct pn_transport_t {
bool close_rcvd;
bool tail_closed; // input stream closed by driver
bool head_closed;
+ bool done_processing; // if true, don't call pn_process again
};
struct pn_connection_t {
@@ -310,5 +310,6 @@ void pn_clear_tpwork(pn_delivery_t *deli
void pn_work_update(pn_connection_t *connection, pn_delivery_t *delivery);
void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint);
void pn_connection_unbound(pn_connection_t *conn);
+int pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...);
#endif /* engine-internal.h */
Modified: qpid/proton/trunk/proton-c/src/sasl/sasl.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/sasl/sasl.c?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/sasl/sasl.c (original)
+++ qpid/proton/trunk/proton-c/src/sasl/sasl.c Thu Jul 17 19:53:14 2014
@@ -19,6 +19,7 @@
*
*/
+#include <assert.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
@@ -405,8 +406,9 @@ static ssize_t pn_input_read_sasl_header
if (!available || memcmp(bytes, point, delta)) {
char quoted[1024];
pn_quote_data(quoted, 1024, bytes, available);
- return pn_error_format(sasl->transport->error, PN_ERR,
- "%s header mismatch: '%s'", "SASL", quoted);
+ pn_do_error(sasl->transport, "amqp:connection:framing-error",
+ "%s header mismatch: '%s'", "SASL", quoted);
+ return PN_EOS;
} else {
sasl->header_count += delta;
if (sasl->header_count == SASL_HEADER_LEN) {
@@ -435,13 +437,10 @@ static ssize_t pn_output_write_sasl_head
pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context;
if (sasl->disp->trace & PN_TRACE_FRM)
pn_transport_logf(sasl->transport, " -> %s", "SASL");
- if (size >= SASL_HEADER_LEN) {
- memmove(bytes, SASL_HEADER, SASL_HEADER_LEN);
- sasl->io_layer->process_output = pn_output_write_sasl;
- return SASL_HEADER_LEN;
- } else {
- return pn_error_format(sasl->transport->error, PN_UNDERFLOW, "underflow writing %s header", "SASL");
- }
+ assert(size >= SASL_HEADER_LEN);
+ memmove(bytes, SASL_HEADER, SASL_HEADER_LEN);
+ sasl->io_layer->process_output = pn_output_write_sasl;
+ return SASL_HEADER_LEN;
}
static ssize_t pn_output_write_sasl(pn_io_layer_t *io_layer, char *bytes, size_t size)
Modified: qpid/proton/trunk/proton-c/src/ssl/openssl.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/ssl/openssl.c?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/ssl/openssl.c (original)
+++ qpid/proton/trunk/proton-c/src/ssl/openssl.c Thu Jul 17 19:53:14 2014
@@ -187,7 +187,7 @@ static int ssl_failed(pn_ssl_t *ssl)
{
SSL_set_shutdown(ssl->ssl, SSL_SENT_SHUTDOWN|SSL_RECEIVED_SHUTDOWN);
ssl->ssl_closed = true;
- ssl->app_input_closed = ssl->app_output_closed = PN_ERR;
+ ssl->app_input_closed = ssl->app_output_closed = PN_EOS;
// fake a shutdown so the i/o processing code will close properly
SSL_set_shutdown(ssl->ssl, SSL_SENT_SHUTDOWN|SSL_RECEIVED_SHUTDOWN);
// try to grab the first SSL error to add to the failure log
@@ -198,7 +198,8 @@ static int ssl_failed(pn_ssl_t *ssl)
}
_log_ssl_error(NULL); // spit out any remaining errors to the log file
ssl->transport->tail_closed = true;
- return pn_error_format( ssl->transport->error, PN_ERR, "SSL Failure: %s", buf );
+ pn_do_error(ssl->transport, "amqp:connection:framing-error", "SSL Failure: %s", buf);
+ return PN_EOS;
}
/* match the DNS name pattern from the peer certificate against our configured peer
@@ -810,7 +811,7 @@ static int setup_ssl_connection( pn_ssl_
static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_data, size_t available)
{
pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
- if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_ERR;
+ if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_EOS;
_log( ssl, "process_input_ssl( data size=%d )\n",available );
@@ -954,8 +955,8 @@ static ssize_t process_input_ssl( pn_io_
static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t max_len)
{
pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
- if (!ssl) return PN_ERR;
- if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_ERR;
+ if (!ssl) return PN_EOS;
+ if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_EOS;
ssize_t written = 0;
bool work_pending;
Modified: qpid/proton/trunk/proton-c/src/transport/transport.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/transport/transport.c?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/transport/transport.c (original)
+++ qpid/proton/trunk/proton-c/src/transport/transport.c Thu Jul 17 19:53:14 2014
@@ -163,7 +163,6 @@ static void pn_transport_initialize(void
transport->remote_desired_capabilities = pn_data(0);
transport->remote_properties = pn_data(0);
transport->disp_data = pn_data(0);
- transport->error = pn_error();
pn_condition_init(&transport->remote_condition);
transport->local_channels = pn_hash(0, 0.75, PN_REFCOUNT);
@@ -174,6 +173,8 @@ static void pn_transport_initialize(void
transport->input_pending = 0;
transport->output_pending = 0;
+
+ transport->done_processing = false;
}
pn_session_t *pn_channel_state(pn_transport_t *transport, uint16_t channel)
@@ -246,7 +247,6 @@ static void pn_transport_finalize(void *
pn_free(transport->remote_desired_capabilities);
pn_free(transport->remote_properties);
pn_free(transport->disp_data);
- pn_error_free(transport->error);
pn_condition_tini(&transport->remote_condition);
pn_free(transport->local_channels);
pn_free(transport->remote_channels);
@@ -266,10 +266,8 @@ int pn_transport_bind(pn_transport_t *tr
if (transport->open_rcvd) {
PN_SET_REMOTE(connection->endpoint.state, PN_REMOTE_ACTIVE);
pn_collector_put(connection->collector, PN_CONNECTION_REMOTE_OPEN, connection);
- if (!pn_error_code(transport->error)) {
- transport->disp->halt = false;
- transport_consume(transport); // blech - testBindAfterOpen
- }
+ transport->disp->halt = false;
+ transport_consume(transport); // blech - testBindAfterOpen
}
return 0;
}
@@ -326,7 +324,7 @@ int pn_transport_unbind(pn_transport_t *
pn_error_t *pn_transport_error(pn_transport_t *transport)
{
- return transport->error;
+ return NULL;
}
static void pn_map_handle(pn_session_t *ssn, uint32_t handle, pn_link_t *link)
@@ -392,13 +390,12 @@ void pni_disposition_encode(pn_dispositi
}
}
-int pn_post_close(pn_transport_t *transport, const char *condition)
+int pn_post_close(pn_transport_t *transport, const char *condition, const char *description)
{
pn_condition_t *cond = NULL;
if (transport->connection) {
cond = pn_connection_condition(transport->connection);
}
- const char *description = NULL;
pn_data_t *info = NULL;
if (!condition && pn_condition_is_set(cond)) {
condition = pn_condition_get_name(cond);
@@ -418,13 +415,16 @@ int pn_do_error(pn_transport_t *transpor
// XXX: result
vsnprintf(buf, 1024, fmt, ap);
va_end(ap);
- pn_error_set(transport->error, PN_ERR, buf);
if (!transport->close_sent) {
- pn_post_close(transport, condition);
+ if (!transport->open_sent) {
+ pn_post_frame(transport->disp, 0, "DL[S]", OPEN, "");
+ }
+
+ pn_post_close(transport, condition, buf);
transport->close_sent = true;
}
transport->disp->halt = true;
- pn_transport_logf(transport, "ERROR %s %s", condition, pn_error_text(transport->error));
+ pn_transport_logf(transport, "ERROR %s %s", condition, buf);
return PN_ERR;
}
@@ -1004,11 +1004,7 @@ static ssize_t transport_consume(pn_tran
} else if (n == 0) {
break;
} else {
- if (n != PN_EOS) {
- pn_transport_logf(transport, "ERROR[%i] %s\n",
- pn_error_code(transport->error),
- pn_error_text(transport->error));
- }
+ assert(n == PN_EOS);
if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
pn_transport_log(transport, " <- EOS");
transport->input_pending = 0; // XXX ???
@@ -1032,9 +1028,10 @@ static ssize_t pn_input_read_header(pn_t
if (!available || memcmp(bytes, point, delta)) {
char quoted[1024];
pn_quote_data(quoted, 1024, bytes, available);
- return pn_error_format(transport->error, PN_ERR,
- "%s header mismatch: '%s'%s", protocol, quoted,
- available ? "" : " (connection aborted)");
+ pn_do_error(transport, "amqp:connection:framing-error",
+ "%s header mismatch: '%s'%s", protocol, quoted,
+ available ? "" : " (connection aborted)");
+ return PN_EOS;
} else {
transport->header_count += delta;
if (transport->header_count == size) {
@@ -1063,21 +1060,20 @@ static ssize_t pn_input_read_amqp(pn_io_
if (transport->close_rcvd) {
if (available > 0) {
pn_do_error(transport, "amqp:connection:framing-error", "data after close");
- return PN_ERR;
- } else {
return PN_EOS;
}
}
if (!available) {
pn_do_error(transport, "amqp:connection:framing-error", "connection aborted");
- return PN_ERR;
+ return PN_EOS;
}
ssize_t n = pn_dispatcher_input(transport->disp, bytes, available);
if (n < 0) {
- return pn_error_set(transport->error, n, "dispatch error");
+ //return pn_error_set(transport->error, n, "dispatch error");
+ return PN_EOS;
} else if (transport->close_rcvd) {
return PN_EOS;
} else {
@@ -1636,7 +1632,7 @@ int pn_process_conn_teardown(pn_transpor
{
if (endpoint->state & PN_LOCAL_CLOSED && !transport->close_sent) {
if (pn_pointful_buffering(transport, NULL)) return 0;
- int err = pn_post_close(transport, NULL);
+ int err = pn_post_close(transport, NULL, NULL);
if (err) return err;
transport->close_sent = true;
}
@@ -1696,13 +1692,10 @@ static ssize_t pn_output_write_header(pn
{
if (transport->disp->trace & PN_TRACE_FRM)
pn_transport_logf(transport, " -> %s", protocol);
- if (size >= hdrsize) {
- memmove(bytes, header, hdrsize);
- transport->io_layers[PN_IO_AMQP].process_output = next;
- return hdrsize;
- } else {
- return pn_error_format(transport->error, PN_UNDERFLOW, "underflow writing %s header", protocol);
- }
+ assert(size >= hdrsize);
+ memmove(bytes, header, hdrsize);
+ transport->io_layers[PN_IO_AMQP].process_output = next;
+ return hdrsize;
}
static ssize_t pn_output_write_amqp_header(pn_io_layer_t *io_layer, char *bytes, size_t size)
@@ -1715,22 +1708,19 @@ static ssize_t pn_output_write_amqp_head
static ssize_t pn_output_write_amqp(pn_io_layer_t *io_layer, char *bytes, size_t size)
{
pn_transport_t *transport = (pn_transport_t *)io_layer->context;
- if (!transport->connection) {
- return 0;
- }
-
- if (!pn_error_code(transport->error)) {
- pn_error_set(transport->error, pn_process(transport), "process error");
+ if (transport->connection && !transport->done_processing) {
+ int err = pn_process(transport);
+ if (err) {
+ pn_transport_logf(transport, "process error %i", err);
+ transport->done_processing = true;
+ }
}
- // write out any buffered data _before_ returning an error code,
- // else we could truncate an outgoing Close frame containing a
- // useful error status
- if (!transport->disp->available && (transport->close_sent || pn_error_code(transport->error))) {
- if (pn_error_code(transport->error))
- return pn_error_code(transport->error);
- else
- return PN_EOS;
+ // write out any buffered data _before_ returning PN_EOS, else we
+ // could truncate an outgoing Close frame containing a useful error
+ // status
+ if (!transport->disp->available && transport->close_sent) {
+ return PN_EOS;
}
return pn_dispatcher_output(transport->disp, bytes, size);
@@ -1774,11 +1764,12 @@ static ssize_t transport_produce(pn_tran
if (transport->output_pending)
break; // return what is available
if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM)) {
- if (n == PN_EOS)
+ if (n < 0) {
pn_transport_log(transport, " -> EOS");
- else
+ }
+ /*else
pn_transport_logf(transport, " -> EOS (%" PN_ZI ") %s", n,
- pn_error_text(transport->error));
+ pn_error_text(transport->error));*/
}
return n;
}
@@ -2030,8 +2021,7 @@ int pn_transport_process(pn_transport_t
int pn_transport_close_tail(pn_transport_t *transport)
{
transport->tail_closed = true;
- ssize_t x = transport_consume( transport );
- if (x < 0) return (int) x;
+ transport_consume( transport );
return 0;
// XXX: what if not all input processed at this point? do we care???
}
@@ -2088,11 +2078,7 @@ void pn_transport_pop(pn_transport_t *tr
int pn_transport_close_head(pn_transport_t *transport)
{
transport->head_closed = true;
- if (transport->close_sent && transport->output_pending == 0) {
- return 0;
- } else {
- return pn_error_set(transport->error, PN_ERR, "connection aborted");
- }
+ return 0;
}
// true if the transport will not generate further output
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java Thu Jul 17 19:53:14 2014
@@ -94,6 +94,8 @@ public interface Transport extends Endpo
public void pop(int bytes);
public void close_head();
+ public boolean isClosed();
+
/**
* Processes the provided input.
*
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java Thu Jul 17 19:53:14 2014
@@ -487,11 +487,7 @@ class FrameParser implements TransportIn
public ByteBuffer tail()
{
if (_tail_closed) {
- if (_parsingError != null) {
- throw new TransportException(_parsingError.getMessage());
- } else {
- throw new TransportException("tail closed");
- }
+ throw new TransportException("tail closed");
}
if (_inputBuffer == null) {
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Thu Jul 17 19:53:14 2014
@@ -1339,6 +1339,12 @@ public class TransportImpl extends Endpo
_outputProcessor.close_head();
}
+ public boolean isClosed() {
+ int p = pending();
+ int c = capacity();
+ return p == END_OF_STREAM && c == END_OF_STREAM;
+ }
+
@Override
public String toString()
{
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java Thu Jul 17 19:53:14 2014
@@ -338,22 +338,13 @@ public class SimpleSslTransportWrapper i
_inputBuffer.flip();
- try
- {
- try {
- unwrapInput();
- } catch (SSLException e) {
- throw new TransportException(e);
- }
- }
- catch (TransportException e)
- {
+ try {
+ unwrapInput();
+ } catch (SSLException e) {
+ _logger.log(Level.WARNING, e.getMessage());
_inputBuffer.position(_inputBuffer.limit());
_tail_closed = true;
- throw e;
- }
- finally
- {
+ } finally {
_inputBuffer.compact();
}
}
@@ -374,17 +365,17 @@ public class SimpleSslTransportWrapper i
try {
wrapOutput();
} catch (SSLException e) {
- throw new TransportException(e);
+ _logger.log(Level.WARNING, e.getMessage());
+ _head_closed = true;
}
_head.limit(_outputBuffer.position());
- if (_head_closed && _outputBuffer.position() == 0)
- {
+ if (_head_closed && _outputBuffer.position() == 0) {
return Transport.END_OF_STREAM;
- } else {
- return _outputBuffer.position();
}
+
+ return _outputBuffer.position();
}
@Override
@@ -408,6 +399,10 @@ public class SimpleSslTransportWrapper i
public void close_head()
{
_underlyingOutput.close_head();
+ int p = pending();
+ if (p > 0) {
+ pop(p);
+ }
}
Modified: qpid/proton/trunk/proton-j/src/main/resources/cengine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/resources/cengine.py?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/resources/cengine.py (original)
+++ qpid/proton/trunk/proton-j/src/main/resources/cengine.py Thu Jul 17 19:53:14 2014
@@ -863,7 +863,6 @@ class pn_transport_wrapper:
def __init__(self, impl):
self.impl = impl
- self.error = pn_error(0, None)
def pn_transport():
return wrap(Proton.transport(), pn_transport_wrapper)
@@ -902,10 +901,7 @@ def pn_transport_trace(trans, n):
pass
def pn_transport_pending(trans):
- try:
- return trans.impl.pending()
- except TransportException, e:
- return trans.error.set(PN_ERR, str(e))
+ return trans.impl.pending()
def pn_transport_peek(trans, size):
size = min(trans.impl.pending(), size)
@@ -931,31 +927,19 @@ def pn_transport_push(trans, input):
bb = trans.impl.tail()
bb.put(array(input, 'b'))
- try:
- trans.impl.process()
- return len(input)
- except TransportException, e:
- trans.error = pn_error(PN_ERR, str(e))
- return PN_ERR
+ trans.impl.process()
+ return len(input)
def pn_transport_close_head(trans):
- try:
- trans.impl.close_head()
- return 0
- except TransportException, e:
- trans.error = pn_error(PN_ERR, str(e))
- return PN_ERR
+ trans.impl.close_head()
+ return 0
def pn_transport_close_tail(trans):
- try:
- trans.impl.close_tail()
- return 0
- except TransportException, e:
- trans.error = pn_error(PN_ERR, str(e))
- return PN_ERR
+ trans.impl.close_tail()
+ return 0
-def pn_transport_error(trans):
- return trans.error
+def pn_transport_closed(trans):
+ return trans.impl.isClosed()
from org.apache.qpid.proton.engine import Event
Modified: qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java (original)
+++ qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java Thu Jul 17 19:53:14 2014
@@ -41,6 +41,7 @@ import org.apache.qpid.proton.amqp.trans
import org.apache.qpid.proton.codec.AMQPDefinedTypes;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.TransportException;
import org.apache.qpid.proton.engine.TransportResult;
import org.apache.qpid.proton.engine.TransportResult.Status;
@@ -63,9 +64,6 @@ public class FrameParserTest
private final AmqpFramer _amqpFramer = new AmqpFramer();
- @Rule
- public ExpectedException _expectedException = ExpectedException.none();
-
@Before
public void setUp()
{
@@ -80,16 +78,8 @@ public class FrameParserTest
String headerMismatchMessage = "AMQP header mismatch";
ByteBuffer buffer = _frameParser.tail();
buffer.put("hello".getBytes());
- try {
- _frameParser.process();
- fail("expected exception");
- } catch (TransportException e) {
- assertThat(e.getMessage(), containsString(headerMismatchMessage));
- }
-
- _expectedException.expect(TransportException.class);
- _expectedException.expectMessage(headerMismatchMessage);
- _frameParser.tail();
+ _frameParser.process();
+ assertEquals(_frameParser.capacity(), Transport.END_OF_STREAM);
}
@Test
Modified: qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java (original)
+++ qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java Thu Jul 17 19:53:14 2014
@@ -32,6 +32,7 @@ import java.nio.ByteBuffer;
import javax.net.ssl.SSLException;
+import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.TransportException;
import org.junit.Before;
import org.junit.Rule;
@@ -132,16 +133,8 @@ public class SimpleSslTransportWrapperTe
_dummySslEngine.rejectNextEncodedPacket(sslException);
_sslWrapper.tail().put("<-A->".getBytes());
- try {
- _sslWrapper.process();
- fail("no exception");
- } catch (TransportException e) {
- assertSame(sslException, e.getCause());
- assertEquals("", _underlyingInput.getAcceptedInput());
- }
-
- _expectedException.expect(TransportException.class);
- _sslWrapper.tail();
+ _sslWrapper.process();
+ assertEquals(_sslWrapper.capacity(), Transport.END_OF_STREAM);
}
@Test
Modified: qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/ConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/ConnectionTest.java?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/ConnectionTest.java (original)
+++ qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/ConnectionTest.java Thu Jul 17 19:53:14 2014
@@ -80,7 +80,7 @@ public class ConnectionTest
/** Container id is a mandatory field so this should cause an error */
- @Test(expected=TransportException.class)
+ @Test
public void testReceiptOfOpenWithoutContainerId_causesTODO()
{
_pumper.pumpAll();
@@ -90,6 +90,7 @@ public class ConnectionTest
int serverConsumed = _serverTransport.input(openFrameBuffer, 0, openFrameBuffer.length);
assertEquals(openFrameBuffer.length, serverConsumed);
+ assertEquals(_serverTransport.capacity(), Transport.END_OF_STREAM);
}
/**
Modified: qpid/proton/trunk/tests/python/proton_tests/common.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/common.py?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/common.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/common.py Thu Jul 17 19:53:14 2014
@@ -50,20 +50,21 @@ def pump_uni(src, dst, buffer_size=1024)
p = src.pending()
c = dst.capacity()
- if p < 0 and c < 0:
- return False
+ if c < 0:
+ if p < 0:
+ return False
+ else:
+ src.close_head()
+ return True
if p < 0:
dst.close_tail()
elif p == 0 or c == 0:
return False
else:
- if c < 0:
- src.close_head()
- else:
- bytes = src.peek(min(c, buffer_size))
- dst.push(bytes)
- src.pop(len(bytes))
+ bytes = src.peek(min(c, buffer_size))
+ dst.push(bytes)
+ src.pop(len(bytes))
return True
Modified: qpid/proton/trunk/tests/python/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/engine.py?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/engine.py Thu Jul 17 19:53:14 2014
@@ -1081,11 +1081,9 @@ class IdleTimeoutTest(Test):
# now expire sndr
clock = 1.499
t_snd.tick(clock)
- try:
- self.pump()
- assert False, "Expected connection timeout did not happen!"
- except TransportException:
- pass
+ self.pump()
+ assert self.c2.state & Endpoint.REMOTE_CLOSED
+ assert self.c2.remote_condition.name == "amqp:resource-limit-exceeded"
class CreditTest(Test):
Modified: qpid/proton/trunk/tests/python/proton_tests/ssl.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/ssl.py?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/ssl.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/ssl.py Thu Jul 17 19:53:14 2014
@@ -73,6 +73,8 @@ class SslTest(common.Test):
client.connection.open()
server.connection.open()
self._pump(client, server)
+ if client.transport.closed:
+ return
assert client.ssl.protocol_name() is not None
client.connection.close()
server.connection.close()
@@ -214,11 +216,11 @@ class SslTest(common.Test):
client.connection.open()
server.connection.open()
- try:
- self._pump( client, server )
- assert False, "Server failed to reject bad certificate."
- except TransportException, e:
- pass
+ self._pump( client, server )
+ assert client.transport.closed
+ assert server.transport.closed
+ assert client.connection.state & Endpoint.REMOTE_UNINIT
+ assert server.connection.state & Endpoint.REMOTE_UNINIT
def test_client_authentication_fail_no_cert(self):
""" Ensure that the server will fail a client that does not provide a
@@ -239,11 +241,11 @@ class SslTest(common.Test):
client.connection.open()
server.connection.open()
- try:
- self._pump( client, server )
- assert False, "Server failed to reject bad certificate."
- except TransportException, e:
- pass
+ self._pump( client, server )
+ assert client.transport.closed
+ assert server.transport.closed
+ assert client.connection.state & Endpoint.REMOTE_UNINIT
+ assert server.connection.state & Endpoint.REMOTE_UNINIT
def test_client_server_authentication(self):
""" Require both client and server to mutually identify themselves.
@@ -314,11 +316,11 @@ class SslTest(common.Test):
client.connection.open()
server.connection.open()
- try:
- self._pump( client, server )
- assert False, "Client failed to reject bad certificate."
- except TransportException, e:
- pass
+ self._pump( client, server )
+ assert client.transport.closed
+ assert server.transport.closed
+ assert client.connection.state & Endpoint.REMOTE_UNINIT
+ assert server.connection.state & Endpoint.REMOTE_UNINIT
del server
del client
@@ -409,11 +411,11 @@ class SslTest(common.Test):
client.connection.open()
server.connection.open()
- try:
- self._pump( client, server )
- assert False, "Server did not reject client as expected."
- except TransportException:
- pass
+ self._pump( client, server )
+ assert client.transport.closed
+ assert server.transport.closed
+ assert client.connection.state & Endpoint.REMOTE_UNINIT
+ assert server.connection.state & Endpoint.REMOTE_UNINIT
def test_session_resume(self):
""" Test resume of client session.
@@ -563,11 +565,11 @@ class SslTest(common.Test):
client = SslTest.SslTestConnection( self.client_domain )
client.ssl.peer_hostname = "A1.Good.Server.domain.comX"
- try:
- self._do_handshake( client, server )
- assert False, "Expected connection to fail due to hostname mismatch"
- except TransportException:
- pass
+ self._do_handshake( client, server )
+ assert client.transport.closed
+ assert server.transport.closed
+ assert client.connection.state & Endpoint.REMOTE_UNINIT
+ assert server.connection.state & Endpoint.REMOTE_UNINIT
del server
del client
self.teardown()
@@ -659,11 +661,11 @@ class SslTest(common.Test):
client = SslTest.SslTestConnection( self.client_domain )
client.ssl.peer_hostname = "FOO.PREfi.domain.com"
- try:
- self._do_handshake( client, server )
- assert False, "Expected connection to fail due to hostname mismatch"
- except TransportException:
- pass
+ self._do_handshake( client, server )
+ assert client.transport.closed
+ assert server.transport.closed
+ assert client.connection.state & Endpoint.REMOTE_UNINIT
+ assert server.connection.state & Endpoint.REMOTE_UNINIT
del server
del client
self.teardown()
@@ -680,11 +682,11 @@ class SslTest(common.Test):
client = SslTest.SslTestConnection( self.client_domain )
client.ssl.peer_hostname = "PREfix.domain.COM"
- try:
- self._do_handshake( client, server )
- assert False, "Expected connection to fail due to hostname mismatch"
- except TransportException:
- pass
+ self._do_handshake( client, server )
+ assert client.transport.closed
+ assert server.transport.closed
+ assert client.connection.state & Endpoint.REMOTE_UNINIT
+ assert server.connection.state & Endpoint.REMOTE_UNINIT
self.teardown()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org