You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2013/01/29 15:38:13 UTC

svn commit: r1439912 - in /qpid/proton/trunk/proton-c/src: engine/engine-internal.h engine/engine.c sasl/sasl.c ssl/openssl.c ssl/ssl-internal.h

Author: kgiusti
Date: Tue Jan 29 14:38:13 2013
New Revision: 1439912

URL: http://svn.apache.org/viewvc?rev=1439912&view=rev
Log:
PROTON-152: explicitly separate the I/O layers

Modified:
    qpid/proton/trunk/proton-c/src/engine/engine-internal.h
    qpid/proton/trunk/proton-c/src/engine/engine.c
    qpid/proton/trunk/proton-c/src/sasl/sasl.c
    qpid/proton/trunk/proton-c/src/ssl/openssl.c
    qpid/proton/trunk/proton-c/src/ssl/ssl-internal.h

Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1439912&r1=1439911&r2=1439912&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Tue Jan 29 14:38:13 2013
@@ -107,10 +107,15 @@ typedef struct {
 #include <proton/sasl.h>
 #include <proton/ssl.h>
 
+typedef struct pn_io_layer_t {
+  void *context;
+  struct pn_io_layer_t *next;
+  ssize_t (*process_input)(struct pn_io_layer_t *io_layer, const char *, size_t);
+  ssize_t (*process_output)(struct pn_io_layer_t *io_layer, char *, size_t);
+  pn_timestamp_t (*process_tick)(struct pn_io_layer_t *io_layer, pn_timestamp_t);
+} pn_io_layer_t;
+
 struct pn_transport_t {
-  ssize_t (*process_input)(pn_transport_t *, const char *, size_t);
-  ssize_t (*process_output)(pn_transport_t *, char *, size_t);
-  pn_timestamp_t (*process_tick)(pn_transport_t *, pn_timestamp_t);
   size_t header_count;
   pn_sasl_t *sasl;
   pn_ssl_t *ssl;
@@ -128,6 +133,12 @@ struct pn_transport_t {
   uint32_t   remote_max_frame;
   pn_condition_t remote_condition;
 
+#define PN_IO_SSL  0
+#define PN_IO_SASL 1
+#define PN_IO_AMQP 2
+#define PN_IO_LAYER_CT (PN_IO_AMQP+1)
+  pn_io_layer_t io_layers[PN_IO_LAYER_CT];
+
   /* dead remote detection */
   pn_millis_t local_idle_timeout;
   pn_timestamp_t dead_remote_deadline;
@@ -253,4 +264,8 @@ void pn_link_dump(pn_link_t *link);
 void pn_dump(pn_connection_t *conn);
 void pn_transport_sasl_init(pn_transport_t *transport);
 
+ssize_t pn_io_layer_input_passthru(pn_io_layer_t *, const char *, size_t );
+ssize_t pn_io_layer_output_passthru(pn_io_layer_t *, char *, size_t );
+pn_timestamp_t pn_io_layer_tick_passthru(pn_io_layer_t *, pn_timestamp_t);
+
 #endif /* engine-internal.h */

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1439912&r1=1439911&r2=1439912&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Tue Jan 29 14:38:13 2013
@@ -728,26 +728,36 @@ int pn_do_detach(pn_dispatcher_t *disp);
 int pn_do_end(pn_dispatcher_t *disp);
 int pn_do_close(pn_dispatcher_t *disp);
 
-static ssize_t pn_input_read_sasl_header(pn_transport_t *transport, const char *bytes, size_t available);
-static ssize_t pn_input_read_sasl(pn_transport_t *transport, const char *bytes, size_t available);
-static ssize_t pn_input_read_amqp_header(pn_transport_t *transport, const char *bytes, size_t available);
-static ssize_t pn_input_read_amqp(pn_transport_t *transport, const char *bytes, size_t available);
-static ssize_t pn_output_write_sasl_header(pn_transport_t *transport, char *bytes, size_t available);
-static ssize_t pn_output_write_sasl(pn_transport_t *transport, char *bytes, size_t available);
-static ssize_t pn_output_write_amqp_header(pn_transport_t *transport, char *bytes, size_t available);
-static ssize_t pn_output_write_amqp(pn_transport_t *transport, char *bytes, size_t available);
-static pn_timestamp_t pn_process_tick(pn_transport_t *transport, pn_timestamp_t now);
+static ssize_t pn_input_read_amqp_header(pn_io_layer_t *io_layer, const char *bytes, size_t available);
+static ssize_t pn_input_read_amqp(pn_io_layer_t *io_layer, const char *bytes, size_t available);
+static ssize_t pn_output_write_amqp_header(pn_io_layer_t *io_layer, char *bytes, size_t available);
+static ssize_t pn_output_write_amqp(pn_io_layer_t *io_layer, char *bytes, size_t available);
+static pn_timestamp_t pn_tick_amqp(pn_io_layer_t *io_layer, pn_timestamp_t now);
 
 void pn_transport_init(pn_transport_t *transport)
 {
-  transport->process_input = pn_input_read_amqp_header;
-  transport->process_output = pn_output_write_amqp_header;
-  transport->process_tick = NULL;
   transport->header_count = 0;
   transport->sasl = NULL;
   transport->ssl = NULL;
   transport->disp = pn_dispatcher(0, transport);
 
+  pn_io_layer_t *io_layer = transport->io_layers;
+  while (io_layer != &transport->io_layers[PN_IO_AMQP]) {
+    io_layer->context = NULL;
+    io_layer->next = io_layer + 1;
+    io_layer->process_input = pn_io_layer_input_passthru;
+    io_layer->process_output = pn_io_layer_output_passthru;
+    io_layer->process_tick = pn_io_layer_tick_passthru;
+    ++io_layer;
+  }
+
+  pn_io_layer_t *amqp = &transport->io_layers[PN_IO_AMQP];
+  amqp->context = transport;
+  amqp->process_input = pn_input_read_amqp_header;
+  amqp->process_output = pn_output_write_amqp_header;
+  amqp->process_tick = pn_io_layer_tick_passthru;
+  amqp->next = NULL;
+
   pn_dispatcher_action(transport->disp, OPEN, "OPEN", pn_do_open);
   pn_dispatcher_action(transport->disp, BEGIN, "BEGIN", pn_do_begin);
   pn_dispatcher_action(transport->disp, ATTACH, "ATTACH", pn_do_attach);
@@ -827,12 +837,6 @@ pn_transport_t *pn_transport()
   return transport;
 }
 
-void pn_transport_sasl_init(pn_transport_t *transport)
-{
-  transport->process_input = pn_input_read_sasl_header;
-  transport->process_output = pn_output_write_sasl_header;
-}
-
 int pn_transport_bind(pn_transport_t *transport, pn_connection_t *connection)
 {
   if (!transport) return PN_ARG_ERR;
@@ -1422,7 +1426,7 @@ int pn_do_open(pn_dispatcher_t *disp)
     transport->disp->halt = true;
   }
   if (transport->remote_idle_timeout)
-    transport->process_tick = pn_process_tick;  // enable timeouts
+    transport->io_layers[PN_IO_AMQP].process_tick = pn_tick_amqp;  // enable timeouts
   transport->open_rcvd = true;
   return 0;
 }
@@ -1812,15 +1816,12 @@ ssize_t pn_transport_input(pn_transport_
 {
   if (!transport) return PN_ARG_ERR;
 
+  pn_io_layer_t *io_layer = transport->io_layers;
   size_t consumed = 0;
 
-  const bool use_ssl = transport->ssl != NULL;
   while (true) {
     ssize_t n;
-    if (use_ssl)
-      n = pn_ssl_input( transport->ssl, bytes + consumed, available - consumed);
-    else
-      n = transport->process_input(transport, bytes + consumed, available - consumed);
+    n = io_layer->process_input( io_layer, bytes + consumed, available - consumed);
     if (n > 0) {
       consumed += n;
       if (consumed >= available) {
@@ -1844,11 +1845,9 @@ ssize_t pn_transport_input(pn_transport_
   return consumed;
 }
 
-#define SASL_HEADER ("AMQP\x03\x01\x00\x00")
-
 static ssize_t pn_input_read_header(pn_transport_t *transport, const char *bytes, size_t available,
                                     const char *header, size_t size, const char *protocol,
-                                    ssize_t (*next)(pn_transport_t *, const char *, size_t))
+                                    ssize_t (*next)(pn_io_layer_t *, const char *, size_t))
 {
   const char *point = header + transport->header_count;
   int delta = pn_min(available, size - transport->header_count);
@@ -1861,7 +1860,7 @@ static ssize_t pn_input_read_header(pn_t
     transport->header_count += delta;
     if (transport->header_count == size) {
       transport->header_count = 0;
-      transport->process_input = next;
+      transport->io_layers[PN_IO_AMQP].process_input = next;
 
       if (transport->disp->trace & PN_TRACE_FRM)
         fprintf(stderr, "    <- %s\n", protocol);
@@ -1870,33 +1869,18 @@ static ssize_t pn_input_read_header(pn_t
   }
 }
 
-static ssize_t pn_input_read_sasl_header(pn_transport_t *transport, const char *bytes, size_t available)
-{
-  return pn_input_read_header(transport, bytes, available, SASL_HEADER, 8, "SASL", pn_input_read_sasl);
-}
-
-static ssize_t pn_input_read_sasl(pn_transport_t *transport, const char *bytes, size_t available)
-{
-  pn_sasl_t *sasl = transport->sasl;
-  ssize_t n = pn_sasl_input(sasl, bytes, available);
-  if (n == PN_EOS) {
-    transport->process_input = pn_input_read_amqp_header;
-    return transport->process_input(transport, bytes, available);
-  } else {
-    return n;
-  }
-}
-
 #define AMQP_HEADER ("AMQP\x00\x01\x00\x00")
 
-static ssize_t pn_input_read_amqp_header(pn_transport_t *transport, const char *bytes, size_t available)
+static ssize_t pn_input_read_amqp_header(pn_io_layer_t *io_layer, const char *bytes, size_t available)
 {
+  pn_transport_t *transport = (pn_transport_t *)io_layer->context;
   return pn_input_read_header(transport, bytes, available, AMQP_HEADER, 8,
                               "AMQP", pn_input_read_amqp);
 }
 
-static ssize_t pn_input_read_amqp(pn_transport_t *transport, const char *bytes, size_t available)
+static ssize_t pn_input_read_amqp(pn_io_layer_t *io_layer, const char *bytes, size_t available)
 {
+  pn_transport_t *transport = (pn_transport_t *)io_layer->context;
   if (transport->close_rcvd) {
     if (available > 0) {
       pn_do_error(transport, "amqp:connection:framing-error", "data after close");
@@ -1923,9 +1907,10 @@ static ssize_t pn_input_read_amqp(pn_tra
 }
 
 /* process AMQP related timer events */
-static pn_timestamp_t pn_process_tick(pn_transport_t *transport, pn_timestamp_t now)
+static pn_timestamp_t pn_tick_amqp(pn_io_layer_t *io_layer, pn_timestamp_t now)
 {
   pn_timestamp_t timeout = 0;
+  pn_transport_t *transport = (pn_transport_t *)io_layer->context;
 
   if (transport->local_idle_timeout) {
     if (transport->dead_remote_deadline == 0 ||
@@ -2492,45 +2477,29 @@ static ssize_t pn_output_write_header(pn
                                       char *bytes, size_t size,
                                       const char *header, size_t hdrsize,
                                       const char *protocol,
-                                      ssize_t (*next)(pn_transport_t *, char *, size_t))
+                                      ssize_t (*next)(pn_io_layer_t *, char *, size_t))
 {
   if (transport->disp->trace & PN_TRACE_FRM)
     fprintf(stderr, "    -> %s\n", protocol);
   if (size >= hdrsize) {
     memmove(bytes, header, hdrsize);
-    transport->process_output = next;
+    transport->io_layers[PN_IO_AMQP].process_output = next;
     return hdrsize;
   } else {
     return pn_error_format(transport->error, PN_UNDERFLOW, "underflow writing %s header", protocol);
   }
 }
 
-static ssize_t pn_output_write_sasl_header(pn_transport_t *transport, char *bytes, size_t size)
-{
-  return pn_output_write_header(transport, bytes, size, SASL_HEADER, 8, "SASL",
-                                pn_output_write_sasl);
-}
-
-static ssize_t pn_output_write_sasl(pn_transport_t *transport, char *bytes, size_t size)
-{
-  pn_sasl_t *sasl = transport->sasl;
-  ssize_t n = pn_sasl_output(sasl, bytes, size);
-  if (n == PN_EOS) {
-    transport->process_output = pn_output_write_amqp_header;
-    return transport->process_output(transport, bytes, size);
-  } else {
-    return n;
-  }
-}
-
-static ssize_t pn_output_write_amqp_header(pn_transport_t *transport, char *bytes, size_t size)
+static ssize_t pn_output_write_amqp_header(pn_io_layer_t *io_layer, char *bytes, size_t size)
 {
+  pn_transport_t *transport = (pn_transport_t *)io_layer->context;
   return pn_output_write_header(transport, bytes, size, AMQP_HEADER, 8, "AMQP",
                                 pn_output_write_amqp);
 }
 
-static ssize_t pn_output_write_amqp(pn_transport_t *transport, char *bytes, size_t size)
+static ssize_t pn_output_write_amqp(pn_io_layer_t *io_layer, char *bytes, size_t size)
 {
+  pn_transport_t *transport = (pn_transport_t *)io_layer->context;
   if (!transport->connection) {
     return 0;
   }
@@ -2553,16 +2522,12 @@ ssize_t pn_transport_output(pn_transport
 {
   if (!transport) return PN_ARG_ERR;
 
+  pn_io_layer_t *io_layer = transport->io_layers;
   size_t total = 0;
 
-  const bool use_ssl = transport->ssl != NULL;
-
   while (size - total > 0) {
     ssize_t n;
-    if (use_ssl)
-      n = pn_ssl_output( transport->ssl, bytes + total, size - total);
-    else
-      n = transport->process_output(transport, bytes + total, size - total);
+    n = io_layer->process_output( io_layer, bytes + total, size - total);
     if (n > 0) {
       total += n;
     } else if (n == 0) {
@@ -2617,7 +2582,7 @@ pn_millis_t pn_transport_get_idle_timeou
 void pn_transport_set_idle_timeout(pn_transport_t *transport, pn_millis_t timeout)
 {
   transport->local_idle_timeout = timeout;
-  transport->process_tick = pn_process_tick;
+  transport->io_layers[PN_IO_AMQP].process_tick = pn_tick_amqp;
 }
 
 pn_millis_t pn_transport_get_remote_idle_timeout(pn_transport_t *transport)
@@ -2685,9 +2650,8 @@ void pn_link_drain(pn_link_t *receiver, 
 
 pn_timestamp_t pn_transport_tick(pn_transport_t *transport, pn_timestamp_t now)
 {
-  if (transport && transport->process_tick)
-    return transport->process_tick( transport, now );
-  return 0;
+  pn_io_layer_t *io_layer = transport->io_layers;
+  return io_layer->process_tick( io_layer, now );
 }
 
 uint64_t pn_transport_get_frames_output(const pn_transport_t *transport)
@@ -2904,3 +2868,30 @@ int pn_condition_redirect_port(pn_condit
   pn_data_rewind(data);
   return port;
 }
+
+/** Pass through input handler */
+ssize_t pn_io_layer_input_passthru(pn_io_layer_t *io_layer, const char *data, size_t available)
+{
+  pn_io_layer_t *next = io_layer->next;
+  if (next)
+    return next->process_input( next, data, available );
+  return PN_EOS;
+}
+
+/** Pass through output handler */
+ssize_t pn_io_layer_output_passthru(pn_io_layer_t *io_layer, char *bytes, size_t size)
+{
+  pn_io_layer_t *next = io_layer->next;
+  if (next)
+    return next->process_output( next, bytes, size );
+  return PN_EOS;
+}
+
+/** Pass through tick handler */
+pn_timestamp_t pn_io_layer_tick_passthru(pn_io_layer_t *io_layer, pn_timestamp_t now)
+{
+  pn_io_layer_t *next = io_layer->next;
+  if (next)
+    return next->process_tick( next, now );
+  return 0;
+}

Modified: qpid/proton/trunk/proton-c/src/sasl/sasl.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/sasl/sasl.c?rev=1439912&r1=1439911&r2=1439912&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/sasl/sasl.c (original)
+++ qpid/proton/trunk/proton-c/src/sasl/sasl.c Tue Jan 29 14:38:13 2013
@@ -34,6 +34,9 @@
 #define SCRATCH (1024)
 
 struct pn_sasl_t {
+  pn_transport_t *transport;
+  pn_io_layer_t *io_layer;
+  size_t header_count;
   pn_dispatcher_t *disp;
   bool client;
   bool configured;
@@ -49,6 +52,11 @@ struct pn_sasl_t {
   char scratch[SCRATCH];
 };
 
+static ssize_t pn_input_read_sasl_header(pn_io_layer_t *io_layer, const char *bytes, size_t available);
+static ssize_t pn_input_read_sasl(pn_io_layer_t *io_layer, const char *bytes, size_t available);
+static ssize_t pn_output_write_sasl_header(pn_io_layer_t *io_layer, char *bytes, size_t available);
+static ssize_t pn_output_write_sasl(pn_io_layer_t *io_layer, char *bytes, size_t available);
+
 int pn_do_init(pn_dispatcher_t *disp);
 int pn_do_mechanisms(pn_dispatcher_t *disp);
 int pn_do_challenge(pn_dispatcher_t *disp);
@@ -81,8 +89,14 @@ pn_sasl_t *pn_sasl(pn_transport_t *trans
     sasl->rcvd_done = false;
 
     transport->sasl = sasl;
+    sasl->transport = transport;
+    sasl->io_layer = &transport->io_layers[PN_IO_SASL];
+    sasl->io_layer->context = sasl;
+    sasl->io_layer->process_input = pn_input_read_sasl_header;
+    sasl->io_layer->process_output = pn_output_write_sasl_header;
+    sasl->io_layer->process_tick = pn_io_layer_tick_passthru;
 
-    pn_transport_sasl_init(transport);
+    sasl->header_count = 0;
   }
 
   return transport->sasl;
@@ -300,7 +314,7 @@ void pn_sasl_process(pn_sasl_t *sasl)
   }
 }
 
-ssize_t pn_sasl_input(pn_sasl_t *sasl, char *bytes, size_t available)
+ssize_t pn_sasl_input(pn_sasl_t *sasl, const char *bytes, size_t available)
 {
   ssize_t n = pn_dispatcher_input(sasl->disp, bytes, available);
   if (n < 0) return n;
@@ -391,3 +405,66 @@ int pn_do_outcome(pn_dispatcher_t *disp)
   disp->halt = true;
   return 0;
 }
+
+#define SASL_HEADER ("AMQP\x03\x01\x00\x00")
+#define SASL_HEADER_LEN 8
+
+static ssize_t pn_input_read_sasl_header(pn_io_layer_t *io_layer, const char *bytes, size_t available)
+{
+  pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context;
+  const char *point = SASL_HEADER + sasl->header_count;
+  int delta = pn_min(available, SASL_HEADER_LEN - sasl->header_count);
+  if (!available || memcmp(bytes, point, delta)) {
+    char quoted[1024];
+    pn_quote_data(quoted, 1024, bytes, available);
+    return pn_error_format(sasl->transport->error, PN_ERR,
+                           "%s header mismatch: '%s'", "SASL", quoted);
+  } else {
+    sasl->header_count += delta;
+    if (sasl->header_count == SASL_HEADER_LEN) {
+      sasl->io_layer->process_input = pn_input_read_sasl;
+      if (sasl->disp->trace & PN_TRACE_FRM)
+        fprintf(stderr, "    <- %s\n", "SASL");
+    }
+    return delta;
+  }
+}
+
+static ssize_t pn_input_read_sasl(pn_io_layer_t *io_layer, const char *bytes, size_t available)
+{
+  pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context;
+  ssize_t n = pn_sasl_input(sasl, bytes, available);
+  if (n == PN_EOS) {
+    sasl->io_layer->process_input = pn_io_layer_input_passthru;
+    pn_io_layer_t *io_next = sasl->io_layer->next;
+    return io_next->process_input( io_next, bytes, available );
+  }
+  return n;
+}
+
+static ssize_t pn_output_write_sasl_header(pn_io_layer_t *io_layer, char *bytes, size_t size)
+{
+  pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context;
+  if (sasl->disp->trace & PN_TRACE_FRM)
+    fprintf(stderr, "    -> %s\n", "SASL");
+  if (size >= SASL_HEADER_LEN) {
+    memmove(bytes, SASL_HEADER, SASL_HEADER_LEN);
+    sasl->io_layer->process_output = pn_output_write_sasl;
+    return SASL_HEADER_LEN;
+  } else {
+    return pn_error_format(sasl->transport->error, PN_UNDERFLOW, "underflow writing %s header", "SASL");
+  }
+}
+
+static ssize_t pn_output_write_sasl(pn_io_layer_t *io_layer, char *bytes, size_t size)
+{
+  pn_sasl_t *sasl = (pn_sasl_t *)io_layer->context;
+  ssize_t n = pn_sasl_output(sasl, bytes, size);
+  if (n == PN_EOS) {
+    sasl->io_layer->process_output = pn_io_layer_output_passthru;
+    pn_io_layer_t *io_next = sasl->io_layer->next;
+    return io_next->process_output( io_next, bytes, size );
+  }
+  return n;
+}
+

Modified: qpid/proton/trunk/proton-c/src/ssl/openssl.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/ssl/openssl.c?rev=1439912&r1=1439911&r2=1439912&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/ssl/openssl.c (original)
+++ qpid/proton/trunk/proton-c/src/ssl/openssl.c Tue Jan 29 14:38:13 2013
@@ -72,6 +72,7 @@ struct pn_ssl_domain_t {
 struct pn_ssl_t {
 
   pn_transport_t   *transport;
+  pn_io_layer_t    *io_layer;
   pn_ssl_domain_t  *domain;
   const char    *session_id;
   const char *peer_hostname;
@@ -95,10 +96,6 @@ struct pn_ssl_t {
   char inbuf[APP_BUF_SIZE];
   size_t in_count;
 
-  // process cleartext i/o "above" the SSL layer
-  ssize_t (*process_input)(pn_transport_t *, const char *, size_t);
-  ssize_t (*process_output)(pn_transport_t *, char *, size_t);
-
   pn_trace_t trace;
 };
 
@@ -117,14 +114,12 @@ struct pn_ssl_session_t {
 
 /* */
 static int keyfile_pw_cb(char *buf, int size, int rwflag, void *userdata);
-static ssize_t process_input_ssl( pn_transport_t *transport, const char *input_data, size_t len);
-static ssize_t process_output_ssl( pn_transport_t *transport, char *input_data, size_t len);
-static ssize_t process_input_cleartext(pn_transport_t *transport, const char *input_data, size_t len);
-static ssize_t process_output_cleartext(pn_transport_t *transport, char *buffer, size_t max_len);
-static ssize_t process_input_unknown(pn_transport_t *transport, const char *input_data, size_t len);
-static ssize_t process_output_unknown(pn_transport_t *transport, char *input_data, size_t len);
-static ssize_t process_input_done(pn_transport_t *transport, const char *input_data, size_t len);
-static ssize_t process_output_done(pn_transport_t *transport, char *input_data, size_t len);
+static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_data, size_t len);
+static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *input_data, size_t len);
+static ssize_t process_input_unknown( pn_io_layer_t *io_layer, const char *input_data, size_t len);
+static ssize_t process_output_unknown( pn_io_layer_t *io_layer, char *input_data, size_t len);
+static ssize_t process_input_done(pn_io_layer_t *io_layer, const char *input_data, size_t len);
+static ssize_t process_output_done(pn_io_layer_t *io_layer, char *input_data, size_t len);
 static connection_mode_t check_for_ssl_connection( const char *data, size_t len );
 static int init_ssl_socket( pn_ssl_t * );
 static void release_ssl_socket( pn_ssl_t * );
@@ -643,11 +638,11 @@ int pn_ssl_init( pn_ssl_t *ssl, pn_ssl_d
   ssl->domain = domain;
   domain->ref_count++;
   if (domain->allow_unsecured) {
-    ssl->process_input = process_input_unknown;
-    ssl->process_output = process_output_unknown;
+    ssl->io_layer->process_input = process_input_unknown;
+    ssl->io_layer->process_output = process_output_unknown;
   } else {
-    ssl->process_input = process_input_ssl;
-    ssl->process_output = process_output_ssl;
+    ssl->io_layer->process_input = process_input_ssl;
+    ssl->io_layer->process_output = process_output_ssl;
   }
 
   if (session_id && domain->mode == PN_SSL_MODE_CLIENT)
@@ -712,19 +707,6 @@ void pn_ssl_free( pn_ssl_t *ssl)
   free(ssl);
 }
 
-// move data received from the network into the SSL layer
-ssize_t pn_ssl_input(pn_ssl_t *ssl, const char *bytes, size_t available)
-{
-  return ssl->process_input( ssl->transport, bytes, available );
-}
-
-// pull output from the SSL layer and move into network output buffers
-ssize_t pn_ssl_output(pn_ssl_t *ssl, char *buffer, size_t max_size)
-{
-  return ssl->process_output( ssl->transport, buffer, max_size );
-}
-
-
 pn_ssl_t *pn_ssl(pn_transport_t *transport)
 {
   if (!transport) return NULL;
@@ -735,6 +717,12 @@ pn_ssl_t *pn_ssl(pn_transport_t *transpo
   ssl->transport = transport;
   transport->ssl = ssl;
 
+  ssl->io_layer = &transport->io_layers[PN_IO_SSL];
+  ssl->io_layer->context = ssl;
+  ssl->io_layer->process_input = pn_io_layer_input_passthru;
+  ssl->io_layer->process_output = pn_io_layer_output_passthru;
+  ssl->io_layer->process_tick = pn_io_layer_tick_passthru;
+
   ssl->trace = (transport->disp) ? transport->disp->trace : PN_TRACE_OFF;
 
   return ssl;
@@ -780,8 +768,8 @@ static int start_ssl_shutdown( pn_ssl_t 
 static int setup_ssl_connection( pn_ssl_t *ssl )
 {
   _log( ssl, "SSL connection detected.\n");
-  ssl->process_input = process_input_ssl;
-  ssl->process_output = process_output_ssl;
+  ssl->io_layer->process_input = process_input_ssl;
+  ssl->io_layer->process_output = process_output_ssl;
   return 0;
 }
 
@@ -790,10 +778,9 @@ static int setup_ssl_connection( pn_ssl_
 
 // take data from the network, and pass it into SSL.  Attempt to read decrypted data from
 // SSL socket and pass it to the application.
-static ssize_t process_input_ssl( pn_transport_t *transport, const char *input_data, size_t available)
+static ssize_t process_input_ssl( pn_io_layer_t *io_layer, const char *input_data, size_t available)
 {
-  pn_ssl_t *ssl = transport->ssl;
-  if (!ssl) return PN_ERR;
+  pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
   if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_ERR;
 
   _log( ssl, "process_input_ssl( data size=%d )\n",available );
@@ -866,7 +853,8 @@ static ssize_t process_input_ssl( pn_tra
     if (!ssl->app_input_closed) {
       char *data = ssl->inbuf;
       if (ssl->in_count > 0 || ssl->ssl_closed) {  /* if ssl_closed, send 0 count */
-        ssize_t consumed = transport->process_input(transport, data, ssl->in_count);
+        pn_io_layer_t *io_next = ssl->io_layer->next;
+        ssize_t consumed = io_next->process_input( io_next, data, ssl->in_count);
         if (consumed > 0) {
           ssl->in_count -= consumed;
           data += consumed;
@@ -906,15 +894,15 @@ static ssize_t process_input_ssl( pn_tra
   //}
   if (ssl->app_input_closed && (SSL_get_shutdown(ssl->ssl) & SSL_SENT_SHUTDOWN) ) {
     consumed = ssl->app_input_closed;
-    ssl->process_input = process_input_done;
+    ssl->io_layer->process_input = process_input_done;
   }
   _log(ssl, "process_input_ssl() returning %d\n", (int) consumed);
   return consumed;
 }
 
-static ssize_t process_output_ssl( pn_transport_t *transport, char *buffer, size_t max_len)
+static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t max_len)
 {
-  pn_ssl_t *ssl = transport->ssl;
+  pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
   if (!ssl) return PN_ERR;
   if (ssl->ssl == NULL && init_ssl_socket(ssl)) return PN_ERR;
 
@@ -926,7 +914,8 @@ static ssize_t process_output_ssl( pn_tr
     // first, get any pending application output, if possible
 
     if (!ssl->app_output_closed && ssl->out_count < APP_BUF_SIZE) {
-      ssize_t app_bytes = transport->process_output(transport, &ssl->outbuf[ssl->out_count], APP_BUF_SIZE - ssl->out_count);
+      pn_io_layer_t *io_next = ssl->io_layer->next;
+      ssize_t app_bytes = io_next->process_output( io_next, &ssl->outbuf[ssl->out_count], APP_BUF_SIZE - ssl->out_count);
       if (app_bytes > 0) {
         ssl->out_count += app_bytes;
         work_pending = true;
@@ -1018,7 +1007,7 @@ static ssize_t process_output_ssl( pn_tr
   //}
   if (written == 0 && (SSL_get_shutdown(ssl->ssl) & SSL_SENT_SHUTDOWN) && BIO_pending(ssl->bio_net_io) == 0) {
     written = ssl->app_output_closed ? ssl->app_output_closed : PN_EOS;
-    ssl->process_output = process_output_done;
+    ssl->io_layer->process_output = process_output_done;
   }
   _log(ssl, "process_output_ssl() returning %d\n", (int) written);
   return written;
@@ -1100,48 +1089,33 @@ static void release_ssl_socket( pn_ssl_t
 }
 
 
-//////// CLEARTEXT CONNECTIONS
-
-static ssize_t process_input_cleartext(pn_transport_t *transport, const char *input_data, size_t len)
-{
-  // just write directly to layer "above" SSL
-  return transport->process_input( transport, input_data, len );
-}
-
-static ssize_t process_output_cleartext(pn_transport_t *transport, char *buffer, size_t max_len)
-{
-  // just read directly from the layer "above" SSL
-  return transport->process_output( transport, buffer, max_len );
-}
-
-
-
 static int setup_cleartext_connection( pn_ssl_t *ssl )
 {
   _log( ssl, "Cleartext connection detected.\n");
-  ssl->process_input = process_input_cleartext;
-  ssl->process_output = process_output_cleartext;
+  ssl->io_layer->process_input = pn_io_layer_input_passthru;
+  ssl->io_layer->process_output = pn_io_layer_output_passthru;
   return 0;
 }
 
 
 // until we determine if the client is using SSL or not:
 
-static ssize_t process_input_unknown(pn_transport_t *transport, const char *input_data, size_t len)
+static ssize_t process_input_unknown(pn_io_layer_t *io_layer, const char *input_data, size_t len)
 {
+  pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context;
   switch (check_for_ssl_connection( input_data, len )) {
   case SSL_CONNECTION:
-    setup_ssl_connection( transport->ssl );
-    return transport->ssl->process_input( transport, input_data, len );
+    setup_ssl_connection( ssl );
+    return ssl->io_layer->process_input( ssl->io_layer, input_data, len );
   case CLEAR_CONNECTION:
-    setup_cleartext_connection( transport->ssl );
-    return transport->ssl->process_input( transport, input_data, len );
+    setup_cleartext_connection( ssl );
+    return ssl->io_layer->process_input( ssl->io_layer, input_data, len );
   default:
     return 0;
   }
 }
 
-static ssize_t process_output_unknown(pn_transport_t *transport, char *input_data, size_t len)
+static ssize_t process_output_unknown(pn_io_layer_t *io_layer, char *input_data, size_t len)
 {
   // do not do output until we know if SSL is used or not
   return 0;
@@ -1253,11 +1227,11 @@ int pn_ssl_get_peer_hostname( pn_ssl_t *
   return 0;
 }
 
-static ssize_t process_input_done(pn_transport_t *transport, const char *input_data, size_t len)
+static ssize_t process_input_done(pn_io_layer_t *io_layer, const char *input_data, size_t len)
 {
   return PN_EOS;
 }
-static ssize_t process_output_done(pn_transport_t *transport, char *input_data, size_t len)
+static ssize_t process_output_done(pn_io_layer_t *io_layer, char *input_data, size_t len)
 {
   return PN_EOS;
 }

Modified: qpid/proton/trunk/proton-c/src/ssl/ssl-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/ssl/ssl-internal.h?rev=1439912&r1=1439911&r2=1439912&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/ssl/ssl-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/ssl/ssl-internal.h Tue Jan 29 14:38:13 2013
@@ -33,13 +33,6 @@
 // release the SSL context
 void pn_ssl_free( pn_ssl_t *ssl);
 
-// move data received from the network into the SSL layer
-ssize_t pn_ssl_input(pn_ssl_t *ssl, const char *bytes, size_t available);
-
-// copy data generated by SSL into the network's output buffers
-ssize_t pn_ssl_output(pn_ssl_t *ssl, char *buffer, size_t max_size);
-
-
 void pn_ssl_trace(pn_ssl_t *ssl, pn_trace_t trace);
 
 #endif /* ssl-internal.h */



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org