You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2012/03/11 18:04:52 UTC
svn commit: r1299397 - in /qpid/proton/proton-c: ./ include/proton/ src/
src/codec/ src/dispatcher/ src/engine/ src/sasl/ src/types/
Author: rhs
Date: Sun Mar 11 17:04:51 2012
New Revision: 1299397
URL: http://svn.apache.org/viewvc?rev=1299397&view=rev
Log:
added symbol support; factored frame dispatch/post code into dispatcher object; added beginning of sasl impl
Added:
qpid/proton/proton-c/include/proton/sasl.h
qpid/proton/proton-c/src/dispatcher/ (with props)
qpid/proton/proton-c/src/dispatcher/dispatcher.c
qpid/proton/proton-c/src/dispatcher/dispatcher.h
qpid/proton/proton-c/src/sasl/ (with props)
qpid/proton/proton-c/src/sasl/sasl-internal.h
qpid/proton/proton-c/src/sasl/sasl.c
qpid/proton/proton-c/src/security.xml
qpid/proton/proton-c/src/transactions.xml
qpid/proton/proton-c/src/types/symbol.c
Modified:
qpid/proton/proton-c/Makefile
qpid/proton/proton-c/include/proton/codec.h
qpid/proton/proton-c/include/proton/engine.h
qpid/proton/proton-c/include/proton/value.h
qpid/proton/proton-c/src/codec/codec.c
qpid/proton/proton-c/src/codec/types.xml
qpid/proton/proton-c/src/engine/engine-internal.h
qpid/proton/proton-c/src/engine/engine.c
qpid/proton/proton-c/src/messaging.xml
qpid/proton/proton-c/src/protocol.h.py
qpid/proton/proton-c/src/protocol.py
qpid/proton/proton-c/src/proton.c
qpid/proton/proton-c/src/transport.xml
qpid/proton/proton-c/src/types/value-internal.h
qpid/proton/proton-c/src/types/value.c
Modified: qpid/proton/proton-c/Makefile
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/Makefile?rev=1299397&r1=1299396&r2=1299397&view=diff
==============================================================================
--- qpid/proton/proton-c/Makefile (original)
+++ qpid/proton/proton-c/Makefile Sun Mar 11 17:04:51 2012
@@ -6,14 +6,16 @@ CODEC_SRC := src/codec/codec.c
FRAMING_SRC := src/framing/framing.c
VALUE_SRC := src/types/value.c src/types/array.c src/types/list.c \
src/types/map.c src/types/string.c src/types/binary.c \
- src/types/decode.c
+ src/types/symbol.c src/types/decode.c
UTIL_HDR := include/proton/util.h
VALUE_HDR := include/proton/value.h
+DISPATCHER_SRC := src/dispatcher/dispatcher.c
ENGINE_SRC := src/engine/engine.c
+SASL_SRC := src/sasl/sasl.c
DRIVER_SRC := src/driver.c
SRCS := ${UTIL_SRC} ${VALUE_SRC} ${FRAMING_SRC} ${CODEC_SRC} ${PROTOCOL_SRC} \
- ${ENGINE_SRC} ${DRIVER_SRC}
+ ${DISPATCHER_SRC} ${ENGINE_SRC} ${SASL_SRC} ${DRIVER_SRC}
OBJS := ${SRCS:.c=.o}
DEPS := ${OBJS:.o=.d}
HDRS := ${UTIL_HDR} ${VALUE_HDR} \
@@ -21,6 +23,7 @@ HDRS := ${UTIL_HDR} ${VALUE_HDR} \
${CODEC_SRC:src/codec/%.c=include/proton/%.h} \
src/protocol.h \
include/proton/engine.h \
+ include/proton/sasl.h \
src/codec/encodings.h
PROGRAMS := src/proton
Modified: qpid/proton/proton-c/include/proton/codec.h
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/include/proton/codec.h?rev=1299397&r1=1299396&r2=1299397&view=diff
==============================================================================
--- qpid/proton/proton-c/include/proton/codec.h (original)
+++ qpid/proton/proton-c/include/proton/codec.h Sun Mar 11 17:04:51 2012
@@ -43,7 +43,7 @@ int pn_write_long(char **pos, char *limi
int pn_write_double(char **pos, char *limit, double v);
int pn_write_binary(char **pos, char *limit, size_t size, char *src);
int pn_write_utf8(char **pos, char *limit, size_t size, char *utf8);
-int pn_write_symbol(char **pos, char *limit, size_t size, char *symbol);
+int pn_write_symbol(char **pos, char *limit, size_t size, const char *symbol);
int pn_write_start(char **pos, char *limit, char **start);
int pn_write_list(char **pos, char *limit, char *start, size_t count);
int pn_write_map(char **pos, char *limit, char *start, size_t count);
Modified: qpid/proton/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/include/proton/engine.h?rev=1299397&r1=1299396&r2=1299397&view=diff
==============================================================================
--- qpid/proton/proton-c/include/proton/engine.h (original)
+++ qpid/proton/proton-c/include/proton/engine.h Sun Mar 11 17:04:51 2012
@@ -39,7 +39,7 @@ typedef struct pn_delivery_t pn_delivery
typedef enum pn_endpoint_state_t {UNINIT=1, ACTIVE=2, CLOSED=4} pn_endpoint_state_t;
typedef enum pn_endpoint_type_t {CONNECTION=1, TRANSPORT=2, SESSION=3, SENDER=4, RECEIVER=5} pn_endpoint_type_t;
-typedef enum pn_disposition_t {RECEIVED=1, ACCEPTED=2, REJECTED=3, RELEASED=4, MODIFIED=5} pn_disposition_t;
+typedef enum pn_disposition_t {PN_RECEIVED=1, PN_ACCEPTED=2, PN_REJECTED=3, PN_RELEASED=4, PN_MODIFIED=5} pn_disposition_t;
/* Currently the way inheritence is done it is safe to "upcast" from
pn_{transport,connection,session,link,sender,or receiver}_t to
Added: qpid/proton/proton-c/include/proton/sasl.h
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/include/proton/sasl.h?rev=1299397&view=auto
==============================================================================
--- qpid/proton/proton-c/include/proton/sasl.h (added)
+++ qpid/proton/proton-c/include/proton/sasl.h Sun Mar 11 17:04:51 2012
@@ -0,0 +1,33 @@
+#ifndef _PROTON_SASL_H
+#define _PROTON_SASL_H 1
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <sys/types.h>
+#include <stdbool.h>
+
+typedef struct pn_sasl_t pn_sasl_t;
+
+typedef enum {SASL_NONE=-1, SASL_OK=0, SASL_AUTH=1, SASL_SYS=2, SASL_PERM=3,
+ SASL_TEMP=4} pn_sasl_outcome_t;
+
+#endif /* sasl.h */
Modified: qpid/proton/proton-c/include/proton/value.h
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/include/proton/value.h?rev=1299397&r1=1299396&r2=1299397&view=diff
==============================================================================
--- qpid/proton/proton-c/include/proton/value.h (original)
+++ qpid/proton/proton-c/include/proton/value.h Sun Mar 11 17:04:51 2012
@@ -42,6 +42,7 @@ enum TYPE {
FLOAT,
DOUBLE,
CHAR,
+ SYMBOL,
STRING,
BINARY,
ARRAY,
@@ -51,15 +52,16 @@ enum TYPE {
REF
};
-typedef struct pn_value_st pn_value_t;
-typedef struct pn_string_st pn_string_t;
-typedef struct pn_binary_st pn_binary_t;
-typedef struct pn_array_st pn_array_t;
-typedef struct pn_list_st pn_list_t;
-typedef struct pn_map_st pn_map_t;
-typedef struct pn_tag_st pn_tag_t;
+typedef struct pn_value_t pn_value_t;
+typedef struct pn_symbol_t pn_symbol_t;
+typedef struct pn_string_t pn_string_t;
+typedef struct pn_binary_t pn_binary_t;
+typedef struct pn_array_t pn_array_t;
+typedef struct pn_list_t pn_list_t;
+typedef struct pn_map_t pn_map_t;
+typedef struct pn_tag_t pn_tag_t;
-struct pn_value_st {
+struct pn_value_t {
enum TYPE type;
union {
bool as_boolean;
@@ -74,6 +76,7 @@ struct pn_value_st {
float as_float;
double as_double;
wchar_t as_char;
+ pn_symbol_t *as_symbol;
pn_string_t *as_string;
pn_binary_t *as_binary;
pn_array_t *as_array;
@@ -84,7 +87,7 @@ struct pn_value_st {
} u;
};
-struct pn_tag_st {
+struct pn_tag_t {
pn_value_t descriptor;
pn_value_t value;
};
@@ -106,7 +109,9 @@ pn_value_t pn_from_map(pn_map_t *m);
pn_value_t pn_from_tag(pn_tag_t *t);
pn_value_t pn_from_ref(void *r);
pn_value_t pn_from_binary(pn_binary_t *b);
+pn_value_t pn_from_symbol(pn_symbol_t *s);
+int pn_compare_symbol(pn_symbol_t *a, pn_symbol_t *b);
int pn_compare_string(pn_string_t *a, pn_string_t *b);
int pn_compare_binary(pn_binary_t *a, pn_binary_t *b);
int pn_compare_list(pn_list_t *a, pn_list_t *b);
@@ -166,6 +171,12 @@ void pn_visit_tag(pn_tag_t *t, void (*vi
#define pn_to_string(V) ((V).u.as_string)
#define pn_to_binary(V) ((V).u.as_binary)
+/* symbol */
+pn_symbol_t *pn_symbol(char *name);
+size_t pn_symbol_size(pn_symbol_t *s);
+char *pn_symbol_name(pn_symbol_t *s);
+pn_symbol_t *pn_symbol_dup(pn_symbol_t *s);
+
/* string */
Modified: qpid/proton/proton-c/src/codec/codec.c
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/src/codec/codec.c?rev=1299397&r1=1299396&r2=1299397&view=diff
==============================================================================
--- qpid/proton/proton-c/src/codec/codec.c (original)
+++ qpid/proton/proton-c/src/codec/codec.c Sun Mar 11 17:04:51 2012
@@ -174,8 +174,8 @@ int pn_write_binary(char **pos, char *li
int pn_write_utf8(char **pos, char *limit, size_t size, char *utf8) {
return pn_write_variable(pos, limit, size, utf8, PNE_STR8_UTF8, PNE_STR32_UTF8);
}
-int pn_write_symbol(char **pos, char *limit, size_t size, char *symbol) {
- return pn_write_variable(pos, limit, size, symbol, PNE_SYM8, PNE_SYM32);
+int pn_write_symbol(char **pos, char *limit, size_t size, const char *symbol) {
+ return pn_write_variable(pos, limit, size, (char *) symbol, PNE_SYM8, PNE_SYM32);
}
int pn_write_start(char **pos, char *limit, char **start) {
Modified: qpid/proton/proton-c/src/codec/types.xml
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/src/codec/types.xml?rev=1299397&r1=1299396&r2=1299397&view=diff
==============================================================================
--- qpid/proton/proton-c/src/codec/types.xml (original)
+++ qpid/proton/proton-c/src/codec/types.xml Sun Mar 11 17:04:51 2012
@@ -1,13 +1,13 @@
<?xml version="1.0"?>
<!--
-
Copyright Bank of America, N.A., Barclays Bank PLC, Cisco Systems, Credit
-Suisse, Deutsche Boerse Systems, Goldman Sachs, HCL Technologies Ltd, INETCO
-Systems Limited, Informatica Corporation, JPMorgan Chase Bank Inc. N.A,
+Suisse, Deutsche Boerse, Envoy Technologies Inc., Goldman Sachs, HCL
+Technologies Ltd, IIT Software GmbH, iMatix Corporation, INETCO Systems Limited,
+Informatica Corporation, JPMorgan Chase & Co., Kaazing Corporation, N.A,
Microsoft Corporation, my-Channels, Novell, Progress Software, Red Hat Inc.,
Software AG, Solace Systems Inc., StormMQ Ltd., Tervela Inc., TWIST Process
-Innovations Ltd, VMware Inc. and WS02 Inc. 2006-2011. All rights reserved.
+Innovations Ltd, VMware, Inc., and WS02 Inc. 2006-2011. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
Propchange: qpid/proton/proton-c/src/dispatcher/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Sun Mar 11 17:04:51 2012
@@ -0,0 +1 @@
+*.d
Added: qpid/proton/proton-c/src/dispatcher/dispatcher.c
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/src/dispatcher/dispatcher.c?rev=1299397&view=auto
==============================================================================
--- qpid/proton/proton-c/src/dispatcher/dispatcher.c (added)
+++ qpid/proton/proton-c/src/dispatcher/dispatcher.c Sun Mar 11 17:04:51 2012
@@ -0,0 +1,194 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <ctype.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <proton/framing.h>
+#include "dispatcher.h"
+
+pn_dispatcher_t *pn_dispatcher(void *context)
+{
+ pn_dispatcher_t *disp = calloc(sizeof(pn_dispatcher_t), 1);
+
+ disp->context = context;
+
+ disp->channel = 0;
+ disp->code = 0;
+ disp->args = NULL;
+ disp->payload = NULL;
+ disp->size = 0;
+
+ disp->output_args = pn_list(16);
+ // XXX
+ disp->capacity = 4*1024;
+ disp->output = malloc(disp->capacity);
+ disp->available = 0;
+
+ return disp;
+}
+
+void pn_dispatcher_destroy(pn_dispatcher_t *disp)
+{
+ pn_free_list(disp->output_args);
+ free(disp->output);
+ free(disp);
+}
+
+void pn_dispatcher_action(pn_dispatcher_t *disp, uint8_t code, const char *name,
+ pn_action_t *action)
+{
+ disp->actions[code] = action;
+ disp->names[code] = name;
+}
+
+typedef enum {IN, OUT} pn_dir_t;
+
+static void pn_trace(pn_dispatcher_t *disp, uint16_t ch, pn_dir_t dir,
+ uint8_t code, pn_list_t *args, const char *payload,
+ size_t size)
+{
+ pn_format(disp->scratch, SCRATCH, pn_from_list(args));
+ fprintf(stderr, "[%u] %s %s %s", ch, dir == OUT ? "->" : "<-",
+ disp->names[code], disp->scratch);
+ if (size) {
+ fprintf(stderr, " (%zu) \"", size);
+ for (int i = 0; i < size; i++) {
+ char c = payload[i];
+ if (isprint(c)) {
+ fputc(c, stderr);
+ } else {
+ fprintf(stderr, "\\x%.2x", c);
+ }
+ }
+ fprintf(stderr, "\"\n");
+ } else {
+ fprintf(stderr, "\n");
+ }
+}
+
+ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, char *bytes, size_t available)
+{
+ size_t read = 0;
+ while (true) {
+ pn_frame_t frame;
+ size_t n = pn_read_frame(&frame, bytes + read, available);
+ if (n) {
+ pn_value_t performative;
+ ssize_t e = pn_decode(&performative, frame.payload, frame.size);
+ if (e < 0) {
+ fprintf(stderr, "Error decoding frame: %zi\n", e);
+ pn_format(disp->scratch, SCRATCH, pn_value("z", frame.size, frame.payload));
+ fprintf(stderr, "%s\n", disp->scratch);
+ return e;
+ }
+
+ disp->channel = frame.channel;
+ // XXX: assuming numeric
+ uint8_t code = pn_to_uint8(pn_tag_descriptor(pn_to_tag(performative)));
+ disp->code = code;
+ disp->args = pn_to_list(pn_tag_value(pn_to_tag(performative)));
+ disp->size = frame.size - e;
+ if (disp->size)
+ disp->payload = frame.payload + e;
+
+ pn_trace(disp, disp->channel, IN, code, disp->args, disp->payload, disp->size);
+
+ pn_action_t *action = disp->actions[code];
+ action(disp);
+
+ disp->channel = 0;
+ disp->code = 0;
+ disp->args = NULL;
+ disp->size = 0;
+ disp->payload = NULL;
+ pn_visit(performative, pn_free_value);
+
+ available -= n;
+ read += n;
+ } else {
+ break;
+ }
+ }
+
+ return read;
+}
+
+void pn_init_frame(pn_dispatcher_t *disp)
+{
+ pn_list_clear(disp->output_args);
+ disp->output_payload = NULL;
+ disp->output_size = 0;
+}
+
+void pn_field(pn_dispatcher_t *disp, int index, pn_value_t arg)
+{
+ int n = pn_list_size(disp->output_args);
+ if (index >= n)
+ pn_list_fill(disp->output_args, EMPTY_VALUE, index - n + 1);
+ pn_list_set(disp->output_args, index, arg);
+}
+
+void pn_append_payload(pn_dispatcher_t *disp, const char *data, size_t size)
+{
+ disp->output_payload = data;
+ disp->output_size = size;
+}
+
+void pn_post_frame(pn_dispatcher_t *disp, uint16_t ch, uint32_t performative)
+{
+ pn_tag_t tag = { .descriptor = pn_ulong(performative),
+ .value = pn_from_list(disp->output_args) };
+ pn_trace(disp, ch, OUT, performative, disp->output_args, disp->output_payload,
+ disp->output_size);
+ pn_frame_t frame = {0};
+ char bytes[pn_encode_sizeof(pn_from_tag(&tag)) + disp->output_size];
+ size_t size = pn_encode(pn_from_tag(&tag), bytes);
+ for (int i = 0; i < pn_list_size(disp->output_args); i++)
+ pn_visit(pn_list_get(disp->output_args, i), pn_free_value);
+ if (disp->output_size) {
+ memmove(bytes + size, disp->output_payload, disp->output_size);
+ size += disp->output_size;
+ disp->output_payload = NULL;
+ disp->output_size = 0;
+ }
+ frame.channel = ch;
+ frame.payload = bytes;
+ frame.size = size;
+ size_t n;
+ while (!(n = pn_write_frame(disp->output + disp->available,
+ disp->capacity - disp->available, frame))) {
+ disp->capacity *= 2;
+ disp->output = realloc(disp->output, disp->capacity);
+ }
+ disp->available += n;
+}
+
+ssize_t pn_dispatcher_output(pn_dispatcher_t *disp, char *bytes, size_t size)
+{
+ int n = disp->available < size ? disp->available : size;
+ memmove(bytes, disp->output, n);
+ memmove(disp->output, disp->output + n, disp->available - n);
+ disp->available -= n;
+ // XXX: need to check for errors
+ return n;
+}
Added: qpid/proton/proton-c/src/dispatcher/dispatcher.h
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/src/dispatcher/dispatcher.h?rev=1299397&view=auto
==============================================================================
--- qpid/proton/proton-c/src/dispatcher/dispatcher.h (added)
+++ qpid/proton/proton-c/src/dispatcher/dispatcher.h Sun Mar 11 17:04:51 2012
@@ -0,0 +1,64 @@
+#ifndef _PROTON_DISPATCHER_H
+#define _PROTON_DISPATCHER_H 1
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <sys/types.h>
+#include <stdbool.h>
+#include <proton/value.h>
+
+typedef struct pn_dispatcher_t pn_dispatcher_t;
+
+typedef void (pn_action_t)(pn_dispatcher_t *disp);
+
+#define SCRATCH (1024)
+
+struct pn_dispatcher_t {
+ pn_action_t *actions[256];
+ const char *names[256];
+ uint16_t channel;
+ uint8_t code;
+ pn_list_t *args;
+ char *payload;
+ size_t size;
+ pn_list_t *output_args;
+ const char *output_payload;
+ size_t output_size;
+ size_t capacity;
+ size_t available;
+ char *output;
+ void *context;
+ char scratch[SCRATCH];
+};
+
+pn_dispatcher_t *pn_dispatcher(void *context);
+void pn_dispatcher_destroy(pn_dispatcher_t *disp);
+void pn_dispatcher_action(pn_dispatcher_t *disp, uint8_t code, const char *name,
+ pn_action_t *action);
+void pn_init_frame(pn_dispatcher_t *disp);
+void pn_field(pn_dispatcher_t *disp, int index, pn_value_t arg);
+void pn_append_payload(pn_dispatcher_t *disp, const char *data, size_t size);
+void pn_post_frame(pn_dispatcher_t *disp, uint16_t ch, uint32_t performative);
+ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, char *bytes, size_t available);
+ssize_t pn_dispatcher_output(pn_dispatcher_t *disp, char *bytes, size_t size);
+
+#endif /* dispatcher.h */
Modified: qpid/proton/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/src/engine/engine-internal.h?rev=1299397&r1=1299396&r2=1299397&view=diff
==============================================================================
--- qpid/proton/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/proton-c/src/engine/engine-internal.h Sun Mar 11 17:04:51 2012
@@ -24,6 +24,7 @@
#include <proton/engine.h>
#include <proton/value.h>
+#include "../dispatcher/dispatcher.h"
#include "../util.h"
#define DESCRIPTION (1024)
@@ -89,13 +90,7 @@ typedef struct {
struct pn_transport_t {
pn_endpoint_t endpoint;
pn_connection_t *connection;
- pn_map_t *dispatch;
- pn_list_t *args;
- const char* payload_bytes;
- size_t payload_size;
- char *output;
- size_t available;
- size_t capacity;
+ pn_dispatcher_t *disp;
bool open_sent;
bool close_sent;
pn_session_state_t *sessions;
Modified: qpid/proton/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/src/engine/engine.c?rev=1299397&r1=1299396&r2=1299397&view=diff
==============================================================================
--- qpid/proton/proton-c/src/engine/engine.c (original)
+++ qpid/proton/proton-c/src/engine/engine.c Sun Mar 11 17:04:51 2012
@@ -29,7 +29,6 @@
#include <stdarg.h>
#include <stdio.h>
-#include <ctype.h>
// delivery buffers
@@ -188,8 +187,7 @@ void pn_destroy_connection(pn_connection
void pn_destroy_transport(pn_transport_t *transport)
{
- pn_free_map(transport->dispatch);
- pn_free_list(transport->args);
+ pn_dispatcher_destroy(transport->disp);
for (int i = 0; i < transport->session_capacity; i++) {
pn_delivery_buffer_destroy(&transport->sessions[i].incoming);
pn_delivery_buffer_destroy(&transport->sessions[i].outgoing);
@@ -198,7 +196,6 @@ void pn_destroy_transport(pn_transport_t
}
free(transport->sessions);
free(transport->channels);
- free(transport->output);
free(transport);
}
@@ -537,32 +534,31 @@ pn_session_t *pn_session(pn_connection_t
return ssn;
}
- /* pn_map_set(MAP, pn_symbol(PN_HEAP, NAME ## _SYM), pn_ulong(PN_HEAP, NAME)); \ */
-#define __DISPATCH(MAP, NAME) \
- pn_map_set(MAP, pn_ulong(NAME ## _CODE), pn_ulong(NAME ## _IDX))
+void pn_do_open(pn_dispatcher_t *disp);
+void pn_do_begin(pn_dispatcher_t *disp);
+void pn_do_attach(pn_dispatcher_t *disp);
+void pn_do_transfer(pn_dispatcher_t *disp);
+void pn_do_flow(pn_dispatcher_t *disp);
+void pn_do_disposition(pn_dispatcher_t *disp);
+void pn_do_detach(pn_dispatcher_t *disp);
+void pn_do_end(pn_dispatcher_t *disp);
+void pn_do_close(pn_dispatcher_t *disp);
void pn_transport_init(pn_transport_t *transport)
{
pn_endpoint_init(&transport->endpoint, TRANSPORT, transport->connection);
- pn_map_t *m = pn_map(32);
- transport->dispatch = m;
+ transport->disp = pn_dispatcher(transport);
- __DISPATCH(m, OPEN);
- __DISPATCH(m, BEGIN);
- __DISPATCH(m, ATTACH);
- __DISPATCH(m, TRANSFER);
- __DISPATCH(m, FLOW);
- __DISPATCH(m, DISPOSITION);
- __DISPATCH(m, DETACH);
- __DISPATCH(m, END);
- __DISPATCH(m, CLOSE);
-
- transport->args = pn_list(16);
- // XXX
- transport->capacity = 4*1024;
- transport->output = malloc(transport->capacity);
- transport->available = 0;
+ pn_dispatcher_action(transport->disp, OPEN, "OPEN", pn_do_open);
+ pn_dispatcher_action(transport->disp, BEGIN, "BEGIN", pn_do_begin);
+ pn_dispatcher_action(transport->disp, ATTACH, "ATTACH", pn_do_attach);
+ pn_dispatcher_action(transport->disp, TRANSFER, "TRANSFER", pn_do_transfer);
+ pn_dispatcher_action(transport->disp, FLOW, "FLOW", pn_do_flow);
+ pn_dispatcher_action(transport->disp, DISPOSITION, "DISPOSITION", pn_do_disposition);
+ pn_dispatcher_action(transport->disp, DETACH, "DETACH", pn_do_detach);
+ pn_dispatcher_action(transport->disp, END, "END", pn_do_end);
+ pn_dispatcher_action(transport->disp, CLOSE, "CLOSE", pn_do_close);
transport->open_sent = false;
transport->close_sent = false;
@@ -826,31 +822,6 @@ void pn_settle(pn_delivery_t *delivery)
pn_add_tpwork(delivery);
}
-typedef enum {IN, OUT} pn_dir_t;
-
-static void pn_trace(pn_transport_t *transport, uint16_t ch, pn_dir_t dir,
- char *op, pn_list_t *args, const char *payload,
- size_t size)
-{
- pn_format(transport->scratch, SCRATCH, pn_from_list(args));
- fprintf(stderr, "[%u] %s %s %s", ch, dir == OUT ? "->" : "<-", op,
- transport->scratch);
- if (size) {
- fprintf(stderr, " (%zu) \"", size);
- for (int i = 0; i < size; i++) {
- char c = payload[i];
- if (isprint(c)) {
- fputc(c, stderr);
- } else {
- fprintf(stderr, "\\x%.2x", c);
- }
- }
- fprintf(stderr, "\"\n");
- } else {
- fprintf(stderr, "\n");
- }
-}
-
void pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...)
{
va_list ap;
@@ -864,16 +835,18 @@ void pn_do_error(pn_transport_t *transpo
// XXX: need to write close frame if appropriate
}
-void pn_do_open(pn_transport_t *transport, pn_list_t *args)
+void pn_do_open(pn_dispatcher_t *disp)
{
+ pn_transport_t *transport = disp->context;
pn_connection_t *conn = transport->connection;
// TODO: store the state
conn->endpoint.remote_state = ACTIVE;
}
-void pn_do_begin(pn_transport_t *transport, uint16_t ch, pn_list_t *args)
+void pn_do_begin(pn_dispatcher_t *disp)
{
- pn_value_t remote_channel = pn_list_get(args, BEGIN_REMOTE_CHANNEL);
+ pn_transport_t *transport = disp->context;
+ pn_value_t remote_channel = pn_list_get(disp->args, BEGIN_REMOTE_CHANNEL);
pn_session_state_t *state;
if (remote_channel.type == USHORT) {
// XXX: what if session is NULL?
@@ -882,7 +855,7 @@ void pn_do_begin(pn_transport_t *transpo
pn_session_t *ssn = pn_session(transport->connection);
state = pn_session_state(transport, ssn);
}
- pn_map_channel(transport, ch, state);
+ pn_map_channel(transport, disp->channel, state);
state->session->endpoint.remote_state = ACTIVE;
}
@@ -899,12 +872,14 @@ pn_link_state_t *pn_find_link(pn_session
return NULL;
}
-void pn_do_attach(pn_transport_t *transport, uint16_t ch, pn_list_t *args)
+void pn_do_attach(pn_dispatcher_t *disp)
{
+ pn_transport_t *transport = disp->context;
+ pn_list_t *args = disp->args;
uint32_t handle = pn_to_uint32(pn_list_get(args, ATTACH_HANDLE));
bool is_sender = pn_to_bool(pn_list_get(args, ATTACH_ROLE));
pn_string_t *name = pn_to_string(pn_list_get(args, ATTACH_NAME));
- pn_session_state_t *ssn_state = pn_channel_state(transport, ch);
+ pn_session_state_t *ssn_state = pn_channel_state(transport, disp->channel);
pn_link_state_t *link_state = pn_find_link(ssn_state, name);
if (!link_state) {
pn_link_t *link;
@@ -935,11 +910,13 @@ void pn_do_attach(pn_transport_t *transp
}
}
-void pn_do_transfer(pn_transport_t *transport, uint16_t channel, pn_list_t *args, const char *payload_bytes, size_t payload_size)
+void pn_do_transfer(pn_dispatcher_t *disp)
{
// XXX: multi transfer
+ pn_transport_t *transport = disp->context;
+ pn_list_t *args = disp->args;
- pn_session_state_t *ssn_state = pn_channel_state(transport, channel);
+ pn_session_state_t *ssn_state = pn_channel_state(transport, disp->channel);
uint32_t handle = pn_to_uint32(pn_list_get(args, TRANSFER_HANDLE));
pn_link_state_t *link_state = pn_handle_state(ssn_state, handle);
pn_link_t *link = link_state->link;
@@ -953,14 +930,16 @@ void pn_do_transfer(pn_transport_t *tran
// XXX: signal error somehow
}
- PN_ENSURE(delivery->bytes, delivery->capacity, payload_size);
- memmove(delivery->bytes, payload_bytes, payload_size);
- delivery->size = payload_size;
+ PN_ENSURE(delivery->bytes, delivery->capacity, disp->size);
+ memmove(delivery->bytes, disp->payload, disp->size);
+ delivery->size = disp->size;
}
-void pn_do_flow(pn_transport_t *transport, uint16_t channel, pn_list_t *args)
+void pn_do_flow(pn_dispatcher_t *disp)
{
- pn_session_state_t *ssn_state = pn_channel_state(transport, channel);
+ pn_transport_t *transport = disp->context;
+ pn_list_t *args = disp->args;
+ pn_session_state_t *ssn_state = pn_channel_state(transport, disp->channel);
pn_value_t vhandle = pn_list_get(args, FLOW_HANDLE);
if (vhandle.type != EMPTY) {
@@ -984,28 +963,30 @@ void pn_do_flow(pn_transport_t *transpor
}
}
-void pn_do_disposition(pn_transport_t *transport, uint16_t channel, pn_list_t *args)
+void pn_do_disposition(pn_dispatcher_t *disp)
{
- pn_session_state_t *ssn_state = pn_channel_state(transport, channel);
+ pn_transport_t *transport = disp->context;
+ pn_list_t *args = disp->args;
+ pn_session_state_t *ssn_state = pn_channel_state(transport, disp->channel);
bool role = pn_to_bool(pn_list_get(args, DISPOSITION_ROLE));
pn_sequence_t first = pn_to_int32(pn_list_get(args, DISPOSITION_FIRST));
pn_sequence_t last = pn_to_int32(pn_list_get(args, DISPOSITION_LAST));
//bool settled = pn_to_bool(pn_list_get(args, DISPOSITION_SETTLED));
pn_tag_t *dstate = pn_to_tag(pn_list_get(args, DISPOSITION_STATE));
uint64_t code = pn_to_uint32(pn_tag_descriptor(dstate));
- pn_disposition_t disp;
+ pn_disposition_t dispo;
switch (code)
{
- case ACCEPTED_CODE:
- disp = ACCEPTED;
+ case ACCEPTED:
+ dispo = PN_ACCEPTED;
break;
- case REJECTED_CODE:
- disp = REJECTED;
+ case REJECTED:
+ dispo = PN_REJECTED;
break;
default:
// XXX
fprintf(stderr, "default %lu\n", code);
- disp = 0;
+ dispo = 0;
break;
}
@@ -1021,20 +1002,22 @@ void pn_do_disposition(pn_transport_t *t
for (pn_sequence_t id = first; id <= last; id++) {
pn_delivery_state_t *state = pn_delivery_buffer_get(deliveries, id - lwm);
pn_delivery_t *delivery = state->delivery;
- delivery->remote_state = disp;
+ delivery->remote_state = dispo;
delivery->dirty = true;
pn_work_update(transport->connection, delivery);
}
}
-void pn_do_detach(pn_transport_t *transport, uint16_t channel, pn_list_t *args)
+void pn_do_detach(pn_dispatcher_t *disp)
{
+ pn_transport_t *transport = disp->context;
+ pn_list_t *args = disp->args;
uint32_t handle = pn_to_uint32(pn_list_get(args, DETACH_HANDLE));
bool closed = pn_to_bool(pn_list_get(args, DETACH_CLOSED));
- pn_session_state_t *ssn_state = pn_channel_state(transport, channel);
+ pn_session_state_t *ssn_state = pn_channel_state(transport, disp->channel);
if (!ssn_state) {
- pn_do_error(transport, "amqp:invalid-field", "no such channel: %u", channel);
+ pn_do_error(transport, "amqp:invalid-field", "no such channel: %u", disp->channel);
return;
}
pn_link_state_t *link_state = pn_handle_state(ssn_state, handle);
@@ -1050,92 +1033,23 @@ void pn_do_detach(pn_transport_t *transp
}
}
-void pn_do_end(pn_transport_t *transport, uint16_t channel, pn_list_t *args)
+void pn_do_end(pn_dispatcher_t *disp)
{
- pn_session_state_t *ssn_state = pn_channel_state(transport, channel);
+ pn_transport_t *transport = disp->context;
+ pn_session_state_t *ssn_state = pn_channel_state(transport, disp->channel);
pn_session_t *session = ssn_state->session;
ssn_state->remote_channel = -1;
session->endpoint.remote_state = CLOSED;
}
-void pn_do_close(pn_transport_t *transport, pn_list_t *args)
+void pn_do_close(pn_dispatcher_t *disp)
{
+ pn_transport_t *transport = disp->context;
transport->connection->endpoint.remote_state = CLOSED;
transport->endpoint.remote_state = CLOSED;
}
-static char *pn_p2op(uint32_t performative)
-{
- switch (performative)
- {
- case OPEN_CODE:
- return "OPEN";
- case BEGIN_CODE:
- return "BEGIN";
- case ATTACH_CODE:
- return "ATTACH";
- case TRANSFER_CODE:
- return "TRANSFER";
- case FLOW_CODE:
- return "FLOW";
- case DISPOSITION_CODE:
- return "DISPOSITION";
- case DETACH_CODE:
- return "DETACH";
- case END_CODE:
- return "END";
- case CLOSE_CODE:
- return "CLOSE";
- default:
- return "<UNKNOWN>";
- }
-}
-
-void pn_dispatch(pn_transport_t *transport, uint16_t channel,
- pn_tag_t *performative, const char *payload_bytes,
- size_t payload_size)
-{
- pn_value_t desc = pn_tag_descriptor(performative);
- pn_list_t *args = pn_to_list(pn_tag_value(performative));
- pn_value_t cval = pn_map_get(transport->dispatch, desc);
- uint8_t code = pn_to_uint8(cval);
-
- pn_trace(transport, channel, IN, pn_p2op(pn_to_uint32(desc)), args,
- payload_bytes, payload_size);
-
- switch (code)
- {
- case OPEN_IDX:
- pn_do_open(transport, args);
- break;
- case BEGIN_IDX:
- pn_do_begin(transport, channel, args);
- break;
- case ATTACH_IDX:
- pn_do_attach(transport, channel, args);
- break;
- case TRANSFER_IDX:
- pn_do_transfer(transport, channel, args, payload_bytes, payload_size);
- break;
- case FLOW_IDX:
- pn_do_flow(transport, channel, args);
- break;
- case DISPOSITION_IDX:
- pn_do_disposition(transport, channel, args);
- break;
- case DETACH_IDX:
- pn_do_detach(transport, channel, args);
- break;
- case END_IDX:
- pn_do_end(transport, channel, args);
- break;
- case CLOSE_IDX:
- pn_do_close(transport, args);
- break;
- }
-}
-
ssize_t pn_input(pn_transport_t *transport, char *bytes, size_t available)
{
if (transport->endpoint.local_state == CLOSED) {
@@ -1147,84 +1061,7 @@ ssize_t pn_input(pn_transport_t *transpo
return EOS;
}
- size_t read = 0;
- while (true) {
- pn_frame_t frame;
- size_t n = pn_read_frame(&frame, bytes + read, available);
- if (n) {
- pn_value_t performative;
- ssize_t e = pn_decode(&performative, frame.payload, frame.size);
- if (e < 0) {
- fprintf(stderr, "Error decoding frame: %zi\n", e);
- pn_format(transport->scratch, SCRATCH, pn_value("z", frame.size, frame.payload));
- fprintf(stderr, "%s\n", transport->scratch);
- return e;
- }
-
- pn_tag_t *perf = pn_to_tag(performative);
- pn_dispatch(transport, frame.channel, perf, frame.payload + e, frame.size - e);
- pn_visit(performative, pn_free_value);
-
- available -= n;
- read += n;
- } else {
- break;
- }
- }
-
- return read;
-}
-
-void pn_init_frame(pn_transport_t *transport)
-{
- pn_list_clear(transport->args);
- transport->payload_bytes = NULL;
- transport->payload_size = 0;
-}
-
-void pn_field(pn_transport_t *transport, int index, pn_value_t arg)
-{
- int n = pn_list_size(transport->args);
- if (index >= n)
- pn_list_fill(transport->args, EMPTY_VALUE, index - n + 1);
- pn_list_set(transport->args, index, arg);
-}
-
-void pn_append_payload(pn_transport_t *transport, const char *data, size_t size)
-{
- transport->payload_bytes = data;
- transport->payload_size = size;
-}
-
-#define BUF_SIZE (1024*1024)
-
-void pn_post_frame(pn_transport_t *transport, uint16_t ch, uint32_t performative)
-{
- pn_tag_t tag = { .descriptor = pn_ulong(performative),
- .value = pn_from_list(transport->args) };
- pn_frame_t frame = {0};
- char bytes[pn_encode_sizeof(pn_from_tag(&tag)) + transport->payload_size];
- pn_trace(transport, ch, OUT, pn_p2op(performative), transport->args,
- transport->payload_bytes, transport->payload_size);
- size_t size = pn_encode(pn_from_tag(&tag), bytes);
- for (int i = 0; i < pn_list_size(transport->args); i++)
- pn_visit(pn_list_get(transport->args, i), pn_free_value);
- if (transport->payload_size) {
- memmove(bytes + size, transport->payload_bytes, transport->payload_size);
- size += transport->payload_size;
- transport->payload_bytes = NULL;
- transport->payload_size = 0;
- }
- frame.channel = ch;
- frame.payload = bytes;
- frame.size = size;
- size_t n;
- while (!(n = pn_write_frame(transport->output + transport->available,
- transport->capacity - transport->available, frame))) {
- transport->capacity *= 2;
- transport->output = realloc(transport->output, transport->capacity);
- }
- transport->available += n;
+ return pn_dispatcher_input(transport->disp, bytes, available);
}
void pn_process_conn_setup(pn_transport_t *transport, pn_endpoint_t *endpoint)
@@ -1233,12 +1070,12 @@ void pn_process_conn_setup(pn_transport_
{
if (endpoint->local_state != UNINIT && !transport->open_sent)
{
- pn_init_frame(transport);
+ pn_init_frame(transport->disp);
/*if (container_id)
pn_field(eng, OPEN_CONTAINER_ID, pn_value("S", container_id));*/
/*if (hostname)
pn_field(eng, OPEN_HOSTNAME, pn_value("S", hostname));*/
- pn_post_frame(transport, 0, OPEN_CODE);
+ pn_post_frame(transport->disp, 0, OPEN);
transport->open_sent = true;
}
}
@@ -1252,16 +1089,16 @@ void pn_process_ssn_setup(pn_transport_t
pn_session_state_t *state = pn_session_state(transport, ssn);
if (endpoint->local_state != UNINIT && state->local_channel == (uint16_t) -1)
{
- pn_init_frame(transport);
+ pn_init_frame(transport->disp);
if ((int16_t) state->remote_channel >= 0)
- pn_field(transport, BEGIN_REMOTE_CHANNEL, pn_value("H", state->remote_channel));
- pn_field(transport, BEGIN_NEXT_OUTGOING_ID, pn_value("I", state->outgoing.next));
- pn_field(transport, BEGIN_INCOMING_WINDOW, pn_value("I", state->incoming.capacity));
- pn_field(transport, BEGIN_OUTGOING_WINDOW, pn_value("I", state->outgoing.capacity));
+ pn_field(transport->disp, BEGIN_REMOTE_CHANNEL, pn_value("H", state->remote_channel));
+ pn_field(transport->disp, BEGIN_NEXT_OUTGOING_ID, pn_value("I", state->outgoing.next));
+ pn_field(transport->disp, BEGIN_INCOMING_WINDOW, pn_value("I", state->incoming.capacity));
+ pn_field(transport->disp, BEGIN_OUTGOING_WINDOW, pn_value("I", state->outgoing.capacity));
// XXX: we use the session id as the outgoing channel, we depend
// on this for looking up via remote channel
uint16_t channel = ssn->id;
- pn_post_frame(transport, channel, BEGIN_CODE);
+ pn_post_frame(transport->disp, channel, BEGIN);
state->local_channel = channel;
}
}
@@ -1276,21 +1113,21 @@ void pn_process_link_setup(pn_transport_
pn_link_state_t *state = pn_link_state(ssn_state, link);
if (endpoint->local_state != UNINIT && state->local_handle == (uint32_t) -1)
{
- pn_init_frame(transport);
- pn_field(transport, ATTACH_ROLE, pn_boolean(endpoint->type == RECEIVER));
- pn_field(transport, ATTACH_NAME, pn_value("S", link->name));
+ pn_init_frame(transport->disp);
+ pn_field(transport->disp, ATTACH_ROLE, pn_boolean(endpoint->type == RECEIVER));
+ pn_field(transport->disp, ATTACH_NAME, pn_value("S", link->name));
// XXX
state->local_handle = link->id;
- pn_field(transport, ATTACH_HANDLE, pn_value("I", state->local_handle));
+ pn_field(transport->disp, ATTACH_HANDLE, pn_value("I", state->local_handle));
// XXX
- pn_field(transport, ATTACH_INITIAL_DELIVERY_COUNT, pn_value("I", 0));
+ pn_field(transport->disp, ATTACH_INITIAL_DELIVERY_COUNT, pn_value("I", 0));
if (link->local_source)
- pn_field(transport, ATTACH_SOURCE, pn_value("B([S])", SOURCE_CODE,
+ pn_field(transport->disp, ATTACH_SOURCE, pn_value("B([S])", SOURCE,
link->local_source));
if (link->local_target)
- pn_field(transport, ATTACH_TARGET, pn_value("B([S])", TARGET_CODE,
+ pn_field(transport->disp, ATTACH_TARGET, pn_value("B([S])", TARGET,
link->local_target));
- pn_post_frame(transport, ssn_state->local_channel, ATTACH_CODE);
+ pn_post_frame(transport->disp, ssn_state->local_channel, ATTACH);
}
}
}
@@ -1306,15 +1143,15 @@ void pn_process_flow_receiver(pn_transpo
state->link_credit += rcv->credits;
rcv->credits = 0;
- pn_init_frame(transport);
- //pn_field(transport, FLOW_NEXT_INCOMING_ID, pn_value("I", ssn_state->next_incoming_id));
- pn_field(transport, FLOW_INCOMING_WINDOW, pn_value("I", ssn_state->incoming.capacity));
- pn_field(transport, FLOW_NEXT_OUTGOING_ID, pn_value("I", ssn_state->outgoing.next));
- pn_field(transport, FLOW_OUTGOING_WINDOW, pn_value("I", ssn_state->outgoing.capacity));
- pn_field(transport, FLOW_HANDLE, pn_value("I", state->local_handle));
- //pn_field(transport, FLOW_DELIVERY_COUNT, pn_value("I", delivery_count));
- pn_field(transport, FLOW_LINK_CREDIT, pn_value("I", state->link_credit));
- pn_post_frame(transport, ssn_state->local_channel, FLOW_CODE);
+ pn_init_frame(transport->disp);
+ //pn_field(transport->disp, FLOW_NEXT_INCOMING_ID, pn_value("I", ssn_state->next_incoming_id));
+ pn_field(transport->disp, FLOW_INCOMING_WINDOW, pn_value("I", ssn_state->incoming.capacity));
+ pn_field(transport->disp, FLOW_NEXT_OUTGOING_ID, pn_value("I", ssn_state->outgoing.next));
+ pn_field(transport->disp, FLOW_OUTGOING_WINDOW, pn_value("I", ssn_state->outgoing.capacity));
+ pn_field(transport->disp, FLOW_HANDLE, pn_value("I", state->local_handle));
+ //pn_field(transport->disp, FLOW_DELIVERY_COUNT, pn_value("I", delivery_count));
+ pn_field(transport->disp, FLOW_LINK_CREDIT, pn_value("I", state->link_credit));
+ pn_post_frame(transport->disp, ssn_state->local_channel, FLOW);
}
}
}
@@ -1325,28 +1162,28 @@ void pn_post_disp(pn_transport_t *transp
pn_session_state_t *ssn_state = pn_session_state(transport, link->session);
// XXX: check for null state
pn_delivery_state_t *state = delivery->context;
- pn_init_frame(transport);
- pn_field(transport, DISPOSITION_ROLE, pn_boolean(link->endpoint.type == RECEIVER));
- pn_field(transport, DISPOSITION_FIRST, pn_uint(state->id));
- pn_field(transport, DISPOSITION_LAST, pn_uint(state->id));
+ pn_init_frame(transport->disp);
+ pn_field(transport->disp, DISPOSITION_ROLE, pn_boolean(link->endpoint.type == RECEIVER));
+ pn_field(transport->disp, DISPOSITION_FIRST, pn_uint(state->id));
+ pn_field(transport->disp, DISPOSITION_LAST, pn_uint(state->id));
// XXX
- pn_field(transport, DISPOSITION_SETTLED, pn_boolean(delivery->local_settled));
+ pn_field(transport->disp, DISPOSITION_SETTLED, pn_boolean(delivery->local_settled));
uint64_t code;
switch(delivery->local_state) {
case ACCEPTED:
- code = ACCEPTED_CODE;
+ code = ACCEPTED;
break;
case RELEASED:
- code = RELEASED_CODE;
+ code = RELEASED;
break;
//TODO: rejected and modified (both take extra data which may need to be passed through somehow) e.g. change from enum to discriminated union?
default:
code = 0;
}
if (code)
- pn_field(transport, DISPOSITION_STATE, pn_value("L([])", code));
- //pn_field(transport, DISPOSITION_BATCHABLE, pn_boolean(batchable));
- pn_post_frame(transport, ssn_state->local_channel, DISPOSITION_CODE);
+ pn_field(transport->disp, DISPOSITION_STATE, pn_value("L([])", code));
+ //pn_field(transport->disp, DISPOSITION_BATCHABLE, pn_boolean(batchable));
+ pn_post_frame(transport->disp, ssn_state->local_channel, DISPOSITION);
}
void pn_process_disp_receiver(pn_transport_t *transport, pn_endpoint_t *endpoint)
@@ -1392,16 +1229,16 @@ void pn_process_msg_data(pn_transport_t
delivery->context = state;
}
if (!state->sent && (int16_t) ssn_state->local_channel >= 0 && (int32_t) link_state->local_handle >= 0) {
- pn_init_frame(transport);
- pn_field(transport, TRANSFER_HANDLE, pn_value("I", link_state->local_handle));
- pn_field(transport, TRANSFER_DELIVERY_ID, pn_value("I", state->id));
- pn_field(transport, TRANSFER_DELIVERY_TAG, pn_from_binary(pn_binary_dup(delivery->tag)));
- pn_field(transport, TRANSFER_MESSAGE_FORMAT, pn_value("I", 0));
+ pn_init_frame(transport->disp);
+ pn_field(transport->disp, TRANSFER_HANDLE, pn_value("I", link_state->local_handle));
+ pn_field(transport->disp, TRANSFER_DELIVERY_ID, pn_value("I", state->id));
+ pn_field(transport->disp, TRANSFER_DELIVERY_TAG, pn_from_binary(pn_binary_dup(delivery->tag)));
+ pn_field(transport->disp, TRANSFER_MESSAGE_FORMAT, pn_value("I", 0));
if (delivery->bytes) {
- pn_append_payload(transport, delivery->bytes, delivery->size);
+ pn_append_payload(transport->disp, delivery->bytes, delivery->size);
delivery->size = 0;
}
- pn_post_frame(transport, ssn_state->local_channel, TRANSFER_CODE);
+ pn_post_frame(transport->disp, ssn_state->local_channel, TRANSFER);
state->sent = true;
}
}
@@ -1449,14 +1286,14 @@ void pn_process_link_teardown(pn_transpo
pn_session_state_t *ssn_state = pn_session_state(transport, session);
pn_link_state_t *state = pn_link_state(ssn_state, link);
if (endpoint->local_state == CLOSED && (int32_t) state->local_handle >= 0) {
- pn_init_frame(transport);
- pn_field(transport, DETACH_HANDLE, pn_value("I", state->local_handle));
- pn_field(transport, DETACH_CLOSED, pn_boolean(true));
+ pn_init_frame(transport->disp);
+ pn_field(transport->disp, DETACH_HANDLE, pn_value("I", state->local_handle));
+ pn_field(transport->disp, DETACH_CLOSED, pn_boolean(true));
/* XXX: error
if (condition)
// XXX: symbol
- pn_engine_field(eng, DETACH_ERROR, pn_value("B([zS])", ERROR_CODE, condition, description)); */
- pn_post_frame(transport, ssn_state->local_channel, DETACH_CODE);
+ pn_engine_field(eng, DETACH_ERROR, pn_value("B([zS])", ERROR, condition, description)); */
+ pn_post_frame(transport->disp, ssn_state->local_channel, DETACH);
state->local_handle = -2;
}
}
@@ -1470,11 +1307,11 @@ void pn_process_ssn_teardown(pn_transpor
pn_session_state_t *state = pn_session_state(transport, session);
if (endpoint->local_state == CLOSED && (int16_t) state->local_channel >= 0)
{
- pn_init_frame(transport);
+ pn_init_frame(transport->disp);
/*if (condition)
// XXX: symbol
- pn_engine_field(eng, DETACH_ERROR, pn_value("B([zS])", ERROR_CODE, condition, description));*/
- pn_post_frame(transport, state->local_channel, END_CODE);
+ pn_engine_field(eng, DETACH_ERROR, pn_value("B([zS])", ERROR, condition, description));*/
+ pn_post_frame(transport->disp, state->local_channel, END);
state->local_channel = -2;
}
}
@@ -1485,11 +1322,11 @@ void pn_process_conn_teardown(pn_transpo
if (endpoint->type == CONNECTION)
{
if (endpoint->local_state == CLOSED && !transport->close_sent) {
- pn_init_frame(transport);
+ pn_init_frame(transport->disp);
/*if (condition)
// XXX: symbol
- pn_field(eng, CLOSE_ERROR, pn_value("B([zS])", ERROR_CODE, condition, description));*/
- pn_post_frame(transport, 0, CLOSE_CODE);
+ pn_field(eng, CLOSE_ERROR, pn_value("B([zS])", ERROR, condition, description));*/
+ pn_post_frame(transport->disp, 0, CLOSE);
transport->close_sent = true;
}
}
@@ -1537,16 +1374,13 @@ ssize_t pn_output(pn_transport_t *transp
{
pn_process(transport);
- if (!transport->available && transport->endpoint.local_state == CLOSED) {
+ if (!transport->disp->available && transport->endpoint.local_state == CLOSED) {
return EOS;
}
- int n = transport->available < size ? transport->available : size;
- memmove(bytes, transport->output, n);
- memmove(transport->output, transport->output + n, transport->available - n);
- transport->available -= n;
- // XXX: need to check endpoint for errors
- return n;
+ // XXX: errors?
+
+ return pn_dispatcher_output(transport->disp, bytes, size);
}
ssize_t pn_send(pn_sender_t *sender, const char *bytes, size_t n)
Modified: qpid/proton/proton-c/src/messaging.xml
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/src/messaging.xml?rev=1299397&r1=1299396&r2=1299397&view=diff
==============================================================================
--- qpid/proton/proton-c/src/messaging.xml (original)
+++ qpid/proton/proton-c/src/messaging.xml Sun Mar 11 17:04:51 2012
@@ -1,13 +1,13 @@
<?xml version="1.0"?>
<!--
-
Copyright Bank of America, N.A., Barclays Bank PLC, Cisco Systems, Credit
-Suisse, Deutsche Boerse Systems, Goldman Sachs, HCL Technologies Ltd, INETCO
-Systems Limited, Informatica Corporation, JPMorgan Chase Bank Inc. N.A,
+Suisse, Deutsche Boerse, Envoy Technologies Inc., Goldman Sachs, HCL
+Technologies Ltd, IIT Software GmbH, iMatix Corporation, INETCO Systems Limited,
+Informatica Corporation, JPMorgan Chase & Co., Kaazing Corporation, N.A,
Microsoft Corporation, my-Channels, Novell, Progress Software, Red Hat Inc.,
Software AG, Solace Systems Inc., StormMQ Ltd., Tervela Inc., TWIST Process
-Innovations Ltd, VMware Inc. and WS02 Inc. 2006-2011. All rights reserved.
+Innovations Ltd, VMware, Inc., and WS02 Inc. 2006-2011. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
@@ -90,8 +90,8 @@ THIS SOFTWARE, EVEN IF ADVISED OF THE PO
<section name="delivery-state">
<type name="received" class="composite" source="list" provides="delivery-state">
<descriptor name="amqp:received:list" code="0x00000000:0x00000023"/>
- <field name="section-number" type="uint"/>
- <field name="section-offset" type="ulong"/>
+ <field name="section-number" type="uint" mandatory="true"/>
+ <field name="section-offset" type="ulong" mandatory="true"/>
</type>
<type name="accepted" class="composite" source="list" provides="delivery-state, outcome">
<descriptor name="amqp:accepted:list" code="0x00000000:0x00000024"/>
Modified: qpid/proton/proton-c/src/protocol.h.py
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/src/protocol.h.py?rev=1299397&r1=1299396&r2=1299397&view=diff
==============================================================================
--- qpid/proton/proton-c/src/protocol.h.py (original)
+++ qpid/proton/proton-c/src/protocol.h.py Sun Mar 11 17:04:51 2012
@@ -39,8 +39,7 @@ for type in TYPES:
print "#define %s_SYM (\"%s\")" % (name, desc["@name"])
hi, lo = [int(x, 0) for x in desc["@code"].split(":")]
code = (hi << 32) + lo
- print "#define %s_CODE (%s)" % (name, code)
- print "#define %s_IDX (%s)" % (name, idx)
+ print "#define %s (%s)" % (name, code)
idx += 1
print
Modified: qpid/proton/proton-c/src/protocol.py
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/src/protocol.py?rev=1299397&r1=1299396&r2=1299397&view=diff
==============================================================================
--- qpid/proton/proton-c/src/protocol.py (original)
+++ qpid/proton/proton-c/src/protocol.py Sun Mar 11 17:04:51 2012
@@ -20,16 +20,19 @@ import mllib, os, sys
doc = mllib.xml_parse(os.path.join(os.path.dirname(__file__), "transport.xml"))
mdoc = mllib.xml_parse(os.path.join(os.path.dirname(__file__), "messaging.xml"))
+sdoc = mllib.xml_parse(os.path.join(os.path.dirname(__file__), "security.xml"))
def eq(attr, value):
return lambda nd: nd[attr] == value
TYPES = doc.query["amqp/section/type", eq("@class", "composite")] + \
- mdoc.query["amqp/section/type", eq("@class", "composite")]
+ mdoc.query["amqp/section/type", eq("@class", "composite")] + \
+ sdoc.query["amqp/section/type", eq("@class", "composite")]
RESTRICTIONS = {}
COMPOSITES = {}
-for type in doc.query["amqp/section/type"] + mdoc.query["amqp/section/type"]:
+for type in doc.query["amqp/section/type"] + mdoc.query["amqp/section/type"] + \
+ sdoc.query["amqp/section/type"]:
source = type["@source"]
if source:
RESTRICTIONS[type["@name"]] = source
Modified: qpid/proton/proton-c/src/proton.c
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/src/proton.c?rev=1299397&r1=1299396&r2=1299397&view=diff
==============================================================================
--- qpid/proton/proton-c/src/proton.c (original)
+++ qpid/proton/proton-c/src/proton.c Sun Mar 11 17:04:51 2012
@@ -132,7 +132,7 @@ void server_callback(pn_connection_t *co
ssize_t n = pn_recv(receiver, msg, 1024);
if (n == EOM) {
pn_advance(link);
- pn_disposition(delivery, ACCEPTED);
+ pn_disposition(delivery, PN_ACCEPTED);
break;
} else {
printf("%.*s", (int) n, msg);
@@ -243,7 +243,7 @@ void client_callback(pn_connection_t *co
size_t n = pn_recv(rcv, msg, 1024);
if (n == EOM) {
pn_advance(link);
- pn_disposition(delivery, ACCEPTED);
+ pn_disposition(delivery, PN_ACCEPTED);
pn_settle(delivery);
if (!--ctx->recv_count) {
pn_close((pn_endpoint_t *)link);
Propchange: qpid/proton/proton-c/src/sasl/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Sun Mar 11 17:04:51 2012
@@ -0,0 +1 @@
+*.d
Added: qpid/proton/proton-c/src/sasl/sasl-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/src/sasl/sasl-internal.h?rev=1299397&view=auto
==============================================================================
--- qpid/proton/proton-c/src/sasl/sasl-internal.h (added)
+++ qpid/proton/proton-c/src/sasl/sasl-internal.h Sun Mar 11 17:04:51 2012
@@ -0,0 +1,36 @@
+#ifndef _PROTON_SASL_INTERNAL_H
+#define _PROTON_SASL_INTERNAL_H 1
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/sasl.h>
+#include "../dispatcher/dispatcher.h"
+
+#define SCRATCH (1024)
+
+struct pn_sasl_t {
+ pn_dispatcher_t *disp;
+ pn_sasl_outcome_t outcome;
+ char scratch[SCRATCH];
+};
+
+#endif /* sasl-internal.h */
Added: qpid/proton/proton-c/src/sasl/sasl.c
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/src/sasl/sasl.c?rev=1299397&view=auto
==============================================================================
--- qpid/proton/proton-c/src/sasl/sasl.c (added)
+++ qpid/proton/proton-c/src/sasl/sasl.c Sun Mar 11 17:04:51 2012
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <proton/framing.h>
+#include <proton/value.h>
+#include "sasl-internal.h"
+#include "../protocol.h"
+
+void pn_do_init(pn_dispatcher_t *disp);
+void pn_do_mechanisms(pn_dispatcher_t *disp);
+void pn_do_outcome(pn_dispatcher_t *disp);
+
+pn_sasl_t *pn_sasl()
+{
+ pn_sasl_t *sasl = malloc(sizeof(pn_sasl_t));
+ sasl->disp = pn_dispatcher(sasl);
+
+ pn_dispatcher_action(sasl->disp, SASL_INIT, "SASL-INIT", pn_do_init);
+ pn_dispatcher_action(sasl->disp, SASL_MECHANISMS, "SASL-MECHANISMS", pn_do_mechanisms);
+ // XXX: challenge/response
+ pn_dispatcher_action(sasl->disp, SASL_OUTCOME, "SASL-OUTCOME", pn_do_outcome);
+
+ sasl->outcome = SASL_NONE;
+ return sasl;
+}
+
+void pn_sasl_client(pn_sasl_t *sasl, const char *username, const char *password)
+{
+ size_t usize = strlen(username);
+ size_t psize = strlen(password);
+ size_t size = usize + psize + 2;
+ char iresp[size];
+
+ iresp[0] = 0;
+ memmove(iresp + 1, username, usize);
+ iresp[usize + 1] = 0;
+ memmove(iresp + usize + 2, password, psize);
+
+ pn_init_frame(sasl->disp);
+ pn_field(sasl->disp, SASL_INIT_MECHANISM, pn_from_symbol(pn_symbol("PLAIN")));
+ pn_field(sasl->disp, SASL_INIT_INITIAL_RESPONSE, pn_from_binary(pn_binary(iresp, size)));
+ pn_post_frame(sasl->disp, 0, SASL_INIT);
+}
+
+void pn_sasl_destroy(pn_sasl_t *sasl)
+{
+ pn_dispatcher_destroy(sasl->disp);
+}
+
+ssize_t pn_sasl_input(pn_sasl_t *sasl, char *bytes, size_t available)
+{
+ return pn_dispatcher_input(sasl->disp, bytes, available);
+}
+
+ssize_t pn_sasl_output(pn_sasl_t *sasl, char *bytes, size_t size)
+{
+ return pn_dispatcher_output(sasl->disp, bytes, size);
+}
+
+pn_sasl_outcome_t pn_sasl_outcome(pn_sasl_t *sasl)
+{
+ return sasl->outcome;
+}
+
+void pn_do_init(pn_dispatcher_t *disp)
+{
+ //pn_sasl_t *sasl = disp->context;
+
+}
+
+void pn_do_mechanisms(pn_dispatcher_t *disp)
+{
+ //pn_sasl_t *sasl = disp->context;
+
+}
+
+void pn_do_outcome(pn_dispatcher_t *disp)
+{
+ pn_sasl_t *sasl = disp->context;
+ sasl->outcome = pn_to_int32(pn_list_get(disp->args, SASL_OUTCOME_CODE));
+}
Added: qpid/proton/proton-c/src/security.xml
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/src/security.xml?rev=1299397&view=auto
==============================================================================
--- qpid/proton/proton-c/src/security.xml (added)
+++ qpid/proton/proton-c/src/security.xml Sun Mar 11 17:04:51 2012
@@ -0,0 +1,76 @@
+<?xml version="1.0"?>
+
+<!--
+Copyright Bank of America, N.A., Barclays Bank PLC, Cisco Systems, Credit
+Suisse, Deutsche Boerse, Envoy Technologies Inc., Goldman Sachs, HCL
+Technologies Ltd, IIT Software GmbH, iMatix Corporation, INETCO Systems Limited,
+Informatica Corporation, JPMorgan Chase & Co., Kaazing Corporation, N.A,
+Microsoft Corporation, my-Channels, Novell, Progress Software, Red Hat Inc.,
+Software AG, Solace Systems Inc., StormMQ Ltd., Tervela Inc., TWIST Process
+Innovations Ltd, VMware, Inc., and WS02 Inc. 2006-2011. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+1. Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+2. Redistributions in binary form must reproduce the above copyright
+notice, this list of conditions and the following disclaimer in the
+documentation and/or other materials provided with the distribution.
+3. The name of the author may not be used to endorse or promote products
+derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+-->
+
+<amqp name="security" xmlns="http://www.amqp.org/schema/amqp.xsd">
+ <section name="tls">
+ <definition name="TLS-MAJOR" value="1"/>
+ <definition name="TLS-MINOR" value="0"/>
+ <definition name="TLS-REVISION" value="0"/>
+ </section>
+ <section name="sasl">
+ <type name="sasl-mechanisms" class="composite" source="list" provides="sasl-frame">
+ <descriptor name="amqp:sasl-mechanisms:list" code="0x00000000:0x00000040"/>
+ <field name="sasl-server-mechanisms" type="symbol" mandatory="true" multiple="true"/>
+ </type>
+ <type name="sasl-init" class="composite" source="list" provides="sasl-frame">
+ <descriptor name="amqp:sasl-init:list" code="0x00000000:0x00000041"/>
+ <field name="mechanism" type="symbol" mandatory="true"/>
+ <field name="initial-response" type="binary"/>
+ <field name="hostname" type="string"/>
+ </type>
+ <type name="sasl-challenge" class="composite" source="list" provides="sasl-frame">
+ <descriptor name="amqp:sasl-challenge:list" code="0x00000000:0x00000042"/>
+ <field name="challenge" type="binary" mandatory="true"/>
+ </type>
+ <type name="sasl-response" class="composite" source="list" provides="sasl-frame">
+ <descriptor name="amqp:sasl-response:list" code="0x00000000:0x00000043"/>
+ <field name="response" type="binary" mandatory="true"/>
+ </type>
+ <type name="sasl-outcome" class="composite" source="list" provides="sasl-frame">
+ <descriptor name="amqp:sasl-outcome:list" code="0x00000000:0x00000044"/>
+ <field name="code" type="sasl-code" mandatory="true"/>
+ <field name="additional-data" type="binary"/>
+ </type>
+ <type name="sasl-code" class="restricted" source="ubyte">
+ <choice name="ok" value="0"/>
+ <choice name="auth" value="1"/>
+ <choice name="sys" value="2"/>
+ <choice name="sys-perm" value="3"/>
+ <choice name="sys-temp" value="4"/>
+ </type>
+ <definition name="SASL-MAJOR" value="1"/>
+ <definition name="SASL-MINOR" value="0"/>
+ <definition name="SASL-REVISION" value="0"/>
+ </section>
+</amqp>
Added: qpid/proton/proton-c/src/transactions.xml
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/src/transactions.xml?rev=1299397&view=auto
==============================================================================
--- qpid/proton/proton-c/src/transactions.xml (added)
+++ qpid/proton/proton-c/src/transactions.xml Sun Mar 11 17:04:51 2012
@@ -0,0 +1,73 @@
+<?xml version="1.0"?>
+
+<!--
+Copyright Bank of America, N.A., Barclays Bank PLC, Cisco Systems, Credit
+Suisse, Deutsche Boerse, Envoy Technologies Inc., Goldman Sachs, HCL
+Technologies Ltd, IIT Software GmbH, iMatix Corporation, INETCO Systems Limited,
+Informatica Corporation, JPMorgan Chase & Co., Kaazing Corporation, N.A,
+Microsoft Corporation, my-Channels, Novell, Progress Software, Red Hat Inc.,
+Software AG, Solace Systems Inc., StormMQ Ltd., Tervela Inc., TWIST Process
+Innovations Ltd, VMware, Inc., and WS02 Inc. 2006-2011. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+1. Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+2. Redistributions in binary form must reproduce the above copyright
+notice, this list of conditions and the following disclaimer in the
+documentation and/or other materials provided with the distribution.
+3. The name of the author may not be used to endorse or promote products
+derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+-->
+
+<amqp name="transactions" xmlns="http://www.amqp.org/schema/amqp.xsd">
+ <section name="coordination">
+ <type name="coordinator" class="composite" source="list" provides="target">
+ <descriptor name="amqp:coordinator:list" code="0x00000000:0x00000030"/>
+ <field name="capabilities" type="symbol" requires="txn-capability" multiple="true"/>
+ </type>
+ <type name="declare" class="composite" source="list">
+ <descriptor name="amqp:declare:list" code="0x00000000:0x00000031"/>
+ <field name="global-id" type="*" requires="global-tx-id"/>
+ </type>
+ <type name="discharge" class="composite" source="list">
+ <descriptor name="amqp:discharge:list" code="0x00000000:0x00000032"/>
+ <field name="txn-id" type="*" mandatory="true" requires="txn-id"/>
+ <field name="fail" type="boolean"/>
+ </type>
+ <type name="transaction-id" class="restricted" source="binary" provides="txn-id"/>
+ <type name="declared" class="composite" source="list" provides="delivery-state, outcome">
+ <descriptor name="amqp:declared:list" code="0x00000000:0x00000033"/>
+ <field name="txn-id" type="*" mandatory="true" requires="txn-id"/>
+ </type>
+ <type name="transactional-state" class="composite" source="list" provides="delivery-state">
+ <descriptor name="amqp:transactional-state:list" code="0x00000000:0x00000034"/>
+ <field name="txn-id" type="*" mandatory="true" requires="txn-id"/>
+ <field name="outcome" type="*" requires="outcome"/>
+ </type>
+ <type name="txn-capability" class="restricted" source="symbol" provides="txn-capability">
+ <choice name="local-transactions" value="amqp:local-transactions"/>
+ <choice name="distributed-transactions" value="amqp:distributed-transactions"/>
+ <choice name="promotable-transactions" value="amqp:promotable-transactions"/>
+ <choice name="multi-txns-per-ssn" value="amqp:multi-txns-per-ssn"/>
+ <choice name="multi-ssns-per-txn" value="amqp:multi-ssns-per-txn"/>
+ </type>
+ <type name="transaction-error" class="restricted" source="symbol" provides="error-condition">
+ <choice name="unknown-id" value="amqp:transaction:unknown-id"/>
+ <choice name="transaction-rollback" value="amqp:transaction:rollback"/>
+ <choice name="transaction-timeout" value="amqp:transaction:timeout"/>
+ </type>
+ </section>
+</amqp>
Modified: qpid/proton/proton-c/src/transport.xml
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/src/transport.xml?rev=1299397&r1=1299396&r2=1299397&view=diff
==============================================================================
--- qpid/proton/proton-c/src/transport.xml (original)
+++ qpid/proton/proton-c/src/transport.xml Sun Mar 11 17:04:51 2012
@@ -1,13 +1,13 @@
<?xml version="1.0"?>
<!--
-
Copyright Bank of America, N.A., Barclays Bank PLC, Cisco Systems, Credit
-Suisse, Deutsche Boerse Systems, Goldman Sachs, HCL Technologies Ltd, INETCO
-Systems Limited, Informatica Corporation, JPMorgan Chase Bank Inc. N.A,
+Suisse, Deutsche Boerse, Envoy Technologies Inc., Goldman Sachs, HCL
+Technologies Ltd, IIT Software GmbH, iMatix Corporation, INETCO Systems Limited,
+Informatica Corporation, JPMorgan Chase & Co., Kaazing Corporation, N.A,
Microsoft Corporation, my-Channels, Novell, Progress Software, Red Hat Inc.,
Software AG, Solace Systems Inc., StormMQ Ltd., Tervela Inc., TWIST Process
-Innovations Ltd, VMware Inc. and WS02 Inc. 2006-2011. All rights reserved.
+Innovations Ltd, VMware, Inc., and WS02 Inc. 2006-2011. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
@@ -33,7 +33,7 @@ THIS SOFTWARE, EVEN IF ADVISED OF THE PO
-->
<amqp name="transport" xmlns="http://www.amqp.org/schema/amqp.xsd">
- <section name="frame-bodies">
+ <section name="performatives">
<type name="open" class="composite" source="list" provides="frame">
<descriptor name="amqp:open:list" code="0x00000000:0x00000010"/>
<field name="container-id" type="string" mandatory="true"/>
Added: qpid/proton/proton-c/src/types/symbol.c
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/src/types/symbol.c?rev=1299397&view=auto
==============================================================================
--- qpid/proton/proton-c/src/types/symbol.c (added)
+++ qpid/proton/proton-c/src/types/symbol.c Sun Mar 11 17:04:51 2012
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/codec.h>
+#include <string.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include "value-internal.h"
+
+pn_symbol_t *pn_symbol(char *name)
+{
+ size_t size = strlen(name);
+ pn_symbol_t *sym = malloc(sizeof(pn_symbol_t) + size + 1);
+ sym->size = size;
+ strcpy(sym->name, name);
+ return sym;
+}
+
+void pn_free_symbol(pn_symbol_t *s)
+{
+ free(s);
+}
+
+size_t pn_symbol_size(pn_symbol_t *s)
+{
+ return s->size;
+}
+
+char *pn_symbol_name(pn_symbol_t *s)
+{
+ return s->name;
+}
+
+uintptr_t pn_hash_symbol(pn_symbol_t *s)
+{
+ uintptr_t hash = 0;
+ for (int i = 0; i < s->size; i++)
+ {
+ hash = 31*hash + s->name[i];
+ }
+ return hash;
+}
+
+int pn_compare_symbol(pn_symbol_t *a, pn_symbol_t *b)
+{
+ if (a->size == b->size)
+ return strncmp(a->name, b->name, a->size);
+ else
+ return b->size - a->size;
+}
+
+pn_symbol_t *pn_symbol_dup(pn_symbol_t *s)
+{
+ return pn_symbol(s->name);
+}
+
+int pn_format_symbol(char **pos, char *limit, pn_symbol_t *sym)
+{
+ if (sym)
+ return pn_fmt(pos, limit, "%s", sym->name);
+ else
+ return pn_fmt(pos, limit, "(null)");
+}
Modified: qpid/proton/proton-c/src/types/value-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/src/types/value-internal.h?rev=1299397&r1=1299396&r2=1299397&view=diff
==============================================================================
--- qpid/proton/proton-c/src/types/value-internal.h (original)
+++ qpid/proton/proton-c/src/types/value-internal.h Sun Mar 11 17:04:51 2012
@@ -24,30 +24,35 @@
#include <proton/value.h>
-struct pn_string_st {
+struct pn_symbol_t {
+ size_t size;
+ char name[];
+};
+
+struct pn_string_t {
size_t size;
wchar_t wcs[];
};
-struct pn_binary_st {
+struct pn_binary_t {
size_t size;
char bytes[];
};
-struct pn_array_st {
+struct pn_array_t {
enum TYPE type;
size_t size;
size_t capacity;
pn_value_t values[];
};
-struct pn_list_st {
+struct pn_list_t {
size_t size;
size_t capacity;
pn_value_t values[];
};
-struct pn_map_st {
+struct pn_map_t {
size_t size;
size_t capacity;
pn_value_t pairs[];
Modified: qpid/proton/proton-c/src/types/value.c
URL: http://svn.apache.org/viewvc/qpid/proton/proton-c/src/types/value.c?rev=1299397&r1=1299396&r2=1299397&view=diff
==============================================================================
--- qpid/proton/proton-c/src/types/value.c (original)
+++ qpid/proton/proton-c/src/types/value.c Sun Mar 11 17:04:51 2012
@@ -63,6 +63,8 @@ int pn_compare_value(pn_value_t a, pn_va
return b.u.as_double - a.u.as_double;
case CHAR:
return b.u.as_char - a.u.as_char;
+ case SYMBOL:
+ return pn_compare_symbol(a.u.as_symbol, b.u.as_symbol);
case STRING:
return pn_compare_string(a.u.as_string, b.u.as_string);
case BINARY:
@@ -106,6 +108,9 @@ uintptr_t pn_hash_value(pn_value_t v)
return v.u.as_double;
case CHAR:
return v.u.as_char;
+ case SYMBOL:
+ // XXX
+ return 0;
case STRING:
return pn_hash_string(v.u.as_string);
case BINARY:
@@ -373,6 +378,11 @@ pn_value_t pn_from_binary(pn_binary_t *b
return (pn_value_t) {.type = BINARY, .u.as_binary = b};
}
+pn_value_t pn_from_symbol(pn_symbol_t *s)
+{
+ return (pn_value_t) {.type = SYMBOL, .u.as_symbol = s};
+}
+
int pn_fmt(char **pos, char *limit, const char *fmt, ...)
{
va_list ap;
@@ -447,6 +457,8 @@ int pn_format_value(char **pos, char *li
case CHAR:
if ((e = pn_fmt(pos, limit, "%lc", v.u.as_char))) return e;
break;
+ case SYMBOL:
+ if ((e = pn_fmt(pos, limit, "%s", v.u.as_symbol))) return e;
case STRING:
if ((e = pn_fmt(pos, limit, "%ls", v.u.as_string->wcs))) return e;
break;
@@ -574,6 +586,8 @@ size_t pn_encode_sizeof(pn_value_t v)
case ULONG:
case DOUBLE:
return 9;
+ case SYMBOL:
+ return 5 + v.u.as_symbol->size;
case STRING:
return 5 + 4*v.u.as_string->size;
case BINARY:
@@ -642,6 +656,9 @@ size_t pn_encode(pn_value_t v, char *out
case DOUBLE:
pn_write_double(&out, out + size, v.u.as_double);
return 9;
+ case SYMBOL:
+ pn_write_symbol(&out, out + size, v.u.as_symbol->size, v.u.as_symbol->name);
+ return out - old;
case STRING:
cd = iconv_open("UTF-8", "WCHAR_T");
insize = 4*v.u.as_string->size;
@@ -686,6 +703,7 @@ void pn_free_value(pn_value_t v)
case ULONG:
case DOUBLE:
case REF:
+ case SYMBOL:
break;
case STRING:
pn_free_string(v.u.as_string);
@@ -728,6 +746,7 @@ void pn_visit(pn_value_t v, void (*visit
case STRING:
case BINARY:
case REF:
+ case SYMBOL:
break;
case ARRAY:
pn_visit_array(v.u.as_array, visitor);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org