You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2014/07/17 21:53:15 UTC

svn commit: r1611461 - in /qpid/proton/trunk: proton-c/bindings/python/ proton-c/src/engine/ proton-c/src/sasl/ proton-c/src/ssl/ proton-c/src/transport/ proton-j/src/main/java/org/apache/qpid/proton/engine/ proton-j/src/main/java/org/apache/qpid/proto...

Author: rhs
Date: Thu Jul 17 19:53:14 2014
New Revision: 1611461

URL: http://svn.apache.org/r1611461
Log:
streamlined error semantics of transport interface

Modified:
    qpid/proton/trunk/proton-c/bindings/python/proton.py
    qpid/proton/trunk/proton-c/src/engine/engine-internal.h
    qpid/proton/trunk/proton-c/src/sasl/sasl.c
    qpid/proton/trunk/proton-c/src/ssl/openssl.c
    qpid/proton/trunk/proton-c/src/transport/transport.c
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java
    qpid/proton/trunk/proton-j/src/main/resources/cengine.py
    qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java
    qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java
    qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/ConnectionTest.java
    qpid/proton/trunk/tests/python/proton_tests/common.py
    qpid/proton/trunk/tests/python/proton_tests/engine.py
    qpid/proton/trunk/tests/python/proton_tests/ssl.py

Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Thu Jul 17 19:53:14 2014
@@ -3051,6 +3051,10 @@ class Transport(object):
   def close_head(self):
     self._check(pn_transport_close_head(self._trans))
 
