You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2014/12/11 05:12:18 UTC

[5/6] qpid-proton git commit: PROTON-770: Change dispatcher interface (more parameters for frame handlers) - Use transport, frametype and channel - Pass more of the frame directly into the frame handlers

PROTON-770: Change dispatcher interface (more parameters for frame handlers)
- Use transport, frame type and channel
- Pass more of the frame directly into the frame handlers


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/fc9b88ea
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/fc9b88ea
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/fc9b88ea

Branch: refs/heads/master
Commit: fc9b88eada8c685fb2a003570bf903a9360f48d2
Parents: e093b8c
Author: Andrew Stitcher <as...@apache.org>
Authored: Wed Nov 26 13:48:31 2014 -0500
Committer: Andrew Stitcher <as...@apache.org>
Committed: Wed Dec 10 16:50:01 2014 -0500

----------------------------------------------------------------------
 proton-c/src/dispatch_actions.h      | 30 ++++++-------
 proton-c/src/dispatcher/dispatcher.c | 49 +++++++-------------
 proton-c/src/dispatcher/dispatcher.h |  6 +--
 proton-c/src/sasl/sasl.c             | 32 ++++++-------
 proton-c/src/transport/transport.c   | 75 ++++++++++++++-----------------
 5 files changed, 82 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fc9b88ea/proton-c/src/dispatch_actions.h
----------------------------------------------------------------------
diff --git a/proton-c/src/dispatch_actions.h b/proton-c/src/dispatch_actions.h
index aa7a8f4..c3a8aab 100644
--- a/proton-c/src/dispatch_actions.h
+++ b/proton-c/src/dispatch_actions.h
@@ -24,22 +24,22 @@
 
 #include "dispatcher/dispatcher.h"
 
