You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2016/11/14 18:28:36 UTC
[15/20] qpid-proton git commit: PROTON-1350 PROTON-1351: Introduce
proton-c core library - Created new core proton library qpid-proton-core
which only contains protocol processing and no IO. - Rearranged source tree
to separate core protocol code and
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/core/dispatcher.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/dispatcher.c b/proton-c/src/core/dispatcher.c
new file mode 100644
index 0000000..36f8cc9
--- /dev/null
+++ b/proton-c/src/core/dispatcher.c
@@ -0,0 +1,158 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "dispatcher.h"
+
+#include "framing.h"
+#include "protocol.h"
+#include "engine-internal.h"
+
+#include "dispatch_actions.h"
+
+/**
+ * Handler used when a frame carries a performative code that the dispatcher
+ * does not recognise.
+ *
+ * Logs the problem against the transport. The channel, args and payload
+ * parameters exist only so the function matches pn_action_t.
+ *
+ * @return PN_ERR, always.
+ */
+int pni_bad_frame(pn_transport_t *transport,
+                  uint8_t frame_type,
+                  uint16_t channel,
+                  pn_data_t *args,
+                  const pn_bytes_t *payload)
+{
+  (void) channel;
+  (void) args;
+  (void) payload;
+
+  pn_transport_logf(transport, "Error dispatching frame: type: %d: Unknown performative", frame_type);
+  return PN_ERR;
+}
+
+/**
+ * Handler used when a frame has a type that is neither AMQP nor SASL.
+ *
+ * Logs the unknown frame type against the transport. The channel, args and
+ * payload parameters exist only so the function matches pn_action_t.
+ *
+ * @return PN_ERR, always.
+ */
+int pni_bad_frame_type(pn_transport_t *transport,
+                       uint8_t frame_type,
+                       uint16_t channel,
+                       pn_data_t *args,
+                       const pn_bytes_t *payload)
+{
+  (void) channel;
+  (void) args;
+  (void) payload;
+
+  pn_transport_logf(transport, "Error dispatching frame: Unknown frame type: %d", frame_type);
+  return PN_ERR;
+}
+
+/*
+ * Pick the handler for a decoded performative and call it.
+ *
+ * The handler is selected from the frame type (AMQP or SASL) and the numeric
+ * descriptor code lcode. An unknown code inside a known frame type goes to
+ * pni_bad_frame; an unknown frame type goes to pni_bad_frame_type.
+ *
+ * A table based approach could be used here instead if performatives ever
+ * needed to be added dynamically.
+ *
+ * Returns whatever the selected handler returns.
+ */
+static inline int pni_dispatch_action(pn_transport_t* transport, uint64_t lcode, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload)
+{
+  /* Until a case below matches, the code counts as an unknown performative. */
+  pn_action_t *handler = pni_bad_frame;
+
+  if (frame_type == AMQP_FRAME_TYPE) {
+    /* Regular AMQP frames */
+    switch (lcode) {
+    case OPEN:        handler = pn_do_open; break;
+    case BEGIN:       handler = pn_do_begin; break;
+    case ATTACH:      handler = pn_do_attach; break;
+    case FLOW:        handler = pn_do_flow; break;
+    case TRANSFER:    handler = pn_do_transfer; break;
+    case DISPOSITION: handler = pn_do_disposition; break;
+    case DETACH:      handler = pn_do_detach; break;
+    case END:         handler = pn_do_end; break;
+    case CLOSE:       handler = pn_do_close; break;
+    default:          break;
+    }
+  } else if (frame_type == SASL_FRAME_TYPE) {
+    /* SASL frames */
+    switch (lcode) {
+    case SASL_MECHANISMS: handler = pn_do_mechanisms; break;
+    case SASL_INIT:       handler = pn_do_init; break;
+    case SASL_CHALLENGE:  handler = pn_do_challenge; break;
+    case SASL_RESPONSE:   handler = pn_do_response; break;
+    case SASL_OUTCOME:    handler = pn_do_outcome; break;
+    default:              break;
+    }
+  } else {
+    handler = pni_bad_frame_type;
+  }
+
+  return handler(transport, frame_type, channel, args, payload);
+}
+/*
+ * Decode the body of a single frame into args and hand it to its handler.
+ *
+ * - A frame with size 0 is ignored (traced when PN_TRACE_FRM is set) and 0 is
+ *   returned.
+ * - If pn_data_decode fails, the error text and the quoted frame bytes are
+ *   logged through transport->scratch and the negative decode result is
+ *   returned.
+ * - The numeric performative code is read from args with pn_data_scan; a scan
+ *   error is returned unchanged, and PN_ERR is returned when no code was
+ *   scanned.
+ * - Frame bytes after the decoded performative are passed on as the payload.
+ *
+ * Returns the result of the handler on a successful dispatch.
+ */
+static int pni_dispatch_frame(pn_transport_t * transport, pn_data_t *args, pn_frame_t frame)
+{
+ if (frame.size == 0) { // ignore null frames
+ if (transport->trace & PN_TRACE_FRM)
+ pn_transport_logf(transport, "%u <- (EMPTY FRAME)", frame.channel);
+ return 0;
+ }
+
+ ssize_t dsize = pn_data_decode(args, frame.payload, frame.size);
+ if (dsize < 0) {
+ pn_string_format(transport->scratch,
+ "Error decoding frame: %s %s\n", pn_code(dsize),
+ pn_error_text(pn_data_error(args)));
+ pn_quote(transport->scratch, frame.payload, frame.size);
+ pn_transport_log(transport, pn_string_get(transport->scratch));
+ return dsize;
+ }
+
+ uint8_t frame_type = frame.type;
+ uint16_t channel = frame.channel;
+ // XXX: assuming numeric -
+ // if we get a symbol we should map it to the numeric value and dispatch on that
+ uint64_t lcode;
+ bool scanned;
+ int e = pn_data_scan(args, "D?L.", &scanned, &lcode);
+ if (e) {
+ pn_transport_log(transport, "Scan error");
+ return e;
+ }
+ if (!scanned) {
+ pn_transport_log(transport, "Error dispatching frame");
+ return PN_ERR;
+ }
+ size_t payload_size = frame.size - dsize; // bytes left after the decoded performative
+ const char *payload_mem = payload_size ? frame.payload + dsize : NULL; // NULL when nothing is left
+ pn_bytes_t payload = {payload_size, payload_mem};
+
+ pn_do_trace(transport, channel, IN, args, payload_mem, payload_size);
+
+ int err = pni_dispatch_action(transport, lcode, frame_type, channel, args, &payload);
+
+ pn_data_clear(args); // args is cleared once the handler has returned
+
+ return err;
+}
+
+/**
+ * Read frames from an input buffer and dispatch each one to its handler.
+ *
+ * @param transport transport that receives the frames
+ * @param bytes     start of the input data
+ * @param available number of bytes readable at bytes
+ * @param batch     when false, at most one frame is processed per call
+ * @param halt      checked before every frame; processing stops once *halt
+ *                  is true
+ * @return the number of bytes consumed; the negative result of pn_read_frame
+ *         for a malformed frame (after raising a framing error on the
+ *         transport); or the non-zero result of a failed frame dispatch
+ */
+ssize_t pn_dispatcher_input(pn_transport_t *transport, const char *bytes, size_t available, bool batch, bool *halt)
+{
+  size_t consumed = 0;
+
+  while (available && !*halt) {
+    pn_frame_t frame;
+    ssize_t len = pn_read_frame(&frame, bytes + consumed, available, transport->local_max_frame);
+
+    if (len < 0) {
+      pn_do_error(transport, "amqp:connection:framing-error", "malformed frame");
+      return len;
+    }
+    if (len == 0) {
+      /* No frame was read from the remaining bytes; stop here. */
+      break;
+    }
+
+    consumed += len;
+    available -= len;
+    transport->input_frames_ct += 1;
+
+    int err = pni_dispatch_frame(transport, transport->args, frame);
+    if (err) {
+      return err;
+    }
+
+    if (!batch) {
+      break;
+    }
+  }
+
+  return consumed;
+}
+
+/**
+ * Copy pending output from the transport into a caller supplied buffer.
+ *
+ * Up to size bytes are copied from transport->output into bytes. The bytes
+ * that were not copied are moved to the front of transport->output and
+ * transport->available is reduced by the number copied.
+ *
+ * @param transport transport holding the pending output
+ * @param bytes     destination buffer
+ * @param size      capacity of bytes
+ * @return the number of bytes copied (0 when nothing is pending or size is 0)
+ */
+ssize_t pn_dispatcher_output(pn_transport_t *transport, char *bytes, size_t size)
+{
+  /* Keep the count in size_t: both operands are size_t, and narrowing to int
+   * would truncate (or turn negative) for very large amounts of output. */
+  size_t n = transport->available < size ? transport->available : size;
+  if (n == 0) {
+    /* Nothing to move; also avoids handing memmove a possibly null pointer. */
+    return 0;
+  }
+
+  memmove(bytes, transport->output, n);
+  /* Source and destination overlap here, so memmove is required. */
+  memmove(transport->output, transport->output + n, transport->available - n);
+  transport->available -= n;
+  // XXX: need to check for errors
+  return (ssize_t) n;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/core/dispatcher.h
----------------------------------------------------------------------
diff --git a/proton-c/src/core/dispatcher.h b/proton-c/src/core/dispatcher.h
new file mode 100644
index 0000000..29881b5
--- /dev/null
+++ b/proton-c/src/core/dispatcher.h
@@ -0,0 +1,37 @@
+#ifndef _PROTON_DISPATCHER_H
+#define _PROTON_DISPATCHER_H 1
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef __cplusplus
+#include <stdbool.h>
+#endif
+
+#include "proton/codec.h"
+#include "proton/types.h"
+/**
+ * Signature of a frame handler. The dispatcher picks one handler per incoming
+ * performative and calls it with the transport, the frame type and channel,
+ * the decoded performative in args and the remaining frame bytes in payload.
+ * A non-zero result stops input processing and is returned from
+ * pn_dispatcher_input.
+ */
+typedef int (pn_action_t)(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload);
+/**
+ * pn_dispatcher_input reads frames from bytes (available bytes long) and
+ * dispatches them; it processes one frame when batch is false and stops once
+ * *halt is true. It returns the number of bytes consumed or an error code.
+ *
+ * pn_dispatcher_output copies up to size bytes of pending transport output
+ * into bytes and returns the number of bytes copied.
+ */
+ssize_t pn_dispatcher_input(pn_transport_t* transport, const char* bytes, size_t available, bool batch, bool* halt);
+ssize_t pn_dispatcher_output(pn_transport_t *transport, char *bytes, size_t size);
+
+#endif /* dispatcher.h */
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/core/encoder.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/encoder.c b/proton-c/src/core/encoder.c
new file mode 100644
index 0000000..f8145fc
--- /dev/null
+++ b/proton-c/src/core/encoder.c
@@ -0,0 +1,383 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/error.h>
+#include <proton/object.h>
+#include <proton/codec.h>
+#include "encodings.h"
+#include "encoder.h"
+
+#include <string.h>
+
+#include "data.h"
+/*
+ * State of one encoding pass. pn_encoder_encode points it at a destination
+ * buffer; pn_encoder_size runs it with no buffer at all.
+ */
+struct pn_encoder_t {
+ char *output; // start of the destination buffer (null while sizing)
+ size_t size; // capacity of the destination buffer in bytes
+ char *position; // write cursor; keeps advancing even when the buffer is full
+ pn_error_t *error; // created in pn_encoder_initialize, freed in pn_encoder_finalize
+};
+
+/*
+ * Put a freshly allocated encoder into its idle state: no destination buffer
+ * attached and a new error object. Presumably wired in as the class
+ * initializer through PN_CLASS(pn_encoder).
+ */
+static void pn_encoder_initialize(void *obj)
+{
+  pn_encoder_t *self = (pn_encoder_t *) obj;
+
+  /* A buffer is only attached by pn_encoder_encode / pn_encoder_size. */
+  self->output = self->position = NULL;
+  self->size = 0;
+  self->error = pn_error();
+}
+
+/*
+ * Release what pn_encoder_initialize created: the error object. The output
+ * buffer is not touched here.
+ */
+static void pn_encoder_finalize(void *obj)
+{
+  pn_error_free(((pn_encoder_t *) obj)->error);
+}
+
+#define pn_encoder_hashcode NULL
+#define pn_encoder_compare NULL
+#define pn_encoder_inspect NULL
+
+/*
+ * Allocate a new encoder through the object system, using the class built
+ * from the pn_encoder_* callbacks by PN_CLASS.
+ *
+ * The parameter list is spelled (void) so that this definition is a real
+ * prototype and agrees with the declaration in encoder.h.
+ *
+ * Returns the result of pn_class_new.
+ */
+pn_encoder_t *pn_encoder(void)
+{
+  static const pn_class_t clazz = PN_CLASS(pn_encoder);
+  return (pn_encoder_t *) pn_class_new(&clazz, sizeof(pn_encoder_t));
+}
+
+/*
+ * Map a pn_type_t to the encoding code used for it when no value-dependent
+ * compact form is chosen (variable-width and compound types map to their
+ * 32-bit forms).
+ *
+ * For a type with no mapping an error is recorded on encoder->error and the
+ * result of pn_error_format, converted to uint8_t, is returned.
+ */
+static uint8_t pn_type2code(pn_encoder_t *encoder, pn_type_t type)
+{
+  uint8_t code;
+
+  switch (type) {
+  case PN_NULL:       code = PNE_NULL; break;
+  case PN_BOOL:       code = PNE_BOOLEAN; break;
+  case PN_UBYTE:      code = PNE_UBYTE; break;
+  case PN_BYTE:       code = PNE_BYTE; break;
+  case PN_USHORT:     code = PNE_USHORT; break;
+  case PN_SHORT:      code = PNE_SHORT; break;
+  case PN_UINT:       code = PNE_UINT; break;
+  case PN_INT:        code = PNE_INT; break;
+  case PN_CHAR:       code = PNE_UTF32; break;
+  case PN_FLOAT:      code = PNE_FLOAT; break;
+  case PN_LONG:       code = PNE_LONG; break;
+  case PN_TIMESTAMP:  code = PNE_MS64; break;
+  case PN_DOUBLE:     code = PNE_DOUBLE; break;
+  case PN_DECIMAL32:  code = PNE_DECIMAL32; break;
+  case PN_DECIMAL64:  code = PNE_DECIMAL64; break;
+  case PN_DECIMAL128: code = PNE_DECIMAL128; break;
+  case PN_UUID:       code = PNE_UUID; break;
+  case PN_ULONG:      code = PNE_ULONG; break;
+  case PN_BINARY:     code = PNE_VBIN32; break;
+  case PN_STRING:     code = PNE_STR32_UTF8; break;
+  case PN_SYMBOL:     code = PNE_SYM32; break;
+  case PN_LIST:       code = PNE_LIST32; break;
+  case PN_ARRAY:      code = PNE_ARRAY32; break;
+  case PN_MAP:        code = PNE_MAP32; break;
+  case PN_DESCRIBED:  code = PNE_DESCRIPTOR; break;
+  default:
+    code = pn_error_format(encoder->error, PN_ERR, "not a value type: %u\n", type);
+    break;
+  }
+
+  return code;
+}
+
+/*
+ * Choose the encoding code for a node, preferring the compact form when the
+ * value allows it:
+ *  - long / int in [-128, 127] and ulong / uint below 256 use the small codes;
+ *  - booleans use the dedicated true / false codes;
+ *  - strings, symbols and binaries shorter than 256 bytes use the 8-bit
+ *    length forms.
+ * Every other type is delegated to pn_type2code.
+ */
+static uint8_t pn_node2code(pn_encoder_t *encoder, pni_node_t *node)
+{
+  const pn_atom_t *atom = &node->atom;
+
+  switch (atom->type) {
+  case PN_LONG:
+    return (atom->u.as_long >= -128 && atom->u.as_long <= 127) ? PNE_SMALLLONG : PNE_LONG;
+  case PN_INT:
+    return (atom->u.as_int >= -128 && atom->u.as_int <= 127) ? PNE_SMALLINT : PNE_INT;
+  case PN_ULONG:
+    return atom->u.as_ulong < 256 ? PNE_SMALLULONG : PNE_ULONG;
+  case PN_UINT:
+    return atom->u.as_uint < 256 ? PNE_SMALLUINT : PNE_UINT;
+  case PN_BOOL:
+    return atom->u.as_bool ? PNE_TRUE : PNE_FALSE;
+  case PN_STRING:
+    return atom->u.as_bytes.size < 256 ? PNE_STR8_UTF8 : PNE_STR32_UTF8;
+  case PN_SYMBOL:
+    return atom->u.as_bytes.size < 256 ? PNE_SYM8 : PNE_SYM32;
+  case PN_BINARY:
+    return atom->u.as_bytes.size < 256 ? PNE_VBIN8 : PNE_VBIN32;
+  default:
+    return pn_type2code(encoder, atom->type);
+  }
+}
+
+/*
+ * Number of bytes that can still be written between the cursor and the end
+ * of the output buffer; 0 once the cursor has reached or passed the end.
+ */
+static size_t pn_encoder_remaining(pn_encoder_t *encoder)
+{
+  char *limit = encoder->output + encoder->size;
+  return (limit > encoder->position) ? (size_t) (limit - encoder->position) : 0;
+}
+
+/*
+ * Write one byte at the cursor if there is room, then advance the cursor by
+ * one byte whether or not anything was written.
+ */
+static inline void pn_encoder_writef8(pn_encoder_t *encoder, uint8_t value)
+{
+  char *cursor = encoder->position;
+
+  if (pn_encoder_remaining(encoder) > 0) {
+    *cursor = value;
+  }
+  encoder->position = cursor + 1;
+}
+
+/*
+ * Write a 16-bit value, most significant byte first, if at least two bytes
+ * remain; the cursor advances by two bytes either way.
+ */
+static inline void pn_encoder_writef16(pn_encoder_t *encoder, uint16_t value)
+{
+  char *cursor = encoder->position;
+
+  if (pn_encoder_remaining(encoder) >= 2) {
+    for (int i = 0; i < 2; i++) {
+      cursor[i] = 0xFF & (value >> (8 * (1 - i)));
+    }
+  }
+  encoder->position = cursor + 2;
+}
+
+/*
+ * Write a 32-bit value, most significant byte first, if at least four bytes
+ * remain; the cursor advances by four bytes either way.
+ */
+static inline void pn_encoder_writef32(pn_encoder_t *encoder, uint32_t value)
+{
+  char *cursor = encoder->position;
+
+  if (pn_encoder_remaining(encoder) >= 4) {
+    for (int i = 0; i < 4; i++) {
+      cursor[i] = 0xFF & (value >> (8 * (3 - i)));
+    }
+  }
+  encoder->position = cursor + 4;
+}
+
+/*
+ * Write a 64-bit value, most significant byte first, if at least eight bytes
+ * remain; the cursor advances by eight bytes either way.
+ */
+static inline void pn_encoder_writef64(pn_encoder_t *encoder, uint64_t value)
+{
+  char *cursor = encoder->position;
+
+  if (pn_encoder_remaining(encoder) >= 8) {
+    for (int i = 0; i < 8; i++) {
+      cursor[i] = 0xFF & (value >> (8 * (7 - i)));
+    }
+  }
+  encoder->position = cursor + 8;
+}
+
+/*
+ * Copy 16 bytes from value to the cursor if they fit; the cursor advances by
+ * 16 bytes either way.
+ */
+static inline void pn_encoder_writef128(pn_encoder_t *encoder, char *value)
+{
+  enum { WIDTH = 16 };
+  char *cursor = encoder->position;
+
+  if (pn_encoder_remaining(encoder) >= WIDTH) {
+    memmove(cursor, value, WIDTH);
+  }
+  encoder->position = cursor + WIDTH;
+}
+
+/*
+ * Write a variable-width value with a one byte length prefix: the size
+ * (truncated to 8 bits by pn_encoder_writef8) followed by the bytes.
+ *
+ * The bytes are only copied when they fit in the remaining space, but the
+ * cursor always advances by value->size so that sizing and overflow
+ * detection see the full length.
+ */
+static inline void pn_encoder_writev8(pn_encoder_t *encoder, const pn_bytes_t *value)
+{
+  pn_encoder_writef8(encoder, value->size);
+  /* Skip the copy for empty values: value->start may be NULL then, and in
+   * sizing mode the cursor is not a real buffer, so even a zero length
+   * memmove would be handed invalid pointers. */
+  if (value->size > 0 && pn_encoder_remaining(encoder) >= value->size) {
+    memmove(encoder->position, value->start, value->size);
+  }
+  encoder->position += value->size;
+}
+
+/*
+ * Write a variable-width value with a four byte length prefix: the size as
+ * written by pn_encoder_writef32, followed by the bytes.
+ *
+ * The bytes are only copied when they fit in the remaining space, but the
+ * cursor always advances by value->size so that sizing and overflow
+ * detection see the full length.
+ */
+static inline void pn_encoder_writev32(pn_encoder_t *encoder, const pn_bytes_t *value)
+{
+  pn_encoder_writef32(encoder, value->size);
+  /* Skip the copy for empty values: value->start may be NULL then, and in
+   * sizing mode the cursor is not a real buffer, so even a zero length
+   * memmove would be handed invalid pointers. */
+  if (value->size > 0 && pn_encoder_remaining(encoder) >= value->size) {
+    memmove(encoder->position, value->start, value->size);
+  }
+  encoder->position += value->size;
+}
+
+/*
+ * True if node is an element of an array. The first child of a described
+ * array is its descriptor and does not count as an element.
+ */
+static bool pn_is_in_array(pn_data_t *data, pni_node_t *parent, pni_node_t *node)
+{
+  if (!parent || parent->atom.type != PN_ARRAY) {
+    return false;                /* not inside an array at all */
+  }
+  if (parent->described && !node->prev) {
+    return false;                /* this is the descriptor */
+  }
+  return true;
+}
+
+/**
+ * True if node is the first element of an array (the descriptor of a
+ * described array is not an element).
+ *@pre pn_is_in_array(data, parent, node)
+ */
+static bool pn_is_first_in_array(pn_data_t *data, pni_node_t *parent, pni_node_t *node)
+{
+  if (node->prev) {
+    /* With a predecessor, node is first only when that predecessor is the
+     * descriptor, i.e. the array is described and the predecessor itself
+     * has no predecessor. */
+    if (!parent->described) {
+      return false;
+    }
+    pni_node_t *before = pn_data_node(data, node->prev);
+    return !before->prev;
+  }
+
+  /* No predecessor: first element unless this slot is the descriptor. */
+  return !parent->described;
+}
+/*
+ * Union used by pni_encoder_enter to obtain the bit pattern of a float or
+ * double as an integer of the same width before writing it out.
+ */
+typedef union {
+ uint32_t i; // 32-bit pattern, read after storing f
+ uint32_t a[2]; // not used by the code in this file section
+ uint64_t l; // 64-bit pattern, read after storing d
+ float f;
+ double d;
+} conv_t;
+/*
+ * Callback handed to pni_data_traverse together with pni_encoder_exit:
+ * writes the type code of a node (where one is needed) followed by its value
+ * or, for compound types, its header.
+ *
+ * ctx is the pn_encoder_t doing the encoding. Array elements take their code
+ * from the type field of the parent and the code is written only before the
+ * first element. For lists, maps and arrays the current position is
+ * remembered in node->start and four bytes are skipped; pni_encoder_exit
+ * fills in the size later.
+ *
+ * Returns 0 on success, or the result of pn_error_format (recorded on
+ * data->error) for an unrecognized encoding.
+ */
+static int pni_encoder_enter(void *ctx, pn_data_t *data, pni_node_t *node)
+{
+ pn_encoder_t *encoder = (pn_encoder_t *) ctx;
+ pni_node_t *parent = pn_data_node(data, node->parent);
+ pn_atom_t *atom = &node->atom;
+ uint8_t code;
+ conv_t c;
+
+ /** In an array we don't write the code before each element, only the first. */
+ if (pn_is_in_array(data, parent, node)) {
+ code = pn_type2code(encoder, parent->type); // elements use the code for the type field of the parent
+ if (pn_is_first_in_array(data, parent, node)) {
+ pn_encoder_writef8(encoder, code);
+ }
+ } else {
+ code = pn_node2code(encoder, node);
+ pn_encoder_writef8(encoder, code);
+ }
+
+ switch (code) {
+ case PNE_DESCRIPTOR:
+ case PNE_NULL:
+ case PNE_TRUE:
+ case PNE_FALSE: return 0;
+ case PNE_BOOLEAN: pn_encoder_writef8(encoder, atom->u.as_bool); return 0;
+ case PNE_UBYTE: pn_encoder_writef8(encoder, atom->u.as_ubyte); return 0;
+ case PNE_BYTE: pn_encoder_writef8(encoder, atom->u.as_byte); return 0;
+ case PNE_USHORT: pn_encoder_writef16(encoder, atom->u.as_ushort); return 0;
+ case PNE_SHORT: pn_encoder_writef16(encoder, atom->u.as_short); return 0;
+ case PNE_UINT0: return 0; // no value bytes are written for this code
+ case PNE_SMALLUINT: pn_encoder_writef8(encoder, atom->u.as_uint); return 0;
+ case PNE_UINT: pn_encoder_writef32(encoder, atom->u.as_uint); return 0;
+ case PNE_SMALLINT: pn_encoder_writef8(encoder, atom->u.as_int); return 0;
+ case PNE_INT: pn_encoder_writef32(encoder, atom->u.as_int); return 0;
+ case PNE_UTF32: pn_encoder_writef32(encoder, atom->u.as_char); return 0;
+ case PNE_ULONG: pn_encoder_writef64(encoder, atom->u.as_ulong); return 0;
+ case PNE_SMALLULONG: pn_encoder_writef8(encoder, atom->u.as_ulong); return 0;
+ case PNE_LONG: pn_encoder_writef64(encoder, atom->u.as_long); return 0;
+ case PNE_SMALLLONG: pn_encoder_writef8(encoder, atom->u.as_long); return 0;
+ case PNE_MS64: pn_encoder_writef64(encoder, atom->u.as_timestamp); return 0;
+ case PNE_FLOAT: c.f = atom->u.as_float; pn_encoder_writef32(encoder, c.i); return 0;
+ case PNE_DOUBLE: c.d = atom->u.as_double; pn_encoder_writef64(encoder, c.l); return 0;
+ case PNE_DECIMAL32: pn_encoder_writef32(encoder, atom->u.as_decimal32); return 0;
+ case PNE_DECIMAL64: pn_encoder_writef64(encoder, atom->u.as_decimal64); return 0;
+ case PNE_DECIMAL128: pn_encoder_writef128(encoder, atom->u.as_decimal128.bytes); return 0;
+ case PNE_UUID: pn_encoder_writef128(encoder, atom->u.as_uuid.bytes); return 0;
+ case PNE_VBIN8: pn_encoder_writev8(encoder, &atom->u.as_bytes); return 0;
+ case PNE_VBIN32: pn_encoder_writev32(encoder, &atom->u.as_bytes); return 0;
+ case PNE_STR8_UTF8: pn_encoder_writev8(encoder, &atom->u.as_bytes); return 0;
+ case PNE_STR32_UTF8: pn_encoder_writev32(encoder, &atom->u.as_bytes); return 0;
+ case PNE_SYM8: pn_encoder_writev8(encoder, &atom->u.as_bytes); return 0;
+ case PNE_SYM32: pn_encoder_writev32(encoder, &atom->u.as_bytes); return 0;
+ case PNE_ARRAY32:
+ node->start = encoder->position;
+ node->small = false;
+ // we'll backfill the size on exit
+ encoder->position += 4;
+ pn_encoder_writef32(encoder, node->described ? node->children - 1 : node->children); // element count excludes the descriptor child
+ if (node->described)
+ pn_encoder_writef8(encoder, 0); // a zero byte is written before the children of a described array
+ return 0;
+ case PNE_LIST32:
+ case PNE_MAP32:
+ node->start = encoder->position;
+ node->small = false;
+ // we'll backfill the size later
+ encoder->position += 4;
+ pn_encoder_writef32(encoder, node->children);
+ return 0;
+ default:
+ return pn_error_format(data->error, PN_ERR, "unrecognized encoding: %u", code);
+ }
+}
+
+#include <stdio.h>
+
+/*
+ * Callback handed to pni_data_traverse together with pni_encoder_enter:
+ * finishes a compound node once its children have been written.
+ *
+ * For arrays, lists and maps the size field reserved at node->start is
+ * backfilled with the number of bytes written after it (one byte wide when
+ * node->small is set, four bytes otherwise). An array without elements
+ * additionally gets its element type code written first, because no element
+ * was there to trigger it. Other node types need no work.
+ *
+ * Always returns 0.
+ */
+static int pni_encoder_exit(void *ctx, pn_data_t *data, pni_node_t *node)
+{
+  pn_encoder_t *encoder = (pn_encoder_t *) ctx;
+
+  switch (node->atom.type) {
+  case PN_ARRAY:
+  case PN_LIST:
+  case PN_MAP:
+    break;
+  default:
+    /* Only compound nodes have a size field to backfill. */
+    return 0;
+  }
+
+  if (node->atom.type == PN_ARRAY) {
+    bool no_elements = node->described ? node->children == 1
+                                       : node->children == 0;
+    if (no_elements) {
+      pn_encoder_writef8(encoder, pn_type2code(encoder, node->type));
+    }
+  }
+
+  /* Jump back to the reserved size field, fill it in, then resume. */
+  char *end = encoder->position;
+  encoder->position = node->start;
+  if (node->small) {
+    size_t size = end - node->start - 1;
+    pn_encoder_writef8(encoder, size);
+  } else {
+    size_t size = end - node->start - 4;
+    pn_encoder_writef32(encoder, size);
+  }
+  encoder->position = end;
+
+  return 0;
+}
+
+/*
+ * Encode src into the buffer dst of capacity size.
+ *
+ * Returns the number of bytes written on success; the error code of the
+ * traversal if it fails; or PN_OVERFLOW (also recorded on the error object
+ * of src) when the encoded form needs more than size bytes.
+ */
+ssize_t pn_encoder_encode(pn_encoder_t *encoder, pn_data_t *src, char *dst, size_t size)
+{
+  encoder->size = size;
+  encoder->output = encoder->position = dst;
+
+  int rc = pni_data_traverse(src, pni_encoder_enter, pni_encoder_exit, encoder);
+  if (rc != 0) {
+    return rc;
+  }
+
+  /* The cursor keeps moving past the end of dst, so it tells us how much
+   * room the data really needed. */
+  size_t written = encoder->position - encoder->output;
+  if (written <= size) {
+    return (ssize_t) written;
+  }
+
+  pn_error_format(pn_data_error(src), PN_OVERFLOW, "not enough space to encode");
+  return PN_OVERFLOW;
+}
+
+/*
+ * Compute how many bytes the encoded form of src would occupy.
+ *
+ * The traversal is run with no output buffer, so the write helpers store
+ * nothing and only advance the cursor; the distance travelled is the size.
+ * The current point of src is saved before and restored after the
+ * traversal.
+ *
+ * Returns the size, or the error code of the traversal if it fails.
+ */
+ssize_t pn_encoder_size(pn_encoder_t *encoder, pn_data_t *src)
+{
+  encoder->size = 0;
+  encoder->output = encoder->position = NULL;
+
+  pn_handle_t saved = pn_data_point(src);
+  int rc = pni_data_traverse(src, pni_encoder_enter, pni_encoder_exit, encoder);
+  pn_data_restore(src, saved);
+
+  if (rc != 0) {
+    return rc;
+  }
+  return encoder->position - encoder->output;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/core/encoder.h
----------------------------------------------------------------------
diff --git a/proton-c/src/core/encoder.h b/proton-c/src/core/encoder.h
new file mode 100644
index 0000000..20876cb
--- /dev/null
+++ b/proton-c/src/core/encoder.h
@@ -0,0 +1,31 @@
+#ifndef _PROTON_ENCODER_H
+#define _PROTON_ENCODER_H 1
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+/** Opaque encoder object; the structure itself is defined in encoder.c. */
+typedef struct pn_encoder_t pn_encoder_t;
+/**
+ * pn_encoder creates a new encoder.
+ * pn_encoder_encode encodes src into dst (capacity size) and returns the
+ * number of bytes written, or a negative error such as PN_OVERFLOW.
+ * pn_encoder_size returns the number of bytes the encoded form of src needs,
+ * or an error code.
+ */
+pn_encoder_t *pn_encoder(void);
+ssize_t pn_encoder_encode(pn_encoder_t *encoder, pn_data_t *src, char *dst, size_t size);
+ssize_t pn_encoder_size(pn_encoder_t *encoder, pn_data_t *src);
+
+#endif /* encoder.h */
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/core/engine-internal.h
----------------------------------------------------------------------
diff --git a/proton-c/src/core/engine-internal.h b/proton-c/src/core/engine-internal.h
new file mode 100644
index 0000000..fdaf272
--- /dev/null
+++ b/proton-c/src/core/engine-internal.h
@@ -0,0 +1,375 @@
+#ifndef _PROTON_ENGINE_INTERNAL_H
+#define _PROTON_ENGINE_INTERNAL_H 1
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/object.h>
+#include <proton/engine.h>
+#include <proton/types.h>
+
+#include "buffer.h"
+#include "dispatcher.h"
+#include "util.h"
+/* Kinds of endpoint; stored in the type member of pn_endpoint_t. */
+typedef enum pn_endpoint_type_t {CONNECTION, SESSION, SENDER, RECEIVER} pn_endpoint_type_t;
+
+typedef struct pn_endpoint_t pn_endpoint_t;
+/* A condition: a name, a description and an info data object. */
+struct pn_condition_t {
+ pn_string_t *name;
+ pn_string_t *description;
+ pn_data_t *info;
+};
+/*
+ * State shared by the endpoint objects. It is embedded as the first member,
+ * named endpoint, of pn_connection_t, pn_session_t and pn_link_t.
+ */
+struct pn_endpoint_t {
+ pn_endpoint_type_t type;
+ pn_state_t state;
+ pn_error_t *error;
+ pn_condition_t condition;
+ pn_condition_t remote_condition;
+ pn_endpoint_t *endpoint_next; // presumably links for the endpoint_head/endpoint_tail list of the connection - confirm
+ pn_endpoint_t *endpoint_prev;
+ pn_endpoint_t *transport_next; // presumably links for the transport_head/transport_tail list of the connection - confirm
+ pn_endpoint_t *transport_prev;
+ int refcount; // when this hits zero we generate a final event
+ bool modified;
+ bool freed;
+ bool referenced;
+};
+/* Per-delivery bookkeeping; held in the state member of pn_delivery_t. */
+typedef struct {
+ pn_sequence_t id;
+ bool sent;
+ bool init;
+} pn_delivery_state_t;
+/* A hash of deliveries plus a sequence counter; used for the incoming and outgoing members of pn_session_state_t. */
+typedef struct {
+ pn_sequence_t next;
+ pn_hash_t *deliveries;
+} pn_delivery_map_t;
+/* Per-link bookkeeping; held in the state member of pn_link_t. */
+typedef struct {
+ // XXX: stop using negative numbers
+ uint32_t local_handle;
+ uint32_t remote_handle;
+ pn_sequence_t delivery_count;
+ pn_sequence_t link_credit;
+} pn_link_state_t;
+/*
+ * Per-session bookkeeping; held in the state member of pn_session_t. It
+ * groups the channel numbers, the delivery maps and window counters for both
+ * directions, the handle hashes and a set of disp_* fields.
+ */
+typedef struct {
+ // XXX: stop using negative numbers
+ uint16_t local_channel;
+ uint16_t remote_channel;
+ bool incoming_init;
+ pn_delivery_map_t incoming;
+ pn_delivery_map_t outgoing;
+ pn_sequence_t incoming_transfer_count;
+ pn_sequence_t incoming_window;
+ pn_sequence_t remote_incoming_window;
+ pn_sequence_t outgoing_transfer_count;
+ pn_sequence_t outgoing_window;
+ pn_hash_t *local_handles;
+ pn_hash_t *remote_handles;
+
+ uint64_t disp_code; // NOTE(review): the disp_* fields look like a pending disposition - confirm against transport code
+ bool disp_settled;
+ bool disp_type;
+ pn_sequence_t disp_first;
+ pn_sequence_t disp_last;
+ bool disp;
+} pn_session_state_t;
+/*
+ * Callback table for one I/O layer. pn_transport_t keeps PN_IO_LAYER_CT
+ * pointers to such tables in io_layers; each callback receives the transport
+ * and, except for buffered_output, a layer number.
+ */
+typedef struct pn_io_layer_t {
+ ssize_t (*process_input)(struct pn_transport_t *transport, unsigned int layer, const char *, size_t);
+ ssize_t (*process_output)(struct pn_transport_t *transport, unsigned int layer, char *, size_t);
+ void (*handle_error)(struct pn_transport_t* transport, unsigned int layer);
+ pn_timestamp_t (*process_tick)(struct pn_transport_t *transport, unsigned int layer, pn_timestamp_t);
+ size_t (*buffered_output)(struct pn_transport_t *transport); // how much output is held
+} pn_io_layer_t;
+/* I/O layer tables that are defined in other source files. */
+extern const pn_io_layer_t pni_passthru_layer;
+extern const pn_io_layer_t ssl_layer;
+extern const pn_io_layer_t sasl_header_layer;
+extern const pn_io_layer_t sasl_write_header_layer;
+
+// Bit flag defines for the protocol layers
+typedef uint8_t pn_io_layer_flags_t; // type of allowed_layers and present_layers in pn_transport_t
+#define LAYER_NONE 0
+#define LAYER_AMQP1 1
+#define LAYER_AMQPSASL 2
+#define LAYER_AMQPSSL 4
+#define LAYER_SSL 8
+
+typedef struct pni_sasl_t pni_sasl_t; // opaque here; pn_transport_t holds a pointer in sasl
+typedef struct pni_ssl_t pni_ssl_t; // opaque here; pn_transport_t holds a pointer in ssl
+/*
+ * The transport object. Alongside the connection reference and the values
+ * received from the peer it holds the I/O layer table, idle-timeout and
+ * keepalive bookkeeping, scratch objects used while dispatching frames, the
+ * raw input and output buffers, statistics, channel limits and status flags.
+ */
+struct pn_transport_t {
+ pn_tracer_t tracer;
+ pni_sasl_t *sasl;
+ pni_ssl_t *ssl;
+ pn_connection_t *connection; // reference counted
+ char *remote_container;
+ char *remote_hostname;
+ pn_data_t *remote_offered_capabilities;
+ pn_data_t *remote_desired_capabilities;
+ pn_data_t *remote_properties;
+ pn_data_t *disp_data;
+ //#define PN_DEFAULT_MAX_FRAME_SIZE (16*1024)
+#define PN_DEFAULT_MAX_FRAME_SIZE (0) /* for now, allow unlimited size */
+ uint32_t local_max_frame; // handed to pn_read_frame by pn_dispatcher_input
+ uint32_t remote_max_frame;
+ pn_condition_t remote_condition;
+ pn_condition_t condition;
+ pn_error_t *error;
+
+#define PN_IO_LAYER_CT 3
+ const pn_io_layer_t *io_layers[PN_IO_LAYER_CT];
+
+ /* dead remote detection */
+ pn_millis_t local_idle_timeout;
+ pn_millis_t remote_idle_timeout;
+ pn_timestamp_t dead_remote_deadline;
+ uint64_t last_bytes_input;
+
+ /* keepalive */
+ pn_timestamp_t keepalive_deadline;
+ uint64_t last_bytes_output;
+
+ pn_hash_t *local_channels;
+ pn_hash_t *remote_channels;
+
+
+ /* scratch area */
+ pn_string_t *scratch; // the dispatcher formats frame decode error messages here
+ pn_data_t *args; // the dispatcher decodes each incoming performative into this
+ pn_data_t *output_args;
+ pn_buffer_t *frame; // frame under construction
+ // Temporary
+ size_t capacity;
+ size_t available; /* number of raw bytes pending output */
+ char *output; // pending output bytes, drained by pn_dispatcher_output
+
+ /* statistics */
+ uint64_t bytes_input;
+ uint64_t bytes_output;
+ uint64_t output_frames_ct;
+ uint64_t input_frames_ct; // incremented for every frame read by pn_dispatcher_input
+
+ /* output buffered for send */
+ size_t output_size;
+ size_t output_pending;
+ char *output_buf;
+
+ /* input from peer */
+ size_t input_size;
+ size_t input_pending;
+ char *input_buf;
+
+ pn_record_t *context;
+
+ pn_trace_t trace; // the PN_TRACE_FRM bit enables tracing of empty frames in the dispatcher
+
+ /*
+ * The maximum channel number can be constrained in several ways:
+ * 1. an unchangeable limit imposed by this library code
+ * 2. a limit imposed by the remote peer when the connection is opened,
+ * which this app must honor
+ * 3. a limit imposed by this app, which may be raised and lowered
+ * until the OPEN frame is sent.
+ * These constraints are all summed up in channel_max, below.
+ */
+ #define PN_IMPL_CHANNEL_MAX 32767
+ uint16_t local_channel_max;
+ uint16_t remote_channel_max;
+ uint16_t channel_max;
+
+ pn_io_layer_flags_t allowed_layers;
+ pn_io_layer_flags_t present_layers;
+
+ bool freed;
+ bool open_sent;
+ bool open_rcvd;
+ bool close_sent;
+ bool close_rcvd;
+ bool tail_closed; // input stream closed by driver
+ bool head_closed;
+ bool done_processing; // if true, don't call pn_process again
+ bool posted_idle_timeout;
+ bool server;
+ bool halt;
+ bool auth_required;
+ bool authenticated;
+ bool encryption_required;
+
+ bool referenced;
+};
+/*
+ * The connection object. It starts with the common endpoint state and holds
+ * the heads and tails of the endpoint, transport-work and delivery-work
+ * lists, its sessions, the bound transport, identity strings, capability and
+ * property data, the event collector and a pool of deliveries.
+ */
+struct pn_connection_t {
+ pn_endpoint_t endpoint;
+ pn_endpoint_t *endpoint_head;
+ pn_endpoint_t *endpoint_tail;
+ pn_endpoint_t *transport_head; // reference counted
+ pn_endpoint_t *transport_tail;
+ pn_list_t *sessions;
+ pn_list_t *freed;
+ pn_transport_t *transport;
+ pn_delivery_t *work_head;
+ pn_delivery_t *work_tail;
+ pn_delivery_t *tpwork_head; // reference counted
+ pn_delivery_t *tpwork_tail;
+ pn_string_t *container;
+ pn_string_t *hostname;
+ pn_string_t *auth_user;
+ pn_string_t *auth_password;
+ pn_data_t *offered_capabilities;
+ pn_data_t *desired_capabilities;
+ pn_data_t *properties;
+ pn_collector_t *collector;
+ pn_record_t *context;
+ pn_list_t *delivery_pool;
+};
+/*
+ * The session object. It starts with the common endpoint state and holds the
+ * owning connection, its links, byte and delivery counters for both
+ * directions and the pn_session_state_t bookkeeping.
+ */
+struct pn_session_t {
+ pn_endpoint_t endpoint;
+ pn_connection_t *connection; // reference counted
+ pn_list_t *links;
+ pn_list_t *freed;
+ pn_record_t *context;
+ size_t incoming_capacity;
+ pn_sequence_t incoming_bytes;
+ pn_sequence_t outgoing_bytes;
+ pn_sequence_t incoming_deliveries;
+ pn_sequence_t outgoing_deliveries;
+ pn_sequence_t outgoing_window;
+ pn_session_state_t state;
+};
+/*
+ * Description of one terminus. pn_link_t holds four of these: source, target
+ * and the remote counterparts of both.
+ */
+struct pn_terminus_t {
+ pn_string_t *address;
+ pn_data_t *properties;
+ pn_data_t *capabilities;
+ pn_data_t *outcomes;
+ pn_data_t *filter;
+ pn_durability_t durability;
+ pn_expiry_policy_t expiry_policy;
+ pn_seconds_t timeout;
+ pn_terminus_type_t type;
+ pn_distribution_mode_t distribution_mode;
+ bool dynamic;
+};
+/*
+ * The link object. It starts with the common endpoint state and holds the
+ * local and remote termini, the pn_link_state_t bookkeeping, the owning
+ * session, the list of unsettled deliveries, credit counters and the settle
+ * modes of both sides.
+ */
+struct pn_link_t {
+ pn_endpoint_t endpoint;
+ pn_terminus_t source;
+ pn_terminus_t target;
+ pn_terminus_t remote_source;
+ pn_terminus_t remote_target;
+ pn_link_state_t state;
+ pn_string_t *name;
+ pn_session_t *session; // reference counted
+ pn_delivery_t *unsettled_head;
+ pn_delivery_t *unsettled_tail;
+ pn_delivery_t *current;
+ pn_record_t *context;
+ size_t unsettled_count;
+ pn_sequence_t available;
+ pn_sequence_t credit;
+ pn_sequence_t queued;
+ int drained; // number of drained credits
+ uint8_t snd_settle_mode;
+ uint8_t rcv_settle_mode;
+ uint8_t remote_snd_settle_mode;
+ uint8_t remote_rcv_settle_mode;
+ bool drain_flag_mode; // receiver only
+ bool drain;
+ bool detached;
+};
+/*
+ * Disposition of a delivery. pn_delivery_t holds two of these: local and
+ * remote.
+ */
+struct pn_disposition_t {
+ pn_condition_t condition;
+ uint64_t type;
+ pn_data_t *data;
+ pn_data_t *annotations;
+ uint64_t section_offset;
+ uint32_t section_number;
+ bool failed;
+ bool undeliverable;
+ bool settled;
+};
+/*
+ * The delivery object. It holds the local and remote dispositions, the
+ * owning link, the delivery tag, the next/prev pointers for the unsettled,
+ * work and tpwork lists, the pn_delivery_state_t bookkeeping, the byte
+ * buffer and status flags.
+ */
+struct pn_delivery_t {
+ pn_disposition_t local;
+ pn_disposition_t remote;
+ pn_link_t *link; // reference counted
+ pn_buffer_t *tag;
+ pn_delivery_t *unsettled_next;
+ pn_delivery_t *unsettled_prev;
+ pn_delivery_t *work_next;
+ pn_delivery_t *work_prev;
+ pn_delivery_t *tpwork_next;
+ pn_delivery_t *tpwork_prev;
+ pn_delivery_state_t state;
+ pn_buffer_t *bytes;
+ pn_record_t *context;
+ bool updated;
+ bool settled; // tracks whether we're in the unsettled list or not
+ bool work;
+ bool tpwork;
+ bool done;
+ bool referenced;
+};
+
+/*
+ * PN_SET_LOCAL stores into OLD the bits of OLD selected by PN_REMOTE_MASK
+ * OR-ed with NEW; PN_SET_REMOTE does the same with PN_LOCAL_MASK. Judging by
+ * the names, each replaces one half of a combined state value while keeping
+ * the other half.
+ *
+ * OLD is evaluated twice, so it must not have side effects. The whole
+ * expansion is parenthesised so that the macro behaves as a single
+ * expression wherever it is used.
+ */
+#define PN_SET_LOCAL(OLD, NEW) \
+  ((OLD) = ((OLD) & PN_REMOTE_MASK) | (NEW))
+
+#define PN_SET_REMOTE(OLD, NEW) \
+  ((OLD) = ((OLD) & PN_LOCAL_MASK) | (NEW))
+/* Internal engine functions shared between source files; their definitions are not in this header. */
+void pn_link_dump(pn_link_t *link);
+
+void pn_dump(pn_connection_t *conn);
+void pn_transport_sasl_init(pn_transport_t *transport);
+
+void pn_condition_init(pn_condition_t *condition);
+void pn_condition_tini(pn_condition_t *condition);
+void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint, bool emit);
+void pn_real_settle(pn_delivery_t *delivery); // will free delivery if link is freed
+void pn_clear_tpwork(pn_delivery_t *delivery);
+void pn_work_update(pn_connection_t *connection, pn_delivery_t *delivery);
+void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint);
+void pn_connection_bound(pn_connection_t *conn);
+void pn_connection_unbound(pn_connection_t *conn);
+int pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...); // the dispatcher calls this with a condition name and a message for malformed frames
+void pn_set_error_layer(pn_transport_t *transport);
+void pn_session_unbound(pn_session_t* ssn);
+void pn_link_unbound(pn_link_t* link);
+void pn_ep_incref(pn_endpoint_t *endpoint);
+void pn_ep_decref(pn_endpoint_t *endpoint);
+
+int pn_post_frame(pn_transport_t *transport, uint8_t type, uint16_t ch, const char *fmt, ...);
+
+typedef enum {IN, OUT} pn_dir_t; // direction argument of pn_do_trace; the dispatcher passes IN for received frames
+
+void pn_do_trace(pn_transport_t *transport, uint16_t ch, pn_dir_t dir,
+ pn_data_t *args, const char *payload, size_t size);
+
+#endif /* engine-internal.h */
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org