+  @property
+  def closed(self):
+    return pn_transport_closed(self._trans)
+
   # AMQP 1.0 max-frame-size
   def _get_max_frame_size(self):
     return pn_transport_get_max_frame(self._trans)

Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Thu Jul 17 19:53:14 2014
@@ -144,7 +144,6 @@ struct pn_transport_t {
   pn_timestamp_t keepalive_deadline;
   uint64_t last_bytes_output;
 
-  pn_error_t *error;
   pn_hash_t *local_channels;
   pn_hash_t *remote_channels;
   pn_string_t *scratch;
@@ -174,6 +173,7 @@ struct pn_transport_t {
   bool close_rcvd;
   bool tail_closed;      // input stream closed by driver
   bool head_closed;
+  bool done_processing; // if true, don't call pn_process again
 };
 
 struct pn_connection_t {
@@ -310,5 +310,6 @@ void pn_clear_tpwork(pn_delivery_t *deli
 void pn_work_update(pn_connection_t *connection, pn_delivery_t *delivery);
 void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint);
 void pn_connection_unbound(pn_connection_t *conn);
+int pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...);
 
 #endif /* engine-internal.h */

Modified: qpid/proton/trunk/proton-c/src/sasl/sasl.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/sasl/sasl.c?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/sasl/sasl.c (original)
+++ qpid/proton/trunk/proton-c/src/sasl/sasl.c Thu Jul 17 19:53:14 2014
@@ -19,6 +19,7 @@
  *
  */
 
+#include <assert.h>
 #include <stdlib.h>
 #include <stdio.h>
 #include <string.h>
@@ -405,8 +406,9 @@ static ssize_t pn_input_read_sasl_header
   if (!available || memcmp(bytes, point, delta)) {
     char quoted[1024];
     pn_quote_data(quoted, 1024, bytes, available);
-    return pn_error_format(sasl->transport->error, PN_ERR,
-                           "%s header mismatch: '%s'", "SASL", quoted);
+    pn_do_error(sasl->transport, "amqp:connection:framing-error",
+                "%s header mismatch: '%s'", "SASL", quoted);
+    return PN_EOS;
   } else {
     sasl->header_count += delta;
     if (sasl->header_count == SASL_HEADER_LEN) {
@@ -435,13 +437,10 @@ static ssize_t pn_output_write_sasl_head
   pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context;
   if (sasl->disp->trace & PN_TRACE_FRM)
     pn_transport_logf(sasl->transport, "  -> %s", "SASL");
-  if (size >= SASL_HEADER_LEN) {
-    memmove(bytes, SASL_HEADER, SASL_HEADER_LEN);
-    sasl->io_layer->process_output = pn_output_write_sasl;
-    return SASL_HEADER_LEN;
-  } else {
-    return pn_error_format(sasl->transport->error, PN_UNDERFLOW, "underflow writing %s header", "SASL");
-  }
+  assert(size >= SASL_HEADER_LEN);
+  memmove(bytes, SASL_HEADER, SASL_HEADER_LEN);
+  sasl->io_layer->process_output = pn_output_write_sasl;
+  return SASL_HEADER_LEN;
 }
 
 static ssize_t pn_output_write_sasl(pn_io_layer_t *io_layer, char *bytes, size_t size)

Modified: qpid/proton/trunk/proton-c/src/ssl/openssl.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/ssl/openssl.c?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/ssl/openssl.c (original)
+++ qpid/proton/trunk/proton-c/src/ssl/openssl.c Thu Jul 17 19:53:14 2014
@@ -187,7 +187,7 @@ static int ssl_failed(pn_ssl_t *ssl)
 {
     SSL_set_shutdown(ssl->ssl, SSL_SENT_SHUTDOWN|SSL_RECEIVED_SHUTDOWN);
   ssl->ssl_closed = true;
-  ssl->app_input_closed = ssl->app_output_closed = PN_ERR;
+  ssl->app_input_closed = ssl->app_output_closed = PN_EOS;
   // fake a shutdown so the i/o processing code will close properly
   SSL_set_shutdown(ssl->ssl, SSL_SENT_SHUTDOWN|SSL_RECEIVED_SHUTDOWN);
   // try to grab the first SSL error to add to the failure log
@@ -198,7 +198,8 @@ static int ssl_failed(pn_ssl_t *ssl)
   }
   _log_ssl_error(NULL);    // spit out any remaining errors to the log file
   ssl->transport->tail_closed = true;
-  return pn_error_format( ssl->transport->error, PN_ERR, "SSL Failure: %s", buf );
+  pn_do_error(ssl->transport, "amqp:connection:framing-error", "SSL Failure: %s", buf);
+  return PN_EOS;
 }
 
 /* match the DNS name pattern from the peer certificate against our configured peer
@@ -810,7 +811,7 @@ static int setup_ssl_connection( pn_ssl_
 static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_data, size_t available)
 {
   pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
-  if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_ERR;
+  if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_EOS;
 
   _log( ssl, "process_input_ssl( data size=%d )\n",available );
 
@@ -954,8 +955,8 @@ static ssize_t process_input_ssl( pn_io_
 static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t max_len)
 {
   pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
-  if (!ssl) return PN_ERR;
-  if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_ERR;
+  if (!ssl) return PN_EOS;
+  if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_EOS;
 
   ssize_t written = 0;
   bool work_pending;

Modified: qpid/proton/trunk/proton-c/src/transport/transport.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/transport/transport.c?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/transport/transport.c (original)
+++ qpid/proton/trunk/proton-c/src/transport/transport.c Thu Jul 17 19:53:14 2014
@@ -163,7 +163,6 @@ static void pn_transport_initialize(void
   transport->remote_desired_capabilities = pn_data(0);
   transport->remote_properties = pn_data(0);
   transport->disp_data = pn_data(0);
-  transport->error = pn_error();
   pn_condition_init(&transport->remote_condition);
 
   transport->local_channels = pn_hash(0, 0.75, PN_REFCOUNT);
@@ -174,6 +173,8 @@ static void pn_transport_initialize(void
 
   transport->input_pending = 0;
   transport->output_pending = 0;
+
+  transport->done_processing = false;
 }
 
 pn_session_t *pn_channel_state(pn_transport_t *transport, uint16_t channel)
@@ -246,7 +247,6 @@ static void pn_transport_finalize(void *
   pn_free(transport->remote_desired_capabilities);
   pn_free(transport->remote_properties);
   pn_free(transport->disp_data);
-  pn_error_free(transport->error);
   pn_condition_tini(&transport->remote_condition);
   pn_free(transport->local_channels);
   pn_free(transport->remote_channels);
@@ -266,10 +266,8 @@ int pn_transport_bind(pn_transport_t *tr
   if (transport->open_rcvd) {
     PN_SET_REMOTE(connection->endpoint.state, PN_REMOTE_ACTIVE);
     pn_collector_put(connection->collector, PN_CONNECTION_REMOTE_OPEN, connection);
-    if (!pn_error_code(transport->error)) {
-      transport->disp->halt = false;
-      transport_consume(transport);        // blech - testBindAfterOpen
-    }
+    transport->disp->halt = false;
+    transport_consume(transport);        // blech - testBindAfterOpen
   }
   return 0;
 }
@@ -326,7 +324,7 @@ int pn_transport_unbind(pn_transport_t *
 
 pn_error_t *pn_transport_error(pn_transport_t *transport)
 {
-  return transport->error;
+  return NULL;
 }
 
 static void pn_map_handle(pn_session_t *ssn, uint32_t handle, pn_link_t *link)
@@ -392,13 +390,12 @@ void pni_disposition_encode(pn_dispositi
   }
 }
 
-int pn_post_close(pn_transport_t *transport, const char *condition)
+int pn_post_close(pn_transport_t *transport, const char *condition, const char *description)
 {
   pn_condition_t *cond = NULL;
   if (transport->connection) {
     cond = pn_connection_condition(transport->connection);
   }
-  const char *description = NULL;
   pn_data_t *info = NULL;
   if (!condition && pn_condition_is_set(cond)) {
     condition = pn_condition_get_name(cond);
@@ -418,13 +415,16 @@ int pn_do_error(pn_transport_t *transpor
   // XXX: result
   vsnprintf(buf, 1024, fmt, ap);
   va_end(ap);
-  pn_error_set(transport->error, PN_ERR, buf);
   if (!transport->close_sent) {
-    pn_post_close(transport, condition);
+    if (!transport->open_sent) {
+      pn_post_frame(transport->disp, 0, "DL[S]", OPEN, "");
+    }
+
+    pn_post_close(transport, condition, buf);
     transport->close_sent = true;
   }
   transport->disp->halt = true;
-  pn_transport_logf(transport, "ERROR %s %s", condition, pn_error_text(transport->error));
+  pn_transport_logf(transport, "ERROR %s %s", condition, buf);
   return PN_ERR;
 }
 
@@ -1004,11 +1004,7 @@ static ssize_t transport_consume(pn_tran
     } else if (n == 0) {
       break;
     } else {
-      if (n != PN_EOS) {
-        pn_transport_logf(transport, "ERROR[%i] %s\n",
-                          pn_error_code(transport->error),
-                          pn_error_text(transport->error));
-      }
+      assert(n == PN_EOS);
       if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
         pn_transport_log(transport, "  <- EOS");
       transport->input_pending = 0;  // XXX ???
@@ -1032,9 +1028,10 @@ static ssize_t pn_input_read_header(pn_t
   if (!available || memcmp(bytes, point, delta)) {
     char quoted[1024];
     pn_quote_data(quoted, 1024, bytes, available);
-    return pn_error_format(transport->error, PN_ERR,
-                           "%s header mismatch: '%s'%s", protocol, quoted,
-                           available ? "" : " (connection aborted)");
+    pn_do_error(transport, "amqp:connection:framing-error",
+                "%s header mismatch: '%s'%s", protocol, quoted,
+                available ? "" : " (connection aborted)");
+    return PN_EOS;
   } else {
     transport->header_count += delta;
     if (transport->header_count == size) {
@@ -1063,21 +1060,20 @@ static ssize_t pn_input_read_amqp(pn_io_
   if (transport->close_rcvd) {
     if (available > 0) {
       pn_do_error(transport, "amqp:connection:framing-error", "data after close");
-      return PN_ERR;
-    } else {
       return PN_EOS;
     }
   }
 
   if (!available) {
     pn_do_error(transport, "amqp:connection:framing-error", "connection aborted");
-    return PN_ERR;
+    return PN_EOS;
   }
 
 
   ssize_t n = pn_dispatcher_input(transport->disp, bytes, available);
   if (n < 0) {
-    return pn_error_set(transport->error, n, "dispatch error");
+    //return pn_error_set(transport->error, n, "dispatch error");
+    return PN_EOS;
   } else if (transport->close_rcvd) {
     return PN_EOS;
   } else {
@@ -1636,7 +1632,7 @@ int pn_process_conn_teardown(pn_transpor
   {
     if (endpoint->state & PN_LOCAL_CLOSED && !transport->close_sent) {
       if (pn_pointful_buffering(transport, NULL)) return 0;
-      int err = pn_post_close(transport, NULL);
+      int err = pn_post_close(transport, NULL, NULL);
       if (err) return err;
       transport->close_sent = true;
     }
@@ -1696,13 +1692,10 @@ static ssize_t pn_output_write_header(pn
 {
   if (transport->disp->trace & PN_TRACE_FRM)
     pn_transport_logf(transport, "  -> %s", protocol);
-  if (size >= hdrsize) {
-    memmove(bytes, header, hdrsize);
-    transport->io_layers[PN_IO_AMQP].process_output = next;
-    return hdrsize;
-  } else {
-    return pn_error_format(transport->error, PN_UNDERFLOW, "underflow writing %s header", protocol);
-  }
+  assert(size >= hdrsize);
+  memmove(bytes, header, hdrsize);
+  transport->io_layers[PN_IO_AMQP].process_output = next;
+  return hdrsize;
 }
 
 static ssize_t pn_output_write_amqp_header(pn_io_layer_t *io_layer, char *bytes, size_t size)
@@ -1715,22 +1708,19 @@ static ssize_t pn_output_write_amqp_head
 static ssize_t pn_output_write_amqp(pn_io_layer_t *io_layer, char *bytes, size_t size)
 {
   pn_transport_t *transport = (pn_transport_t *)io_layer->context;
-  if (!transport->connection) {
-    return 0;
-  }
-
-  if (!pn_error_code(transport->error)) {
-    pn_error_set(transport->error, pn_process(transport), "process error");
+  if (transport->connection && !transport->done_processing) {
+    int err = pn_process(transport);
+    if (err) {
+      pn_transport_logf(transport, "process error %i", err);
+      transport->done_processing = true;
+    }
   }
 
-  // write out any buffered data _before_ returning an error code,
-  // else we could truncate an outgoing Close frame containing a
-  // useful error status
-  if (!transport->disp->available && (transport->close_sent || pn_error_code(transport->error))) {
-    if (pn_error_code(transport->error))
-      return pn_error_code(transport->error);
-    else
-      return PN_EOS;
+  // write out any buffered data _before_ returning PN_EOS, else we
+  // could truncate an outgoing Close frame containing a useful error
+  // status
+  if (!transport->disp->available && transport->close_sent) {
+    return PN_EOS;
   }
 
   return pn_dispatcher_output(transport->disp, bytes, size);
@@ -1774,11 +1764,12 @@ static ssize_t transport_produce(pn_tran
       if (transport->output_pending)
         break;   // return what is available
       if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM)) {
-        if (n == PN_EOS)
+        if (n < 0) {
           pn_transport_log(transport, "  -> EOS");
-        else
+        }
+        /*else
           pn_transport_logf(transport, "  -> EOS (%" PN_ZI ") %s", n,
-                            pn_error_text(transport->error));
+          pn_error_text(transport->error));*/
       }
       return n;
     }