-/* Transport actions */
-int pn_do_open(pn_dispatcher_t *disp);
-int pn_do_begin(pn_dispatcher_t *disp);
-int pn_do_attach(pn_dispatcher_t *disp);
-int pn_do_transfer(pn_dispatcher_t *disp);
-int pn_do_flow(pn_dispatcher_t *disp);
-int pn_do_disposition(pn_dispatcher_t *disp);
-int pn_do_detach(pn_dispatcher_t *disp);
-int pn_do_end(pn_dispatcher_t *disp);
-int pn_do_close(pn_dispatcher_t *disp);
+/* AMQP actions */
+int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
+int pn_do_begin(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
+int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
+int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
+int pn_do_flow(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
+int pn_do_disposition(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
+int pn_do_detach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
+int pn_do_end(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
+int pn_do_close(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
 
 /* SASL actions */
-int pn_do_init(pn_dispatcher_t *disp);
-int pn_do_mechanisms(pn_dispatcher_t *disp);
-int pn_do_challenge(pn_dispatcher_t *disp);
-int pn_do_response(pn_dispatcher_t *disp);
-int pn_do_outcome(pn_dispatcher_t *disp);
+int pn_do_init(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
+int pn_do_mechanisms(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
+int pn_do_challenge(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
+int pn_do_response(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
+int pn_do_outcome(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
 
 #endif // _PROTON_DISPATCH_ACTIONS_H

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fc9b88ea/proton-c/src/dispatcher/dispatcher.c
----------------------------------------------------------------------
diff --git a/proton-c/src/dispatcher/dispatcher.c b/proton-c/src/dispatcher/dispatcher.c
index 08c5ef1..ae04706 100644
--- a/proton-c/src/dispatcher/dispatcher.c
+++ b/proton-c/src/dispatcher/dispatcher.c
@@ -28,14 +28,14 @@
 
 #include "dispatch_actions.h"
 
-int pni_bad_frame(pn_dispatcher_t* disp) {
-  pn_transport_log(disp->transport, "Error dispatching frame: Unknown performative");
+int pni_bad_frame(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) {
+  pn_transport_logf(transport, "Error dispatching frame: type: %d: Unknown performative", frame_type);
   return PN_ERR;
 }
 
 // We could use a table based approach here if we needed to dynamically
 // add new performatives
-static inline int pni_dispatch_action(pn_dispatcher_t* disp, uint64_t lcode)
+static inline int pni_dispatch_action(pn_transport_t* transport, uint64_t lcode, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
 {
   pn_action_t *action;
   switch (lcode) {
@@ -58,7 +58,7 @@ static inline int pni_dispatch_action(pn_dispatcher_t* disp, uint64_t lcode)
   case SASL_OUTCOME:    action = pn_do_outcome; break;
   default:              action = pni_bad_frame; break;
   };
-  return action(disp);
+  return action(transport, frame_type, channel, args, payload);
 }
 
 pn_dispatcher_t *pn_dispatcher(uint8_t frame_type, pn_transport_t *transport)
@@ -68,10 +68,7 @@ pn_dispatcher_t *pn_dispatcher(uint8_t frame_type, pn_transport_t *transport)
   disp->frame_type = frame_type;
   disp->transport = transport;
 
-  disp->channel = 0;
   disp->args = pn_data(16);
-  disp->payload = NULL;
-  disp->size = 0;
 
   disp->output_args = pn_data(16);
   disp->frame = pn_buffer( 4*1024 );
@@ -124,7 +121,7 @@ static void pn_do_trace(pn_dispatcher_t *disp, uint16_t ch, pn_dir_t dir,
   }
 }
 
-int pn_dispatch_frame(pn_dispatcher_t *disp, pn_frame_t frame)
+static int pni_dispatch_frame(pn_dispatcher_t *disp, pn_data_t *args, pn_frame_t frame)
 {
   if (frame.size == 0) { // ignore null frames
     if (disp->transport->trace & PN_TRACE_FRM)
@@ -132,22 +129,23 @@ int pn_dispatch_frame(pn_dispatcher_t *disp, pn_frame_t frame)
     return 0;
   }
 
-  ssize_t dsize = pn_data_decode(disp->args, frame.payload, frame.size);
+  ssize_t dsize = pn_data_decode(args, frame.payload, frame.size);
   if (dsize < 0) {
     pn_string_format(disp->scratch,
                      "Error decoding frame: %s %s\n", pn_code(dsize),
-                     pn_error_text(pn_data_error(disp->args)));
+                     pn_error_text(pn_data_error(args)));
     pn_quote(disp->scratch, frame.payload, frame.size);
     pn_transport_log(disp->transport, pn_string_get(disp->scratch));
     return dsize;
   }
 
-  disp->channel = frame.channel;
+  uint8_t frame_type = frame.type;
+  uint16_t channel = frame.channel;
   // XXX: assuming numeric -
   // if we get a symbol we should map it to the numeric value and dispatch on that
   uint64_t lcode;
   bool scanned;
-  int e = pn_data_scan(disp->args, "D?L.", &scanned, &lcode);
+  int e = pn_data_scan(args, "D?L.", &scanned, &lcode);
   if (e) {
     pn_transport_log(disp->transport, "Scan error");
     return e;
@@ -156,18 +154,15 @@ int pn_dispatch_frame(pn_dispatcher_t *disp, pn_frame_t frame)
     pn_transport_log(disp->transport, "Error dispatching frame");
     return PN_ERR;
   }
-  disp->size = frame.size - dsize;
-  if (disp->size)
-    disp->payload = frame.payload + dsize;
+  size_t payload_size = frame.size - dsize;
+  const char *payload_mem = payload_size ? frame.payload + dsize : NULL;
+  pn_bytes_t payload = {payload_size, payload_mem};
 
-  pn_do_trace(disp, disp->channel, IN, disp->args, disp->payload, disp->size);
+  pn_do_trace(disp, channel, IN, args, payload_mem, payload_size);
 
-  int err = pni_dispatch_action(disp, lcode);
+  int err = pni_dispatch_action(disp->transport, lcode, frame_type, channel, args, &payload);
 
-  disp->channel = 0;
-  pn_data_clear(disp->args);
-  disp->size = 0;
-  disp->payload = NULL;
+  pn_data_clear(args);
 
   return err;
 }
@@ -184,7 +179,7 @@ ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, const char *bytes, size_t ava
       read += n;
       available -= n;
       disp->input_frames_ct += 1;
-      int e = pn_dispatch_frame(disp, frame);
+      int e = pni_dispatch_frame(disp, disp->args, frame);
       if (e) return e;
     } else {
       break;
@@ -196,16 +191,6 @@ ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, const char *bytes, size_t ava
   return read;
 }
 
-int pn_scan_args(pn_dispatcher_t *disp, const char *fmt, ...)
-{
-  va_list ap;
-  va_start(ap, fmt);
-  int err = pn_data_vscan(disp->args, fmt, ap);
-  va_end(ap);
-  if (err) printf("scan error: %s\n", fmt);
-  return err;
-}
-
 void pn_set_payload(pn_dispatcher_t *disp, const char *data, size_t size)
 {
   disp->output_payload = data;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fc9b88ea/proton-c/src/dispatcher/dispatcher.h
----------------------------------------------------------------------
diff --git a/proton-c/src/dispatcher/dispatcher.h b/proton-c/src/dispatcher/dispatcher.h
index c578282..b9bfa2b 100644
--- a/proton-c/src/dispatcher/dispatcher.h
+++ b/proton-c/src/dispatcher/dispatcher.h
@@ -32,12 +32,10 @@
 
 typedef struct pn_dispatcher_t pn_dispatcher_t;
 
-typedef int (pn_action_t)(pn_dispatcher_t *disp);
+typedef int (pn_action_t)(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
 
 struct pn_dispatcher_t {
   pn_data_t *args;
-  const char *payload;
-  size_t size;
   pn_data_t *output_args;
   const char *output_payload;
   size_t output_size;
@@ -50,7 +48,6 @@ struct pn_dispatcher_t {
   uint64_t output_frames_ct;
   uint64_t input_frames_ct;
   pn_string_t *scratch;
-  uint16_t channel;
   uint8_t frame_type; // Used when constructing outgoing frames
   bool halt;
   bool batch;
@@ -58,7 +55,6 @@ struct pn_dispatcher_t {
 
 pn_dispatcher_t *pn_dispatcher(uint8_t frame_type, pn_transport_t *transport);
 void pn_dispatcher_free(pn_dispatcher_t *disp);
-int pn_scan_args(pn_dispatcher_t *disp, const char *fmt, ...);
 void pn_set_payload(pn_dispatcher_t *disp, const char *data, size_t size);
 int pn_post_frame(pn_dispatcher_t *disp, uint16_t ch, const char *fmt, ...);
 ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, const char *bytes, size_t available);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fc9b88ea/proton-c/src/sasl/sasl.c
----------------------------------------------------------------------
diff --git a/proton-c/src/sasl/sasl.c b/proton-c/src/sasl/sasl.c
index 1d27002..5a174e2 100644
--- a/proton-c/src/sasl/sasl.c
+++ b/proton-c/src/sasl/sasl.c
@@ -400,12 +400,12 @@ ssize_t pn_sasl_output(pn_transport_t *transport, char *bytes, size_t size)
   }
 }
 
-int pn_do_init(pn_dispatcher_t *disp)
+int pn_do_init(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
 {
-  pni_sasl_t *sasl = disp->transport->sasl;
+  pni_sasl_t *sasl = transport->sasl;
   pn_bytes_t mech;
   pn_bytes_t recv;
-  int err = pn_scan_args(disp, "D.[sz]", &mech, &recv);
+  int err = pn_data_scan(args, "D.[sz]", &mech, &recv);
   if (err) return err;
   sasl->remote_mechanisms = pn_strndup(mech.start, mech.size);
   pn_buffer_append(sasl->recv_data, recv.start, recv.size);
@@ -413,43 +413,43 @@ int pn_do_init(pn_dispatcher_t *disp)
   return 0;
 }
 
-int pn_do_mechanisms(pn_dispatcher_t *disp)
+int pn_do_mechanisms(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
 {
-  pni_sasl_t *sasl = disp->transport->sasl;
+  pni_sasl_t *sasl = transport->sasl;
   sasl->rcvd_init = true;
   return 0;
 }
 
-int pn_do_recv(pn_dispatcher_t *disp)
+int pn_do_recv(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
 {
-  pni_sasl_t *sasl = disp->transport->sasl;
+  pni_sasl_t *sasl = transport->sasl;
   pn_bytes_t recv;
-  int err = pn_scan_args(disp, "D.[z]", &recv);
+  int err = pn_data_scan(args, "D.[z]", &recv);
   if (err) return err;
   pn_buffer_append(sasl->recv_data, recv.start, recv.size);
   return 0;
 }
 
-int pn_do_challenge(pn_dispatcher_t *disp)
+int pn_do_challenge(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
 {
-  return pn_do_recv(disp);
+  return pn_do_recv(transport, frame_type, channel, args, payload);
 }
 
-int pn_do_response(pn_dispatcher_t *disp)
+int pn_do_response(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
 {
-  return pn_do_recv(disp);
+  return pn_do_recv(transport, frame_type, channel, args, payload);
 }
 
-int pn_do_outcome(pn_dispatcher_t *disp)
+int pn_do_outcome(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
 {
-  pni_sasl_t *sasl = disp->transport->sasl;
+  pni_sasl_t *sasl = transport->sasl;
   uint8_t outcome;
-  int err = pn_scan_args(disp, "D.[B]", &outcome);
+  int err = pn_data_scan(args, "D.[B]", &outcome);
   if (err) return err;
   sasl->outcome = (pn_sasl_outcome_t) outcome;
   sasl->rcvd_done = true;
   sasl->sent_done = true;
-  disp->halt = true;
+  sasl->disp->halt = true;
   return 0;
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fc9b88ea/proton-c/src/transport/transport.c
----------------------------------------------------------------------
diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c
index bb59d84..097d863 100644
--- a/proton-c/src/transport/transport.c
+++ b/proton-c/src/transport/transport.c
@@ -699,16 +699,15 @@ static char *pn_bytes_strdup(pn_bytes_t str)
   return pn_strndup(str.start, str.size);
 }
 
-int pn_do_open(pn_dispatcher_t *disp)
+int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
 {
-  pn_transport_t *transport = disp->transport;
   pn_connection_t *conn = transport->connection;
   bool container_q, hostname_q;
   pn_bytes_t remote_container, remote_hostname;
   pn_data_clear(transport->remote_offered_capabilities);
   pn_data_clear(transport->remote_desired_capabilities);
   pn_data_clear(transport->remote_properties);
-  int err = pn_scan_args(disp, "D.[?S?SIHI..CCC]", &container_q,
+  int err = pn_data_scan(args, "D.[?S?SIHI..CCC]", &container_q,
                          &remote_container, &hostname_q, &remote_hostname,
                          &transport->remote_max_frame,
                          &transport->remote_channel_max,
@@ -723,8 +722,8 @@ int pn_do_open(pn_dispatcher_t *disp)
                         transport->remote_max_frame, AMQP_MIN_MAX_FRAME_SIZE);
       transport->remote_max_frame = AMQP_MIN_MAX_FRAME_SIZE;
     }
-    disp->remote_max_frame = transport->remote_max_frame;
-    pn_buffer_clear( disp->frame );
+    transport->disp->remote_max_frame = transport->remote_max_frame;
+    pn_buffer_clear( transport->disp->frame );
   }
   if (container_q) {
     transport->remote_container = pn_bytes_strdup(remote_container);
@@ -747,13 +746,12 @@ int pn_do_open(pn_dispatcher_t *disp)
   return 0;
 }
 
-int pn_do_begin(pn_dispatcher_t *disp)
+int pn_do_begin(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
 {
-  pn_transport_t *transport = disp->transport;
   bool reply;
   uint16_t remote_channel;
   pn_sequence_t next;
-  int err = pn_scan_args(disp, "D.[?HI]", &reply, &remote_channel, &next);
+  int err = pn_data_scan(args, "D.[?HI]", &reply, &remote_channel, &next);
   if (err) return err;
 
   pn_session_t *ssn;
@@ -764,7 +762,7 @@ int pn_do_begin(pn_dispatcher_t *disp)
     ssn = pn_session(transport->connection);
   }
   ssn->state.incoming_transfer_count = next;
-  pni_map_remote_channel(ssn, disp->channel);
+  pni_map_remote_channel(ssn, channel);
   PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_ACTIVE);
   pn_collector_put(transport->connection->collector, PN_OBJECT, ssn, PN_SESSION_REMOTE_OPEN);
   return 0;
@@ -835,9 +833,8 @@ int pn_terminus_set_address_bytes(pn_terminus_t *terminus, pn_bytes_t address)
   return pn_string_setn(terminus->address, address.start, address.size);
 }
 
-int pn_do_attach(pn_dispatcher_t *disp)
+int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
 {
-  pn_transport_t *transport = disp->transport;
   pn_bytes_t name;
   uint32_t handle;
   bool is_sender;
@@ -850,7 +847,7 @@ int pn_do_attach(pn_dispatcher_t *disp)
   pn_bytes_t dist_mode;
   bool snd_settle, rcv_settle;
   uint8_t snd_settle_mode, rcv_settle_mode;
-  int err = pn_scan_args(disp, "D.[SIo?B?BD.[SIsIo.s]D.[SIsIo]..I]", &name, &handle,
+  int err = pn_data_scan(args, "D.[SIo?B?BD.[SIsIo.s]D.[SIsIo]..I]", &name, &handle,
                          &is_sender,
                          &snd_settle, &snd_settle_mode,
                          &rcv_settle, &rcv_settle_mode,
@@ -864,7 +861,7 @@ int pn_do_attach(pn_dispatcher_t *disp)
   strncpy(strname, name.start, name.size);
   strname[name.size] = '\0';
 
-  pn_session_t *ssn = pn_channel_state(transport, disp->channel);
+  pn_session_t *ssn = pn_channel_state(transport, channel);
   if (!ssn) {
       pn_do_error(transport, "amqp:connection:no-session", "attach without a session");
       return PN_EOS;
@@ -907,7 +904,7 @@ int pn_do_attach(pn_dispatcher_t *disp)
   } else {
     uint64_t code = 0;
     pn_data_clear(link->remote_target.capabilities);
-    err = pn_scan_args(disp, "D.[.....D..DL[C]...]", &code,
+    err = pn_data_scan(args, "D.[.....D..DL[C]...]", &code,
                        link->remote_target.capabilities);
     if (code == COORDINATOR) {
       pn_terminus_set_type(rtgt, PN_COORDINATOR);
@@ -928,7 +925,7 @@ int pn_do_attach(pn_dispatcher_t *disp)
   pn_data_clear(link->remote_target.properties);
   pn_data_clear(link->remote_target.capabilities);
 
-  err = pn_scan_args(disp, "D.[.....D.[.....C.C.CC]D.[.....CC]",
+  err = pn_data_scan(args, "D.[.....D.[.....C.C.CC]D.[.....CC]",
                      link->remote_source.properties,
                      link->remote_source.filter,
                      link->remote_source.outcomes,
@@ -961,20 +958,19 @@ static void pn_full_settle(pn_delivery_map_t *db, pn_delivery_t *delivery)
   pn_clear_tpwork(delivery);
 }
 
-int pn_do_transfer(pn_dispatcher_t *disp)
+int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
 {
   // XXX: multi transfer
-  pn_transport_t *transport = disp->transport;
   uint32_t handle;
   pn_bytes_t tag;
   bool id_present;
   pn_sequence_t id;
   bool settled;
   bool more;
-  int err = pn_scan_args(disp, "D.[I?Iz.oo]", &handle, &id_present, &id, &tag,
+  int err = pn_data_scan(args, "D.[I?Iz.oo]", &handle, &id_present, &id, &tag,
                          &settled, &more);
   if (err) return err;
-  pn_session_t *ssn = pn_channel_state(transport, disp->channel);
+  pn_session_t *ssn = pn_channel_state(transport, channel);
 
   if (!ssn->state.incoming_window) {
     return pn_do_error(transport, "amqp:session:window-violation", "incoming session window exceeded");
@@ -1013,8 +1009,8 @@ int pn_do_transfer(pn_dispatcher_t *disp)
     }
   }
 
-  pn_buffer_append(delivery->bytes, disp->payload, disp->size);
-  ssn->incoming_bytes += disp->size;
+  pn_buffer_append(delivery->bytes, payload->start, payload->size);
+  ssn->incoming_bytes += payload->size;
   delivery->done = !more;
 
   ssn->state.incoming_transfer_count++;
@@ -1029,19 +1025,18 @@ int pn_do_transfer(pn_dispatcher_t *disp)
   return 0;
 }
 
-int pn_do_flow(pn_dispatcher_t *disp)
+int pn_do_flow(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
 {
-  pn_transport_t *transport = disp->transport;
   pn_sequence_t onext, inext, delivery_count;
   uint32_t iwin, owin, link_credit;
   uint32_t handle;
   bool inext_init, handle_init, dcount_init, drain;
-  int err = pn_scan_args(disp, "D.[?IIII?I?II.o]", &inext_init, &inext, &iwin,
+  int err = pn_data_scan(args, "D.[?IIII?I?II.o]", &inext_init, &inext, &iwin,
                          &onext, &owin, &handle_init, &handle, &dcount_init,
                          &delivery_count, &link_credit, &drain);
   if (err) return err;
 
-  pn_session_t *ssn = pn_channel_state(transport, disp->channel);
+  pn_session_t *ssn = pn_channel_state(transport, channel);
 
   if (inext_init) {
     ssn->state.remote_incoming_window = inext + iwin - ssn->state.outgoing_transfer_count;
@@ -1098,21 +1093,20 @@ static int pn_scan_error(pn_data_t *data, pn_condition_t *condition, const char
   return 0;
 }
 
-int pn_do_disposition(pn_dispatcher_t *disp)
+int pn_do_disposition(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
 {
-  pn_transport_t *transport = disp->transport;
   bool role;
   pn_sequence_t first, last;
   uint64_t type = 0;
   bool last_init, settled, type_init;
   pn_data_clear(transport->disp_data);
-  int err = pn_scan_args(disp, "D.[oI?IoD?LC]", &role, &first, &last_init,
+  int err = pn_data_scan(args, "D.[oI?IoD?LC]", &role, &first, &last_init,
                          &last, &settled, &type_init, &type,
                          transport->disp_data);
   if (err) return err;
   if (!last_init) last = first;
 
-  pn_session_t *ssn = pn_channel_state(transport, disp->channel);
+  pn_session_t *ssn = pn_channel_state(transport, channel);
   pn_delivery_map_t *deliveries;
   if (role) {
     deliveries = &ssn->state.outgoing;
@@ -1177,24 +1171,23 @@ int pn_do_disposition(pn_dispatcher_t *disp)
   return 0;
 }
 
-int pn_do_detach(pn_dispatcher_t *disp)
+int pn_do_detach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
 {
-  pn_transport_t *transport = disp->transport;
   uint32_t handle;
   bool closed;
-  int err = pn_scan_args(disp, "D.[Io]", &handle, &closed);
+  int err = pn_data_scan(args, "D.[Io]", &handle, &closed);
   if (err) return err;
 
-  pn_session_t *ssn = pn_channel_state(transport, disp->channel);
+  pn_session_t *ssn = pn_channel_state(transport, channel);
   if (!ssn) {
-    return pn_do_error(transport, "amqp:invalid-field", "no such channel: %u", disp->channel);
+    return pn_do_error(transport, "amqp:invalid-field", "no such channel: %u", channel);
   }
   pn_link_t *link = pn_handle_state(ssn, handle);
   if (!link) {
     return pn_do_error(transport, "amqp:invalid-field", "no such handle: %u", handle);
   }
 
-  err = pn_scan_error(disp->args, &link->endpoint.remote_condition, SCAN_ERROR_DETACH);
+  err = pn_scan_error(args, &link->endpoint.remote_condition, SCAN_ERROR_DETACH);
   if (err) return err;
 
   if (closed)
@@ -1209,11 +1202,10 @@ int pn_do_detach(pn_dispatcher_t *disp)
   return 0;
 }
 
-int pn_do_end(pn_dispatcher_t *disp)
+int pn_do_end(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
 {
-  pn_transport_t *transport = disp->transport;
-  pn_session_t *ssn = pn_channel_state(transport, disp->channel);
-  int err = pn_scan_error(disp->args, &ssn->endpoint.remote_condition, SCAN_ERROR_DEFAULT);
+  pn_session_t *ssn = pn_channel_state(transport, channel);
+  int err = pn_scan_error(args, &ssn->endpoint.remote_condition, SCAN_ERROR_DEFAULT);
   if (err) return err;
   PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_CLOSED);
   pn_collector_put(transport->connection->collector, PN_OBJECT, ssn, PN_SESSION_REMOTE_CLOSE);
@@ -1221,11 +1213,10 @@ int pn_do_end(pn_dispatcher_t *disp)
   return 0;
 }
 
-int pn_do_close(pn_dispatcher_t *disp)
+int pn_do_close(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
 {
-  pn_transport_t *transport = disp->transport;
   pn_connection_t *conn = transport->connection;
-  int err = pn_scan_error(disp->args, &transport->remote_condition, SCAN_ERROR_DEFAULT);
+  int err = pn_scan_error(args, &transport->remote_condition, SCAN_ERROR_DEFAULT);
   if (err) return err;
   transport->close_rcvd = true;
   PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_CLOSED);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org