@@ -2030,8 +2021,7 @@ int pn_transport_process(pn_transport_t 
 int pn_transport_close_tail(pn_transport_t *transport)
 {
   transport->tail_closed = true;
-  ssize_t x = transport_consume( transport );
-  if (x < 0) return (int) x;
+  transport_consume( transport );
   return 0;
   // XXX: what if not all input processed at this point?  do we care???
 }
@@ -2088,11 +2078,7 @@ void pn_transport_pop(pn_transport_t *tr
 int pn_transport_close_head(pn_transport_t *transport)
 {
   transport->head_closed = true;
-  if (transport->close_sent && transport->output_pending == 0) {
-    return 0;
-  } else {
-    return pn_error_set(transport->error, PN_ERR, "connection aborted");
-  }
+  return 0;
 }
 
 // true if the transport will not generate further output

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java Thu Jul 17 19:53:14 2014
@@ -94,6 +94,8 @@ public interface Transport extends Endpo
     public void pop(int bytes);
     public void close_head();
 
+    public boolean isClosed();
+
     /**
      * Processes the provided input.
      *

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java Thu Jul 17 19:53:14 2014
@@ -487,11 +487,7 @@ class FrameParser implements TransportIn
     public ByteBuffer tail()
     {
         if (_tail_closed) {
-            if (_parsingError != null) {
-                throw new TransportException(_parsingError.getMessage());
-            } else {
-                throw new TransportException("tail closed");
-            }
+            throw new TransportException("tail closed");
         }
 
         if (_inputBuffer == null) {

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Thu Jul 17 19:53:14 2014
@@ -1339,6 +1339,12 @@ public class TransportImpl extends Endpo
         _outputProcessor.close_head();
     }
 
+    public boolean isClosed() {
+        int p = pending();
+        int c = capacity();
+        return  p == END_OF_STREAM && c == END_OF_STREAM;
+    }
+
     @Override
     public String toString()
     {

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java Thu Jul 17 19:53:14 2014
@@ -338,22 +338,13 @@ public class SimpleSslTransportWrapper i
 
         _inputBuffer.flip();
 
-        try
-        {
-            try {
-                unwrapInput();
-            } catch (SSLException e) {
-                throw new TransportException(e);
-            }
-        }
-        catch (TransportException e)
-        {
+        try {
+            unwrapInput();
+        } catch (SSLException e) {
+            _logger.log(Level.WARNING, e.getMessage());
             _inputBuffer.position(_inputBuffer.limit());
             _tail_closed = true;
-            throw e;
-        }
-        finally
-        {
+        } finally {
             _inputBuffer.compact();
         }
     }
@@ -374,17 +365,17 @@ public class SimpleSslTransportWrapper i
         try {
             wrapOutput();
         } catch (SSLException e) {
-            throw new TransportException(e);
+            _logger.log(Level.WARNING, e.getMessage());
+            _head_closed = true;
         }
 
         _head.limit(_outputBuffer.position());
 
-        if (_head_closed && _outputBuffer.position() == 0)
-        {
+        if (_head_closed && _outputBuffer.position() == 0) {
             return Transport.END_OF_STREAM;
-        } else {
-            return _outputBuffer.position();
         }
+
+        return _outputBuffer.position();
     }
 
     @Override
@@ -408,6 +399,10 @@ public class SimpleSslTransportWrapper i
     public void close_head()
     {
         _underlyingOutput.close_head();
+        int p = pending();
+        if (p > 0) {
+            pop(p);
+        }
     }
 
 

Modified: qpid/proton/trunk/proton-j/src/main/resources/cengine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/resources/cengine.py?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/resources/cengine.py (original)
+++ qpid/proton/trunk/proton-j/src/main/resources/cengine.py Thu Jul 17 19:53:14 2014
@@ -863,7 +863,6 @@ class pn_transport_wrapper:
 
   def __init__(self, impl):
     self.impl = impl
-    self.error = pn_error(0, None)
 
 def pn_transport():
   return wrap(Proton.transport(), pn_transport_wrapper)
@@ -902,10 +901,7 @@ def pn_transport_trace(trans, n):
   pass
 
 def pn_transport_pending(trans):
-  try:
-    return trans.impl.pending()
-  except TransportException, e:
-    return trans.error.set(PN_ERR, str(e))
+  return trans.impl.pending()
 
 def pn_transport_peek(trans, size):
   size = min(trans.impl.pending(), size)
@@ -931,31 +927,19 @@ def pn_transport_push(trans, input):
 
   bb = trans.impl.tail()
   bb.put(array(input, 'b'))
-  try:
-    trans.impl.process()
-    return len(input)
-  except TransportException, e:
-    trans.error = pn_error(PN_ERR, str(e))
-    return PN_ERR
+  trans.impl.process()
+  return len(input)
 
 def pn_transport_close_head(trans):
-  try:
-    trans.impl.close_head()
-    return 0
-  except TransportException, e:
-    trans.error = pn_error(PN_ERR, str(e))
-    return PN_ERR
+  trans.impl.close_head()
+  return 0
 
 def pn_transport_close_tail(trans):
-  try:
-    trans.impl.close_tail()
-    return 0
-  except TransportException, e:
-    trans.error = pn_error(PN_ERR, str(e))
-    return PN_ERR
+  trans.impl.close_tail()
+  return 0
 
-def pn_transport_error(trans):
-  return trans.error
+def pn_transport_closed(trans):
+  return trans.impl.isClosed()
 
 from org.apache.qpid.proton.engine import Event
 

Modified: qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java (original)
+++ qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java Thu Jul 17 19:53:14 2014
@@ -41,6 +41,7 @@ import org.apache.qpid.proton.amqp.trans
 import org.apache.qpid.proton.codec.AMQPDefinedTypes;
 import org.apache.qpid.proton.codec.DecoderImpl;
 import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.engine.Transport;
 import org.apache.qpid.proton.engine.TransportException;
 import org.apache.qpid.proton.engine.TransportResult;
 import org.apache.qpid.proton.engine.TransportResult.Status;
@@ -63,9 +64,6 @@ public class FrameParserTest
 
     private final AmqpFramer _amqpFramer = new AmqpFramer();
 
-    @Rule
-    public ExpectedException _expectedException = ExpectedException.none();
-
     @Before
     public void setUp()
     {
@@ -80,16 +78,8 @@ public class FrameParserTest
         String headerMismatchMessage = "AMQP header mismatch";
         ByteBuffer buffer = _frameParser.tail();
         buffer.put("hello".getBytes());
-        try {
-            _frameParser.process();
-            fail("expected exception");
-        } catch (TransportException e) {
-            assertThat(e.getMessage(), containsString(headerMismatchMessage));
-        }
-
-        _expectedException.expect(TransportException.class);
-        _expectedException.expectMessage(headerMismatchMessage);
-        _frameParser.tail();
+        _frameParser.process();
+        assertEquals(_frameParser.capacity(), Transport.END_OF_STREAM);
     }
 
     @Test

Modified: qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java (original)
+++ qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java Thu Jul 17 19:53:14 2014
@@ -32,6 +32,7 @@ import java.nio.ByteBuffer;
 
 import javax.net.ssl.SSLException;
 
+import org.apache.qpid.proton.engine.Transport;
 import org.apache.qpid.proton.engine.TransportException;
 import org.junit.Before;
 import org.junit.Rule;
@@ -132,16 +133,8 @@ public class SimpleSslTransportWrapperTe
         _dummySslEngine.rejectNextEncodedPacket(sslException);
 
         _sslWrapper.tail().put("<-A->".getBytes());
-        try {
-            _sslWrapper.process();
-            fail("no exception");
-        } catch (TransportException e) {
-            assertSame(sslException, e.getCause());
-            assertEquals("", _underlyingInput.getAcceptedInput());
-        }
-
-        _expectedException.expect(TransportException.class);
-        _sslWrapper.tail();
+        _sslWrapper.process();
+        assertEquals(_sslWrapper.capacity(), Transport.END_OF_STREAM);
     }
 
     @Test

Modified: qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/ConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/ConnectionTest.java?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/ConnectionTest.java (original)
+++ qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/systemtests/engine/ConnectionTest.java Thu Jul 17 19:53:14 2014
@@ -80,7 +80,7 @@ public class ConnectionTest
 
 
     /** Container id is a mandatory field so this should cause an error */
-    @Test(expected=TransportException.class)
+    @Test
     public void testReceiptOfOpenWithoutContainerId_causesTODO()
     {
         _pumper.pumpAll();
@@ -90,6 +90,7 @@ public class ConnectionTest
 
         int serverConsumed = _serverTransport.input(openFrameBuffer, 0, openFrameBuffer.length);
         assertEquals(openFrameBuffer.length, serverConsumed);
+        assertEquals(_serverTransport.capacity(), Transport.END_OF_STREAM);
     }
 
     /**

Modified: qpid/proton/trunk/tests/python/proton_tests/common.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/common.py?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/common.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/common.py Thu Jul 17 19:53:14 2014
@@ -50,20 +50,21 @@ def pump_uni(src, dst, buffer_size=1024)
   p = src.pending()
   c = dst.capacity()
 
-  if p < 0 and c < 0:
-    return False
+  if c < 0:
+    if p < 0:
+      return False
+    else:
+      src.close_head()
+      return True
 
   if p < 0:
     dst.close_tail()
   elif p == 0 or c == 0:
     return False
   else:
-    if c < 0:
-      src.close_head()
-    else:
-      bytes = src.peek(min(c, buffer_size))
-      dst.push(bytes)
-      src.pop(len(bytes))
+    bytes = src.peek(min(c, buffer_size))
+    dst.push(bytes)
+    src.pop(len(bytes))
 
   return True
 

Modified: qpid/proton/trunk/tests/python/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/engine.py?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/engine.py Thu Jul 17 19:53:14 2014
@@ -1081,11 +1081,9 @@ class IdleTimeoutTest(Test):
     # now expire sndr
     clock = 1.499
     t_snd.tick(clock)
-    try:
-      self.pump()
-      assert False, "Expected connection timeout did not happen!"
-    except TransportException:
-      pass
+    self.pump()
+    assert self.c2.state & Endpoint.REMOTE_CLOSED
+    assert self.c2.remote_condition.name == "amqp:resource-limit-exceeded"
 
 class CreditTest(Test):
 

Modified: qpid/proton/trunk/tests/python/proton_tests/ssl.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/ssl.py?rev=1611461&r1=1611460&r2=1611461&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/ssl.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/ssl.py Thu Jul 17 19:53:14 2014
@@ -73,6 +73,8 @@ class SslTest(common.Test):
         client.connection.open()
         server.connection.open()
         self._pump(client, server)
+        if client.transport.closed:
+            return
         assert client.ssl.protocol_name() is not None
         client.connection.close()
         server.connection.close()
@@ -214,11 +216,11 @@ class SslTest(common.Test):
 
         client.connection.open()
         server.connection.open()
-        try:
-            self._pump( client, server )
-            assert False, "Server failed to reject bad certificate."
-        except TransportException, e:
-            pass
+        self._pump( client, server )
+        assert client.transport.closed
+        assert server.transport.closed
+        assert client.connection.state & Endpoint.REMOTE_UNINIT
+        assert server.connection.state & Endpoint.REMOTE_UNINIT
 
     def test_client_authentication_fail_no_cert(self):
         """ Ensure that the server will fail a client that does not provide a
@@ -239,11 +241,11 @@ class SslTest(common.Test):
 
         client.connection.open()
         server.connection.open()
-        try:
-            self._pump( client, server )
-            assert False, "Server failed to reject bad certificate."
-        except TransportException, e:
-            pass
+        self._pump( client, server )
+        assert client.transport.closed
+        assert server.transport.closed
+        assert client.connection.state & Endpoint.REMOTE_UNINIT
+        assert server.connection.state & Endpoint.REMOTE_UNINIT
 
     def test_client_server_authentication(self):
         """ Require both client and server to mutually identify themselves.
@@ -314,11 +316,11 @@ class SslTest(common.Test):
 
         client.connection.open()
         server.connection.open()
-        try:
-            self._pump( client, server )
-            assert False, "Client failed to reject bad certificate."
-        except TransportException, e:
-            pass
+        self._pump( client, server )
+        assert client.transport.closed
+        assert server.transport.closed
+        assert client.connection.state & Endpoint.REMOTE_UNINIT
+        assert server.connection.state & Endpoint.REMOTE_UNINIT
 
         del server
         del client
@@ -409,11 +411,11 @@ class SslTest(common.Test):
 
         client.connection.open()
         server.connection.open()
-        try:
-            self._pump( client, server )
-            assert False, "Server did not reject client as expected."
-        except TransportException:
-            pass
+        self._pump( client, server )
+        assert client.transport.closed
+        assert server.transport.closed
+        assert client.connection.state & Endpoint.REMOTE_UNINIT
+        assert server.connection.state & Endpoint.REMOTE_UNINIT
 
     def test_session_resume(self):
         """ Test resume of client session.
@@ -563,11 +565,11 @@ class SslTest(common.Test):
         client = SslTest.SslTestConnection( self.client_domain )
 
         client.ssl.peer_hostname = "A1.Good.Server.domain.comX"
-        try:
-            self._do_handshake( client, server )
-            assert False, "Expected connection to fail due to hostname mismatch"
-        except TransportException:
-            pass
+        self._do_handshake( client, server )
+        assert client.transport.closed
+        assert server.transport.closed
+        assert client.connection.state & Endpoint.REMOTE_UNINIT
+        assert server.connection.state & Endpoint.REMOTE_UNINIT
         del server
         del client
         self.teardown()
@@ -659,11 +661,11 @@ class SslTest(common.Test):
         client = SslTest.SslTestConnection( self.client_domain )
 
         client.ssl.peer_hostname = "FOO.PREfi.domain.com"
-        try:
-            self._do_handshake( client, server )
-            assert False, "Expected connection to fail due to hostname mismatch"
-        except TransportException:
-            pass
+        self._do_handshake( client, server )
+        assert client.transport.closed
+        assert server.transport.closed
+        assert client.connection.state & Endpoint.REMOTE_UNINIT
+        assert server.connection.state & Endpoint.REMOTE_UNINIT
         del server
         del client
         self.teardown()
@@ -680,11 +682,11 @@ class SslTest(common.Test):
         client = SslTest.SslTestConnection( self.client_domain )
 
         client.ssl.peer_hostname = "PREfix.domain.COM"
-        try:
-            self._do_handshake( client, server )
-            assert False, "Expected connection to fail due to hostname mismatch"
-        except TransportException:
-            pass
+        self._do_handshake( client, server )
+        assert client.transport.closed
+        assert server.transport.closed
+        assert client.connection.state & Endpoint.REMOTE_UNINIT
+        assert server.connection.state & Endpoint.REMOTE_UNINIT
         self.teardown()
 
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org