You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2014/11/28 14:50:00 UTC
[22/51] [abbrv] qpid-proton git commit: Update with merge of latest
proton codebase and checked against latest emscripten incoming branch
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/bindings/python/setup.py.in
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/setup.py.in b/proton-c/bindings/python/setup.py.in
new file mode 100644
index 0000000..94f3dfc
--- /dev/null
+++ b/proton-c/bindings/python/setup.py.in
@@ -0,0 +1,107 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from distutils.core import setup, Extension
+import logging
+import os
+import sys
+
+_c_module = '@SWIG_MODULE_cproton_REAL_NAME@'
+_src_file = '@PN_SWIG_PYTHON_C_WRAPPER@'
+_version = '@PN_VERSION@'
+_release = 0
+
+if "--proton-install-prefix" in sys.argv:
+ # special option used only if the python headers and library have been
+ # installed to a non-standard directory. This can be done during 'make
+ # install' from the proton build tree by using the cmake option
+ # -DCMAKE_INSTALL_PREFIX. The location of the headers and library must be
+ # specified so we can build the binding's C extension.
+ i = sys.argv.index("--proton-install-prefix") + 1
+ if i >= len(sys.argv):
+ raise ValueError("--proton-install-prefix requires a path parameter.")
+ _prefix = sys.argv[i]
+ # remove the proton arguments to they don't conflict with setup.py's other
+ # command arguments:
+ del sys.argv[i]
+ sys.argv.remove("--proton-install-prefix")
+ _destdir = os.environ.get("DESTDIR", "")
+ if _destdir and os.path.isabs(_prefix):
+ # DESTDIR may be used on unix systems to put the entire install tree
+ # under a particular directory. However, if _prefix is an absolute
+ # path, os.path.join will discard DESTDIR, so strip off the leading
+ # separator
+ _prefix = _prefix.lstrip(os.path.sep)
+
+ _inc_dir = os.path.join(_destdir,
+ _prefix,
+ '@INCLUDE_INSTALL_DIR@')
+ _lib_dir = os.path.join(_destdir,
+ _prefix,
+ '@LIB_INSTALL_DIR@')
+
+ swig_ext = Extension(_c_module, [_src_file],
+ libraries=['qpid-proton'],
+ include_dirs=[_inc_dir],
+ library_dirs=[_lib_dir])
+else:
+ swig_ext = Extension(_c_module, [_src_file],
+ libraries=['qpid-proton'])
+
+_help_description = """Before you can build or install these bindings, you must
+first install version @PN_VERSION@ of the Proton development library
+(libqpid-proton) and its C header files. These files must be available in order
+to build this packages' C-based extension.
+
+Packages for the Proton development library may be provided by your system's
+distribution. For example, the qpid-proton-c-devel RPM is available for
+Centos/RHEL via EPEL. A libqpid-proton2-dev deb file is available for Ubuntu
+via the Apache Qpid PPA (ppa:qpid/released).
+
+If your distribution does not make these packages available, you can download
+the Proton sources directly from the Apache Qpid project:
+
+ http://qpid.apache.org
+
+This package is compatible with the @PN_VERSION@ release of the Proton
+development library.
+
+If you need additional help, see http://qpid.apache.org/discussion.html
+"""
+
+_long_description = """This package contains the Python bindings for the Apache
+QPID Proton library.\n%s""" % _help_description
+
+try:
+ setup(name="python-qpid-proton",
+ version="%s-%d" % (_version, _release),
+ author="Apache Qpid",
+ author_email="dev@qpid.apache.org",
+ py_modules=["proton", "cproton"],
+ url="http://qpid.apache.org/",
+ description="Python bindings for the Proton library",
+ long_description=_long_description,
+ license="Apache Software License",
+ classifiers=["License :: OSI Approved :: Apache Software License",
+ "Intended Audience :: Developers",
+ "Programming Language :: Python"],
+ ext_modules=[swig_ext])
+except:
+ logging.error("setup failed!\n%s", _help_description)
+ raise
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/bindings/ruby/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/CMakeLists.txt b/proton-c/bindings/ruby/CMakeLists.txt
index 9336abf..e77e4de 100644
--- a/proton-c/bindings/ruby/CMakeLists.txt
+++ b/proton-c/bindings/ruby/CMakeLists.txt
@@ -20,7 +20,9 @@
if (NOT DEFAULT_RUBY_TESTING)
message(FATAL_ERROR "Ruby bindings cannot be tested while missing dependencies")
endif (NOT DEFAULT_RUBY_TESTING)
-
+list(APPEND SWIG_MODULE_cproton-ruby_EXTRA_DEPS
+ ${CMAKE_SOURCE_DIR}/proton-c/include/proton/cproton.i
+)
include_directories (${RUBY_INCLUDE_PATH})
swig_add_module(cproton-ruby ruby ruby.i)
swig_link_libraries(cproton-ruby ${BINDING_DEPS} ${RUBY_LIBRARY})
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/bindings/ruby/lib/qpid_proton.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/qpid_proton.rb b/proton-c/bindings/ruby/lib/qpid_proton.rb
index cf044f6..e28c684 100644
--- a/proton-c/bindings/ruby/lib/qpid_proton.rb
+++ b/proton-c/bindings/ruby/lib/qpid_proton.rb
@@ -20,6 +20,7 @@
require "cproton"
require "date"
+require "qpid_proton/version"
require "qpid_proton/described"
require "qpid_proton/mapping"
require "qpid_proton/array"
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/bindings/ruby/lib/qpid_proton/version.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/qpid_proton/version.rb b/proton-c/bindings/ruby/lib/qpid_proton/version.rb
new file mode 100644
index 0000000..cd30bf0
--- /dev/null
+++ b/proton-c/bindings/ruby/lib/qpid_proton/version.rb
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+module Qpid
+
+ module Proton
+
+ PN_VERSION_MAJOR = Cproton::PN_VERSION_MAJOR
+ PN_VERSION_MINOR = Cproton::PN_VERSION_MINOR
+
+ end
+
+end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/bindings/ruby/qpid_proton.gemspec
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/qpid_proton.gemspec b/proton-c/bindings/ruby/qpid_proton.gemspec
index d193563..f61d14c 100644
--- a/proton-c/bindings/ruby/qpid_proton.gemspec
+++ b/proton-c/bindings/ruby/qpid_proton.gemspec
@@ -8,6 +8,7 @@ system "swig -ruby -I../../include -o ext/cproton/cproton.c ruby.i"
Gem::Specification.new do |s|
s.name = "qpid_proton"
s.version = "0.3"
+ s.licenses = ['Apache-2.0']
s.platform = Gem::Platform::RUBY
s.authors = ["Darryl L. Pierce"]
s.email = ["proton@qpid.apache.org"]
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/bindings/ruby/ruby.i
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/ruby.i b/proton-c/bindings/ruby/ruby.i
index fd11786..502fa92 100644
--- a/proton-c/bindings/ruby/ruby.i
+++ b/proton-c/bindings/ruby/ruby.i
@@ -344,5 +344,4 @@ bool pn_ssl_get_cipher_name(pn_ssl_t *ssl, char *OUTPUT, size_t MAX_OUTPUT_SIZE)
bool pn_ssl_get_protocol_name(pn_ssl_t *ssl, char *OUTPUT, size_t MAX_OUTPUT_SIZE);
%ignore pn_ssl_get_protocol_name;
-
%include "proton/cproton.i"
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/docs/man/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/docs/man/CMakeLists.txt b/proton-c/docs/man/CMakeLists.txt
index d907a67..bd33e90 100644
--- a/proton-c/docs/man/CMakeLists.txt
+++ b/proton-c/docs/man/CMakeLists.txt
@@ -17,6 +17,6 @@
# under the License.
#
-INSTALL (FILES proton.1
+INSTALL (FILES proton.1 proton-dump.1
DESTINATION ${MAN_INSTALL_DIR}/man1)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/docs/man/proton-dump.1
----------------------------------------------------------------------
diff --git a/proton-c/docs/man/proton-dump.1 b/proton-c/docs/man/proton-dump.1
new file mode 100644
index 0000000..0920d62
--- /dev/null
+++ b/proton-c/docs/man/proton-dump.1
@@ -0,0 +1,19 @@
+.\" DO NOT MODIFY THIS FILE! It was generated by help2man 1.44.1.
+.TH USAGE: "1" "August 2014" "Usage: proton-dump [FILE1] [FILEn] ..." "User Commands"
+.SH NAME
+proton-dump - display the contents of an AMQP dump file containing frame data
+.SH SYNOPSIS
+.B proton-dump
+[\fIFILE1\fR] [\fIFILEn\fR] ...
+.SH DESCRIPTION
+Displays the content of an AMQP dump file containing frame data.
+.TP
+[FILEn]
+Dump file to be displayed.
+.PP
+Displays the content of an AMQP dump file containing frame data.
+.TP
+[FILEn]
+Dump file to be displayed.
+.SH "SEE ALSO"
+proton(1)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/include/proton/buffer.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/buffer.h b/proton-c/include/proton/buffer.h
index a3cf843..26d4bb3 100644
--- a/proton-c/include/proton/buffer.h
+++ b/proton-c/include/proton/buffer.h
@@ -29,6 +29,11 @@
extern "C" {
#endif
+typedef struct {
+ size_t size;
+ char *start;
+} pn_buffer_memory_t;
+
typedef struct pn_buffer_t pn_buffer_t;
PN_EXTERN pn_buffer_t *pn_buffer(size_t capacity);
@@ -44,6 +49,7 @@ PN_EXTERN int pn_buffer_trim(pn_buffer_t *buf, size_t left, size_t right);
PN_EXTERN void pn_buffer_clear(pn_buffer_t *buf);
PN_EXTERN int pn_buffer_defrag(pn_buffer_t *buf);
PN_EXTERN pn_bytes_t pn_buffer_bytes(pn_buffer_t *buf);
+PN_EXTERN pn_buffer_memory_t pn_buffer_memory(pn_buffer_t *buf);
PN_EXTERN int pn_buffer_print(pn_buffer_t *buf);
#ifdef __cplusplus
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/include/proton/cproton.i
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/cproton.i b/proton-c/include/proton/cproton.i
index cef8b1f..1a9ad7f 100644
--- a/proton-c/include/proton/cproton.i
+++ b/proton-c/include/proton/cproton.i
@@ -30,6 +30,26 @@ typedef long long int int64_t;
/* Parse these interface header files to generate APIs for script languages */
%include "proton/import_export.h"
+
+%ignore _PROTON_VERSION_H;
+%include "proton/version.h"
+
+/* We cannot safely just wrap pn_bytes_t but each language binding must have a typemap for it - presumably to a string type */
+%ignore pn_bytes_t;
+
+/* There is no need to wrap pn_class_t aa it is an internal implementation detail and cannot be used outside the library */
+%ignore pn_class_t;
+
+/* Ignore C APIs related to pn_atom_t - they can all be achieved with pn_data_t */
+%ignore pn_atom_t;
+%ignore pn_atom_t_u; /* Seem to need this even though its nested in pn_atom_t */
+%ignore pn_data_get_atom;
+%ignore pn_data_put_atom;
+
+%ignore pn_delivery_tag_t;
+%ignore pn_decimal128_t;
+%ignore pn_uuid_t;
+
%include "proton/types.h"
%ignore pn_string_vformat;
%ignore pn_string_vaddf;
@@ -60,7 +80,7 @@ typedef long long int int64_t;
%aggregate_check(int, check_sasl_outcome,
PN_SASL_NONE, PN_SASL_OK, PN_SASL_AUTH,
- PN_SASL_SYS, PN_SASL_PERM, PN_SASL_TEMP);
+ PN_SASL_SYS, PN_SASL_PERM, PN_SASL_TEMP, PN_SASL_SKIPPED);
%aggregate_check(int, check_sasl_state,
PN_SASL_CONF, PN_SASL_IDLE, PN_SASL_STEP,
@@ -982,6 +1002,12 @@ typedef long long int int64_t;
sasl != NULL;
}
+%contract pn_sasl_allow_skip(pn_sasl_t *sasl, bool allow)
+{
+ require:
+ sasl != NULL;
+}
+
%contract pn_sasl_plain(pn_sasl_t *sasl, const char *username, const char *password)
{
require:
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/include/proton/event.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/event.h b/proton-c/include/proton/event.h
index fdb2803..c57c77d 100644
--- a/proton-c/include/proton/event.h
+++ b/proton-c/include/proton/event.h
@@ -81,8 +81,12 @@ typedef struct pn_event_t pn_event_t;
*/
typedef enum {
PN_EVENT_CATEGORY_NONE = 0,
- PN_EVENT_CATEGORY_PROTOCOL = 0x00010000,
- PN_EVENT_CATEGORY_COUNT = 2
+ PN_EVENT_CATEGORY_CONNECTION = 0x00010000,
+ PN_EVENT_CATEGORY_SESSION = 0x00020000,
+ PN_EVENT_CATEGORY_LINK = 0x00030000,
+ PN_EVENT_CATEGORY_DELIVERY = 0x00040000,
+ PN_EVENT_CATEGORY_TRANSPORT = 0x00050000,
+ PN_EVENT_CATEGORY_COUNT = 6
} pn_event_category_t;
/**
@@ -94,45 +98,137 @@ typedef enum {
* ever be generated.
*/
PN_EVENT_NONE = 0,
+
+ /**
+ * The connection has been created. This is the first event that
+ * will ever be issued for a connection. Events of this type point
+ * to the relevant connection.
+ */
+ PN_CONNECTION_INIT = PN_EVENT_CATEGORY_CONNECTION + 1,
+
+ /**
+ * The local connection endpoint has been closed. Events of this
+ * type point to the relevant connection.
+ */
+ PN_CONNECTION_OPEN = PN_EVENT_CATEGORY_CONNECTION + 2,
+
+ /**
+ * The remote endpoint has opened the connection. Events of this
+ * type point to the relevant connection.
+ */
+ PN_CONNECTION_REMOTE_OPEN = PN_EVENT_CATEGORY_CONNECTION + 3,
+
+ /**
+ * The local connection endpoint has been closed. Events of this
+ * type point to the relevant connection.
+ */
+ PN_CONNECTION_CLOSE = PN_EVENT_CATEGORY_CONNECTION + 4,
+
+ /**
+ * The remote endpoint has closed the connection. Events of this
+ * type point to the relevant connection.
+ */
+ PN_CONNECTION_REMOTE_CLOSE = PN_EVENT_CATEGORY_CONNECTION + 5,
+
+ /**
+ * The connection has been freed and any outstanding processing has
+ * been completed. This is the final event that will ever be issued
+ * for a connection.
+ */
+ PN_CONNECTION_FINAL = PN_EVENT_CATEGORY_CONNECTION + 6,
+
+ /**
+ * The session has been created. This is the first event that will
+ * ever be issued for a session.
+ */
+ PN_SESSION_INIT = PN_EVENT_CATEGORY_SESSION + 1,
+
/**
- * The endpoint state flags for a connection have changed. Events of
- * this type point to the relevant connection as well as its
- * associated transport.
+ * The local session endpoint has been opened. Events of this type
+ * point ot the relevant session.
*/
- PN_CONNECTION_REMOTE_STATE = PN_EVENT_CATEGORY_PROTOCOL+1,
- PN_CONNECTION_LOCAL_STATE = PN_EVENT_CATEGORY_PROTOCOL+2,
+ PN_SESSION_OPEN = PN_EVENT_CATEGORY_SESSION + 2,
+
/**
- * The endpoint state flags for a session have changed. Events of
- * this type point to the relevant session as well as its associated
- * connection and transport.
+ * The remote endpoint has opened the session. Events of this type
+ * point to the relevant session.
*/
- PN_SESSION_REMOTE_STATE = PN_EVENT_CATEGORY_PROTOCOL+3,
- PN_SESSION_LOCAL_STATE = PN_EVENT_CATEGORY_PROTOCOL+4,
+ PN_SESSION_REMOTE_OPEN = PN_EVENT_CATEGORY_SESSION + 3,
+
/**
- * The endpoint state flags for a link have changed. Events of this
- * type point to the relevant link as well as its associated
- * session, connection, and transport.
+ * The local session endpoint has been closed. Events of this type
+ * point ot the relevant session.
*/
- PN_LINK_REMOTE_STATE = PN_EVENT_CATEGORY_PROTOCOL+5,
- PN_LINK_LOCAL_STATE = PN_EVENT_CATEGORY_PROTOCOL+6,
+ PN_SESSION_CLOSE = PN_EVENT_CATEGORY_SESSION + 4,
+
+ /**
+ * The remote endpoint has closed the session. Events of this type
+ * point to the relevant session.
+ */
+ PN_SESSION_REMOTE_CLOSE = PN_EVENT_CATEGORY_SESSION + 5,
+
+ /**
+ * The session has been freed and any outstanding processing has
+ * been completed. This is the final event that will ever be issued
+ * for a session.
+ */
+ PN_SESSION_FINAL = PN_EVENT_CATEGORY_SESSION + 6,
+
+ /**
+ * The link has been created. This is the first event that will ever
+ * be issued for a link.
+ */
+ PN_LINK_INIT = PN_EVENT_CATEGORY_LINK + 1,
+
+ /**
+ * The local link endpoint has been opened. Events of this type
+ * point ot the relevant link.
+ */
+ PN_LINK_OPEN = PN_EVENT_CATEGORY_LINK + 2,
+
+ /**
+ * The remote endpoint has opened the link. Events of this type
+ * point to the relevant link.
+ */
+ PN_LINK_REMOTE_OPEN = PN_EVENT_CATEGORY_LINK + 3,
+
+ /**
+ * The local link endpoint has been closed. Events of this type
+ * point ot the relevant link.
+ */
+ PN_LINK_CLOSE = PN_EVENT_CATEGORY_LINK + 4,
+
+ /**
+ * The remote endpoint has closed the link. Events of this type
+ * point to the relevant link.
+ */
+ PN_LINK_REMOTE_CLOSE = PN_EVENT_CATEGORY_LINK + 5,
+
/**
* The flow control state for a link has changed. Events of this
- * type point to the relevant link along with its associated
- * session, connection, and transport.
+ * type point to the relevant link.
*/
- PN_LINK_FLOW = PN_EVENT_CATEGORY_PROTOCOL+7,
+ PN_LINK_FLOW = PN_EVENT_CATEGORY_LINK + 6,
+
+ /**
+ * The link has been freed and any outstanding processing has been
+ * completed. This is the final event that will ever be issued for a
+ * link. Events of this type point to the relevant link.
+ */
+ PN_LINK_FINAL = PN_EVENT_CATEGORY_LINK + 7,
+
/**
* A delivery has been created or updated. Events of this type point
- * to the relevant delivery as well as its associated link, session,
- * connection, and transport.
+ * to the relevant delivery.
*/
- PN_DELIVERY = PN_EVENT_CATEGORY_PROTOCOL+8,
+ PN_DELIVERY = PN_EVENT_CATEGORY_DELIVERY + 1,
+
/**
* The transport has new data to read and/or write. Events of this
- * type point to the relevant transport as well as its associated
- * connection.
+ * type point to the relevant transport.
*/
- PN_TRANSPORT = PN_EVENT_CATEGORY_PROTOCOL+9
+ PN_TRANSPORT = PN_EVENT_CATEGORY_TRANSPORT + 1
+
} pn_event_type_t;
/**
@@ -198,6 +294,11 @@ PN_EXTERN pn_event_type_t pn_event_type(pn_event_t *event);
PN_EXTERN pn_event_category_t pn_event_category(pn_event_t *event);
/**
+ * Get the context associated with an event.
+ */
+PN_EXTERN void *pn_event_context(pn_event_t *event);
+
+/**
* Get the connection associated with an event.
*
* @param[in] event an event object
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/include/proton/io.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/io.h b/proton-c/include/proton/io.h
index fffc09a..2d56736 100644
--- a/proton-c/include/proton/io.h
+++ b/proton-c/include/proton/io.h
@@ -44,6 +44,7 @@ typedef int pn_socket_t;
#endif
typedef struct pn_io_t pn_io_t;
+typedef struct pn_selector_t pn_selector_t;
PN_EXTERN pn_io_t *pn_io(void);
PN_EXTERN void pn_io_free(pn_io_t *io);
@@ -58,6 +59,7 @@ PN_EXTERN int pn_pipe(pn_io_t *io, pn_socket_t *dest);
PN_EXTERN ssize_t pn_read(pn_io_t *io, pn_socket_t socket, void *buf, size_t size);
PN_EXTERN ssize_t pn_write(pn_io_t *io, pn_socket_t socket, const void *buf, size_t size);
PN_EXTERN bool pn_wouldblock(pn_io_t *io);
+PN_EXTERN pn_selector_t *pn_io_selector(pn_io_t *io);
#ifdef __cplusplus
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/include/proton/object.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/object.h b/proton-c/include/proton/object.h
index dc4983a..9f6e63c 100644
--- a/proton-c/include/proton/object.h
+++ b/proton-c/include/proton/object.h
@@ -43,6 +43,7 @@ typedef void *(*pn_iterator_next_t)(void *state);
typedef struct pn_iterator_t pn_iterator_t;
typedef struct {
+ const char *name;
void (*initialize)(void *);
void (*finalize)(void *);
uintptr_t (*hashcode)(void *);
@@ -51,6 +52,7 @@ typedef struct {
} pn_class_t;
#define PN_CLASS(PREFIX) { \
+ #PREFIX, \
PREFIX ## _initialize, \
PREFIX ## _finalize, \
PREFIX ## _hashcode, \
@@ -58,14 +60,17 @@ typedef struct {
PREFIX ## _inspect \
}
-PN_EXTERN void *pn_new(size_t size, pn_class_t *clazz);
-PN_EXTERN void pn_initialize(void *object, pn_class_t *clazz);
+PN_EXTERN void *pn_new(size_t size, const pn_class_t* clazz);
+PN_EXTERN void *pn_new2(size_t size, const pn_class_t* clazz, void *from);
+PN_EXTERN void pn_initialize(void *object, const pn_class_t *clazz);
PN_EXTERN void *pn_incref(void *object);
+PN_EXTERN void *pn_incref2(void *object, void *from);
PN_EXTERN void pn_decref(void *object);
+PN_EXTERN void pn_decref2(void *object, void *from);
PN_EXTERN int pn_refcount(void *object);
PN_EXTERN void pn_finalize(void *object);
PN_EXTERN void pn_free(void *object);
-PN_EXTERN pn_class_t *pn_class(void *object);
+PN_EXTERN const pn_class_t *pn_class(void* object);
PN_EXTERN uintptr_t pn_hashcode(void *object);
PN_EXTERN intptr_t pn_compare(void *a, void *b);
PN_EXTERN bool pn_equals(void *a, void *b);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/include/proton/sasl.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/sasl.h b/proton-c/include/proton/sasl.h
index 0cd9141..1f132e6 100644
--- a/proton-c/include/proton/sasl.h
+++ b/proton-c/include/proton/sasl.h
@@ -54,7 +54,8 @@ typedef enum {
PN_SASL_AUTH=1, /** failed due to bad credentials */
PN_SASL_SYS=2, /** failed due to a system error */
PN_SASL_PERM=3, /** failed due to unrecoverable error */
- PN_SASL_TEMP=4 /** failed due to transient error */
+ PN_SASL_TEMP=4, /** failed due to transient error */
+ PN_SASL_SKIPPED=5 /** the peer didn't perform the sasl exchange */
} pn_sasl_outcome_t;
/** The state of the SASL negotiation process */
@@ -113,6 +114,18 @@ PN_EXTERN void pn_sasl_client(pn_sasl_t *sasl);
*/
PN_EXTERN void pn_sasl_server(pn_sasl_t *sasl);
+/** Configure a SASL server layer to permit the client to skip the SASL exchange.
+ *
+ * If the peer client skips the SASL exchange (i.e. goes right to the AMQP header)
+ * this server layer will succeed and result in the outcome of PN_SASL_SKIPPED.
+ * The default behavior is to fail and close the connection if the client skips
+ * SASL.
+ *
+ * @param[in] sasl the SASL layer to configure
+ * @param[in] allow true -> allow skip; false -> forbid skip
+ */
+ PN_EXTERN void pn_sasl_allow_skip(pn_sasl_t *sasl, bool allow);
+
/** Configure the SASL layer to use the "PLAIN" mechanism.
*
* A utility function to configure a simple client SASL layer using
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/include/proton/selector.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/selector.h b/proton-c/include/proton/selector.h
index 37370d4..7d599b0 100644
--- a/proton-c/include/proton/selector.h
+++ b/proton-c/include/proton/selector.h
@@ -34,9 +34,7 @@ extern "C" {
#define PN_WRITABLE (2)
#define PN_EXPIRED (4)
-typedef struct pn_selector_t pn_selector_t;
-
-PN_EXTERN pn_selector_t *pn_selector(void);
+pn_selector_t *pni_selector(void);
PN_EXTERN void pn_selector_free(pn_selector_t *selector);
PN_EXTERN void pn_selector_add(pn_selector_t *selector, pn_selectable_t *selectable);
PN_EXTERN void pn_selector_update(pn_selector_t *selector, pn_selectable_t *selectable);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/include/proton/terminus.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/terminus.h b/proton-c/include/proton/terminus.h
index 9c9096b..3765b28 100644
--- a/proton-c/include/proton/terminus.h
+++ b/proton-c/include/proton/terminus.h
@@ -90,10 +90,10 @@ typedef enum {
* counting down.
*/
typedef enum {
- PN_LINK_CLOSE, /**< the terminus is orphaned when the parent link is closed */
- PN_SESSION_CLOSE, /**< the terminus is orphaned when the parent session is closed */
- PN_CONNECTION_CLOSE, /**< the terminus is orphaned when the parent connection is closed */
- PN_NEVER /**< the terminus is never considered orphaned */
+ PN_EXPIRE_WITH_LINK, /**< the terminus is orphaned when the parent link is closed */
+ PN_EXPIRE_WITH_SESSION, /**< the terminus is orphaned when the parent session is closed */
+ PN_EXPIRE_WITH_CONNECTION, /**< the terminus is orphaned when the parent connection is closed */
+ PN_EXPIRE_NEVER /**< the terminus is never considered orphaned */
} pn_expiry_policy_t;
/**
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/include/proton/transport.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/transport.h b/proton-c/include/proton/transport.h
index 1fa24c8..abe2853 100644
--- a/proton-c/include/proton/transport.h
+++ b/proton-c/include/proton/transport.h
@@ -328,16 +328,17 @@ PN_EXTERN char *pn_transport_tail(pn_transport_t *transport);
*
* This is equivalent to copying @c size bytes afther the tail pointer
* and then calling ::pn_transport_process with an argument of @c
- * size. It is an error to call this with a @c size larger than the
- * capacity reported by ::pn_transport_capacity.
+ * size. Only some of the bytes will be copied if there is
+ * insufficienty capacity available. Use ::pn_transport_capacity to
+ * determine how much capacity the transport has.
*
* @param[in] transport the transport
* @param[in] src the start of the data to push into the transport
* @param[in] size the amount of data to push into the transport
*
- * @return 0 on success, or error code if < 0
+ * @return the number of bytes pushed on success, or error code if < 0
*/
-PN_EXTERN int pn_transport_push(pn_transport_t *transport, const char *src, size_t size);
+PN_EXTERN ssize_t pn_transport_push(pn_transport_t *transport, const char *src, size_t size);
/**
* Process input data following the tail pointer.
@@ -404,9 +405,9 @@ PN_EXTERN const char *pn_transport_head(pn_transport_t *transport);
* @param[in] transport the transport
* @param[out] dst the destination buffer
* @param[in] size the capacity of the destination buffer
- * @return 0 on success, or error code if < 0
+ * @return number of bytes copied on success, or error code if < 0
*/
-PN_EXTERN int pn_transport_peek(pn_transport_t *transport, char *dst, size_t size);
+PN_EXTERN ssize_t pn_transport_peek(pn_transport_t *transport, char *dst, size_t size);
/**
* Removes @c size bytes of output from the pending output queue
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/include/proton/types.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/types.h b/proton-c/include/proton/types.h
index 4182f25..d15b745 100644
--- a/proton-c/include/proton/types.h
+++ b/proton-c/include/proton/types.h
@@ -23,6 +23,7 @@
*/
#include <proton/import_export.h>
+#include <stddef.h>
#include <sys/types.h>
#include <proton/type_compat.h>
@@ -58,11 +59,10 @@ typedef struct {
typedef struct {
size_t size;
- char *start;
+ const char *start;
} pn_bytes_t;
-PN_EXTERN pn_bytes_t pn_bytes(size_t size, char *start);
-PN_EXTERN pn_bytes_t pn_bytes_dup(size_t size, const char *start);
+PN_EXTERN pn_bytes_t pn_bytes(size_t size, const char *start);
/** @}
*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/buffer.c
----------------------------------------------------------------------
diff --git a/proton-c/src/buffer.c b/proton-c/src/buffer.c
index 1371831..b69034b 100644
--- a/proton-c/src/buffer.c
+++ b/proton-c/src/buffer.c
@@ -273,6 +273,18 @@ pn_bytes_t pn_buffer_bytes(pn_buffer_t *buf)
}
}
+pn_buffer_memory_t pn_buffer_memory(pn_buffer_t *buf)
+{
+ if (buf) {
+ pn_buffer_defrag(buf);
+ pn_buffer_memory_t r = {buf->size, buf->bytes};
+ return r;
+ } else {
+ pn_buffer_memory_t r = {0, NULL};
+ return r;
+ }
+}
+
int pn_buffer_print(pn_buffer_t *buf)
{
printf("pn_buffer(\"");
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/codec/codec.c
----------------------------------------------------------------------
diff --git a/proton-c/src/codec/codec.c b/proton-c/src/codec/codec.c
index 9370129..0660032 100644
--- a/proton-c/src/codec/codec.c
+++ b/proton-c/src/codec/codec.c
@@ -92,7 +92,7 @@ static void pn_data_finalize(void *object)
pn_free(data->encoder);
}
-static pn_fields_t *pni_node_fields(pn_data_t *data, pni_node_t *node)
+static const pn_fields_t *pni_node_fields(pn_data_t *data, pni_node_t *node)
{
if (!node) return NULL;
if (node->atom.type != PN_DESCRIBED) return NULL;
@@ -103,8 +103,9 @@ static pn_fields_t *pni_node_fields(pn_data_t *data, pni_node_t *node)
return NULL;
}
- if (descriptor->atom.u.as_ulong < 256) {
- return &FIELDS[descriptor->atom.u.as_ulong];
+ if (descriptor->atom.u.as_ulong >= FIELD_MIN && descriptor->atom.u.as_ulong <= FIELD_MAX) {
+ const pn_fields_t *f = &FIELDS[descriptor->atom.u.as_ulong-FIELD_MIN];
+ return (f->name_index!=0) ? f : NULL;
} else {
return NULL;
}
@@ -233,9 +234,16 @@ int pni_inspect_atom(pn_atom_t *atom, pn_string_t *str)
if (quote) if ((err = pn_string_addf(str, "\""))) return err;
return 0;
}
+ case PN_LIST:
+ return pn_string_addf(str, "<list>");
+ case PN_MAP:
+ return pn_string_addf(str, "<map>");
+ case PN_ARRAY:
+ return pn_string_addf(str, "<array>");
+ case PN_DESCRIBED:
+ return pn_string_addf(str, "<described>");
default:
- assert(false);
- return PN_ERR;
+ return pn_string_addf(str, "<undefined: %i>", atom->type);
}
}
@@ -245,9 +253,9 @@ int pni_inspect_enter(void *ctx, pn_data_t *data, pni_node_t *node)
pn_atom_t *atom = (pn_atom_t *) &node->atom;
pni_node_t *parent = pn_data_node(data, node->parent);
- pn_fields_t *fields = pni_node_fields(data, parent);
+ const pn_fields_t *fields = pni_node_fields(data, parent);
pni_node_t *grandparent = parent ? pn_data_node(data, parent->parent) : NULL;
- pn_fields_t *grandfields = pni_node_fields(data, grandparent);
+ const pn_fields_t *grandfields = pni_node_fields(data, grandparent);
int index = pni_node_index(data, node);
int err;
@@ -256,7 +264,9 @@ int pni_inspect_enter(void *ctx, pn_data_t *data, pni_node_t *node)
if (atom->type == PN_NULL) {
return 0;
}
- const char *name = grandfields->fields[index];
+ const char *name = (index < grandfields->field_count)
+ ? FIELD_STRINGPOOL+FIELD_FIELDS[grandfields->first_field_index+index]
+ : NULL;
if (name) {
err = pn_string_addf(str, "%s=", name);
if (err) return err;
@@ -275,7 +285,7 @@ int pni_inspect_enter(void *ctx, pn_data_t *data, pni_node_t *node)
return pn_string_addf(str, "{");
default:
if (fields && index == 0) {
- err = pn_string_addf(str, "%s", fields->name);
+ err = pn_string_addf(str, "%s", FIELD_STRINGPOOL+FIELD_NAME[fields->name_index]);
if (err) return err;
err = pn_string_addf(str, "(");
if (err) return err;
@@ -305,7 +315,7 @@ int pni_inspect_exit(void *ctx, pn_data_t *data, pni_node_t *node)
pn_string_t *str = (pn_string_t *) ctx;
pni_node_t *parent = pn_data_node(data, node->parent);
pni_node_t *grandparent = parent ? pn_data_node(data, parent->parent) : NULL;
- pn_fields_t *grandfields = pni_node_fields(data, grandparent);
+ const pn_fields_t *grandfields = pni_node_fields(data, grandparent);
pni_node_t *next = pn_data_node(data, node->next);
int err;
@@ -356,7 +366,7 @@ static int pn_data_inspect(void *obj, pn_string_t *dst)
pn_data_t *pn_data(size_t capacity)
{
- static pn_class_t clazz = PN_CLASS(pn_data);
+ static const pn_class_t clazz = PN_CLASS(pn_data);
pn_data_t *data = (pn_data_t *) pn_new(sizeof(pn_data_t), &clazz);
data->capacity = capacity;
data->size = 0;
@@ -407,12 +417,12 @@ void pn_data_clear(pn_data_t *data)
int pn_data_grow(pn_data_t *data)
{
- data->capacity = 2*(data->capacity ? data->capacity : 16);
+ data->capacity = 2*(data->capacity ? data->capacity : 2);
data->nodes = (pni_node_t *) realloc(data->nodes, data->capacity * sizeof(pni_node_t));
return 0;
}
-ssize_t pn_data_intern(pn_data_t *data, char *start, size_t size)
+ssize_t pn_data_intern(pn_data_t *data, const char *start, size_t size)
{
size_t offset = pn_buffer_size(data->buf);
int err = pn_buffer_append(data->buf, start, size);
@@ -454,7 +464,7 @@ int pn_data_intern_node(pn_data_t *data, pni_node_t *node)
node->data = true;
node->data_offset = offset;
node->data_size = bytes->size;
- pn_bytes_t buf = pn_buffer_bytes(data->buf);
+ pn_buffer_memory_t buf = pn_buffer_memory(data->buf);
bytes->start = buf.start + offset;
if (pn_buffer_capacity(data->buf) != oldcap) {
@@ -1102,7 +1112,7 @@ int pn_data_resize(pn_data_t *data, size_t size)
}
-pni_node_t *pn_data_node(pn_data_t *data, size_t nd)
+pni_node_t *pn_data_node(pn_data_t *data, pni_nid_t nd)
{
if (nd) {
return &data->nodes[nd - 1];
@@ -1348,7 +1358,7 @@ bool pn_data_lookup(pn_data_t *data, const char *name)
void pn_data_dump(pn_data_t *data)
{
- printf("{current=%" PN_ZI ", parent=%" PN_ZI "}\n", data->current, data->parent);
+ printf("{current=%" PN_ZI ", parent=%" PN_ZI "}\n", (size_t) data->current, (size_t) data->parent);
for (unsigned i = 0; i < data->size; i++)
{
pni_node_t *node = &data->nodes[i];
@@ -1356,7 +1366,11 @@ void pn_data_dump(pn_data_t *data)
pni_inspect_atom((pn_atom_t *) &node->atom, data->str);
printf("Node %i: prev=%" PN_ZI ", next=%" PN_ZI ", parent=%" PN_ZI ", down=%" PN_ZI
", children=%" PN_ZI ", type=%s (%s)\n",
- i + 1, node->prev, node->next, node->parent, node->down, node->children,
+ i + 1, (size_t) node->prev,
+ (size_t) node->next,
+ (size_t) node->parent,
+ (size_t) node->down,
+ (size_t) node->children,
pn_type_name(node->atom.type), pn_string_get(data->str));
}
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/codec/data.h
----------------------------------------------------------------------
diff --git a/proton-c/src/codec/data.h b/proton-c/src/codec/data.h
index 5ac8d45..be1669a 100644
--- a/proton-c/src/codec/data.h
+++ b/proton-c/src/codec/data.h
@@ -27,39 +27,41 @@
#include "decoder.h"
#include "encoder.h"
+typedef uint16_t pni_nid_t;
+
typedef struct {
- size_t next;
- size_t prev;
- size_t down;
- size_t parent;
- size_t children;
+ char *start;
+ size_t data_offset;
+ size_t data_size;
pn_atom_t atom;
+ pn_type_t type;
+ pni_nid_t next;
+ pni_nid_t prev;
+ pni_nid_t down;
+ pni_nid_t parent;
+ pni_nid_t children;
// for arrays
bool described;
- pn_type_t type;
bool data;
- size_t data_offset;
- size_t data_size;
- char *start;
bool small;
} pni_node_t;
struct pn_data_t {
- size_t capacity;
- size_t size;
pni_node_t *nodes;
pn_buffer_t *buf;
- size_t parent;
- size_t current;
- size_t base_parent;
- size_t base_current;
pn_decoder_t *decoder;
pn_encoder_t *encoder;
pn_error_t *error;
pn_string_t *str;
+ pni_nid_t capacity;
+ pni_nid_t size;
+ pni_nid_t parent;
+ pni_nid_t current;
+ pni_nid_t base_parent;
+ pni_nid_t base_current;
};
-pni_node_t *pn_data_node(pn_data_t *data, size_t nd);
+pni_node_t *pn_data_node(pn_data_t *data, pni_nid_t nd);
int pni_data_traverse(pn_data_t *data,
int (*enter)(void *ctx, pn_data_t *data, pni_node_t *node),
int (*exit)(void *ctx, pn_data_t *data, pni_node_t *node),
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/codec/decoder.c
----------------------------------------------------------------------
diff --git a/proton-c/src/codec/decoder.c b/proton-c/src/codec/decoder.c
index cfdd3b9..28e8ae1 100644
--- a/proton-c/src/codec/decoder.c
+++ b/proton-c/src/codec/decoder.c
@@ -54,7 +54,7 @@ static void pn_decoder_finalize(void *obj) {
pn_decoder_t *pn_decoder()
{
- static pn_class_t clazz = PN_CLASS(pn_decoder);
+ static const pn_class_t clazz = PN_CLASS(pn_decoder);
return (pn_decoder_t *) pn_new(sizeof(pn_decoder_t), &clazz);
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/codec/encoder.c
----------------------------------------------------------------------
diff --git a/proton-c/src/codec/encoder.c b/proton-c/src/codec/encoder.c
index 639d132..f0f3cef 100644
--- a/proton-c/src/codec/encoder.c
+++ b/proton-c/src/codec/encoder.c
@@ -56,7 +56,7 @@ static void pn_encoder_finalize(void *obj) {
pn_encoder_t *pn_encoder()
{
- static pn_class_t clazz = PN_CLASS(pn_encoder);
+ static const pn_class_t clazz = PN_CLASS(pn_encoder);
return (pn_encoder_t *) pn_new(sizeof(pn_encoder_t), &clazz);
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/dispatch_actions.h
----------------------------------------------------------------------
diff --git a/proton-c/src/dispatch_actions.h b/proton-c/src/dispatch_actions.h
new file mode 100644
index 0000000..aa7a8f4
--- /dev/null
+++ b/proton-c/src/dispatch_actions.h
@@ -0,0 +1,45 @@
+#ifndef _PROTON_DISPATCH_ACTIONS_H
+#define _PROTON_DISPATCH_ACTIONS_H 1
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "dispatcher/dispatcher.h"
+
+/* Transport actions */
+int pn_do_open(pn_dispatcher_t *disp);
+int pn_do_begin(pn_dispatcher_t *disp);
+int pn_do_attach(pn_dispatcher_t *disp);
+int pn_do_transfer(pn_dispatcher_t *disp);
+int pn_do_flow(pn_dispatcher_t *disp);
+int pn_do_disposition(pn_dispatcher_t *disp);
+int pn_do_detach(pn_dispatcher_t *disp);
+int pn_do_end(pn_dispatcher_t *disp);
+int pn_do_close(pn_dispatcher_t *disp);
+
+/* SASL actions */
+int pn_do_init(pn_dispatcher_t *disp);
+int pn_do_mechanisms(pn_dispatcher_t *disp);
+int pn_do_challenge(pn_dispatcher_t *disp);
+int pn_do_response(pn_dispatcher_t *disp);
+int pn_do_outcome(pn_dispatcher_t *disp);
+
+#endif // _PROTON_DISPATCH_ACTIONS_H
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/dispatcher/dispatcher.c
----------------------------------------------------------------------
diff --git a/proton-c/src/dispatcher/dispatcher.c b/proton-c/src/dispatcher/dispatcher.c
index 3f3ee3c..296c3ab 100644
--- a/proton-c/src/dispatcher/dispatcher.c
+++ b/proton-c/src/dispatcher/dispatcher.c
@@ -30,6 +30,41 @@
#include "../util.h"
#include "../platform_fmt.h"
+#include "dispatch_actions.h"
+
+int pni_bad_frame(pn_dispatcher_t* disp) {
+ pn_transport_log(disp->transport, "Error dispatching frame: Unknown performative");
+ return PN_ERR;
+}
+
+// We could use a table based approach here if we needed to dynamically
+// add new performatives
+static inline int pni_dispatch_action(pn_dispatcher_t* disp, uint64_t lcode)
+{
+ pn_action_t *action;
+ switch (lcode) {
+ /* Regular AMQP fames */
+ case OPEN: action = pn_do_open; break;
+ case BEGIN: action = pn_do_begin; break;
+ case ATTACH: action = pn_do_attach; break;
+ case FLOW: action = pn_do_flow; break;
+ case TRANSFER: action = pn_do_transfer; break;
+ case DISPOSITION: action = pn_do_disposition; break;
+ case DETACH: action = pn_do_detach; break;
+ case END: action = pn_do_end; break;
+ case CLOSE: action = pn_do_close; break;
+
+ /* SASL frames */
+ case SASL_MECHANISMS: action = pn_do_mechanisms; break;
+ case SASL_INIT: action = pn_do_init; break;
+ case SASL_CHALLENGE: action = pn_do_challenge; break;
+ case SASL_RESPONSE: action = pn_do_response; break;
+ case SASL_OUTCOME: action = pn_do_outcome; break;
+ default: action = pni_bad_frame; break;
+ };
+ return action(disp);
+}
+
pn_dispatcher_t *pn_dispatcher(uint8_t frame_type, pn_transport_t *transport)
{
pn_dispatcher_t *disp = (pn_dispatcher_t *) calloc(sizeof(pn_dispatcher_t), 1);
@@ -40,11 +75,7 @@ pn_dispatcher_t *pn_dispatcher(uint8_t frame_type, pn_transport_t *transport)
(pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF) |
(pn_env_bool("PN_TRACE_DRV") ? PN_TRACE_DRV : PN_TRACE_OFF);
- disp->input = pn_buffer(1024);
- disp->fragment = 0;
-
disp->channel = 0;
- disp->code = 0;
disp->args = pn_data(16);
disp->payload = NULL;
disp->size = 0;
@@ -67,7 +98,6 @@ pn_dispatcher_t *pn_dispatcher(uint8_t frame_type, pn_transport_t *transport)
void pn_dispatcher_free(pn_dispatcher_t *disp)
{
if (disp) {
- pn_buffer_free(disp->input);
pn_data_free(disp->args);
pn_data_free(disp->output_args);
pn_buffer_free(disp->frame);
@@ -77,12 +107,6 @@ void pn_dispatcher_free(pn_dispatcher_t *disp)
}
}
-void pn_dispatcher_action(pn_dispatcher_t *disp, uint8_t code,
- pn_action_t *action)
-{
- disp->actions[code] = action;
-}
-
typedef enum {IN, OUT} pn_dir_t;
static void pn_do_trace(pn_dispatcher_t *disp, uint16_t ch, pn_dir_t dir,
@@ -92,6 +116,10 @@ static void pn_do_trace(pn_dispatcher_t *disp, uint16_t ch, pn_dir_t dir,
pn_string_format(disp->scratch, "%u %s ", ch, dir == OUT ? "->" : "<-");
pn_inspect(args, disp->scratch);
+ if (pn_data_size(args)==0) {
+ pn_string_addf(disp->scratch, "(EMPTY FRAME)");
+ }
+
if (size) {
char buf[1024];
int e = pn_quote_data(buf, 1024, payload, size);
@@ -122,7 +150,8 @@ int pn_dispatch_frame(pn_dispatcher_t *disp, pn_frame_t frame)
}
disp->channel = frame.channel;
- // XXX: assuming numeric
+ // XXX: assuming numeric -
+ // if we get a symbol we should map it to the numeric value and dispatch on that
uint64_t lcode;
bool scanned;
int e = pn_data_scan(disp->args, "D?L.", &scanned, &lcode);
@@ -134,19 +163,15 @@ int pn_dispatch_frame(pn_dispatcher_t *disp, pn_frame_t frame)
pn_transport_log(disp->transport, "Error dispatching frame");
return PN_ERR;
}
- uint8_t code = lcode;
- disp->code = code;
disp->size = frame.size - dsize;
if (disp->size)
disp->payload = frame.payload + dsize;
pn_do_trace(disp, disp->channel, IN, disp->args, disp->payload, disp->size);
- pn_action_t *action = disp->actions[code];
- int err = action(disp);
+ int err = pni_dispatch_action(disp, lcode);
disp->channel = 0;
- disp->code = 0;
pn_data_clear(disp->args);
disp->size = 0;
disp->payload = NULL;
@@ -212,7 +237,7 @@ int pn_post_frame(pn_dispatcher_t *disp, uint16_t ch, const char *fmt, ...)
encode_performatives:
pn_buffer_clear( disp->frame );
- pn_bytes_t buf = pn_buffer_bytes( disp->frame );
+ pn_buffer_memory_t buf = pn_buffer_memory( disp->frame );
buf.size = pn_buffer_available( disp->frame );
ssize_t wr = pn_data_encode( disp->output_args, buf.start, buf.size );
@@ -290,7 +315,7 @@ int pn_post_transfer_frame(pn_dispatcher_t *disp, uint16_t ch,
encode_performatives:
pn_buffer_clear( disp->frame );
- pn_bytes_t buf = pn_buffer_bytes( disp->frame );
+ pn_buffer_memory_t buf = pn_buffer_memory( disp->frame );
buf.size = pn_buffer_available( disp->frame );
ssize_t wr = pn_data_encode(disp->output_args, buf.start, buf.size);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/dispatcher/dispatcher.h
----------------------------------------------------------------------
diff --git a/proton-c/src/dispatcher/dispatcher.h b/proton-c/src/dispatcher/dispatcher.h
index 80b6412..a87e383 100644
--- a/proton-c/src/dispatcher/dispatcher.h
+++ b/proton-c/src/dispatcher/dispatcher.h
@@ -33,17 +33,7 @@ typedef struct pn_dispatcher_t pn_dispatcher_t;
typedef int (pn_action_t)(pn_dispatcher_t *disp);
-#define SCRATCH (1024)
-#define CODEC_LIMIT (1024)
-
struct pn_dispatcher_t {
- pn_action_t *actions[256];
- uint8_t frame_type;
- pn_trace_t trace;
- pn_buffer_t *input;
- size_t fragment;
- uint16_t channel;
- uint8_t code;
pn_data_t *args;
const char *payload;
size_t size;
@@ -55,18 +45,19 @@ struct pn_dispatcher_t {
size_t capacity;
size_t available; /* number of raw bytes pending output */
char *output;
- pn_transport_t *transport;
- bool halt;
- bool batch;
+ pn_transport_t *transport; // TODO: We keep this to get access to logging - perhaps move logging
uint64_t output_frames_ct;
uint64_t input_frames_ct;
pn_string_t *scratch;
+ pn_trace_t trace;
+ uint16_t channel;
+ uint8_t frame_type; // Used when constructing outgoing frames
+ bool halt;
+ bool batch;
};
pn_dispatcher_t *pn_dispatcher(uint8_t frame_type, pn_transport_t *transport);
void pn_dispatcher_free(pn_dispatcher_t *disp);
-void pn_dispatcher_action(pn_dispatcher_t *disp, uint8_t code,
- pn_action_t *action);
int pn_scan_args(pn_dispatcher_t *disp, const char *fmt, ...);
void pn_set_payload(pn_dispatcher_t *disp, const char *data, size_t size);
int pn_post_frame(pn_dispatcher_t *disp, uint16_t ch, const char *fmt, ...);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/engine/engine-internal.h
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/engine-internal.h b/proton-c/src/engine/engine-internal.h
index d13cd58..03cb630 100644
--- a/proton-c/src/engine/engine-internal.h
+++ b/proton-c/src/engine/engine-internal.h
@@ -51,6 +51,7 @@ struct pn_endpoint_t {
pn_endpoint_t *transport_prev;
bool modified;
bool freed;
+ bool posted_final;
};
typedef struct {
@@ -95,8 +96,6 @@ typedef struct {
bool disp;
} pn_session_state_t;
-#define SCRATCH (1024)
-
#include <proton/sasl.h>
#include <proton/ssl.h>
@@ -111,21 +110,14 @@ typedef struct pn_io_layer_t {
} pn_io_layer_t;
struct pn_transport_t {
- bool freed;
pn_tracer_t tracer;
size_t header_count;
pn_sasl_t *sasl;
pn_ssl_t *ssl;
pn_connection_t *connection; // reference counted
pn_dispatcher_t *disp;
- bool open_sent;
- bool open_rcvd;
- bool close_sent;
- bool close_rcvd;
char *remote_container;
char *remote_hostname;
- uint16_t channel_max;
- uint16_t remote_channel_max;
pn_data_t *remote_offered_capabilities;
pn_data_t *remote_desired_capabilities;
pn_data_t *remote_properties;
@@ -144,15 +136,14 @@ struct pn_transport_t {
/* dead remote detection */
pn_millis_t local_idle_timeout;
+ pn_millis_t remote_idle_timeout;
pn_timestamp_t dead_remote_deadline;
uint64_t last_bytes_input;
/* keepalive */
- pn_millis_t remote_idle_timeout;
pn_timestamp_t keepalive_deadline;
uint64_t last_bytes_output;
- pn_error_t *error;
pn_hash_t *local_channels;
pn_hash_t *remote_channels;
pn_string_t *scratch;
@@ -166,14 +157,23 @@ struct pn_transport_t {
size_t output_pending;
char *output_buf;
+ void *context;
+
/* input from peer */
size_t input_size;
size_t input_pending;
char *input_buf;
+
+ uint16_t channel_max;
+ uint16_t remote_channel_max;
+ bool freed;
+ bool open_sent;
+ bool open_rcvd;
+ bool close_sent;
+ bool close_rcvd;
bool tail_closed; // input stream closed by driver
bool head_closed;
-
- void *context;
+ bool done_processing; // if true, don't call pn_process again
};
struct pn_connection_t {
@@ -211,80 +211,80 @@ struct pn_session_t {
};
struct pn_terminus_t {
- pn_terminus_type_t type;
pn_string_t *address;
- pn_durability_t durability;
- pn_expiry_policy_t expiry_policy;
- pn_seconds_t timeout;
- bool dynamic;
- pn_distribution_mode_t distribution_mode;
pn_data_t *properties;
pn_data_t *capabilities;
pn_data_t *outcomes;
pn_data_t *filter;
+ pn_durability_t durability;
+ pn_expiry_policy_t expiry_policy;
+ pn_seconds_t timeout;
+ pn_terminus_type_t type;
+ pn_distribution_mode_t distribution_mode;
+ bool dynamic;
};
struct pn_link_t {
pn_endpoint_t endpoint;
- pn_string_t *name;
- pn_session_t *session; // reference counted
pn_terminus_t source;
pn_terminus_t target;
pn_terminus_t remote_source;
pn_terminus_t remote_target;
+ pn_link_state_t state;
+ pn_string_t *name;
+ pn_session_t *session; // reference counted
pn_delivery_t *unsettled_head;
pn_delivery_t *unsettled_tail;
pn_delivery_t *current;
pn_delivery_t *settled_head;
pn_delivery_t *settled_tail;
- uint8_t snd_settle_mode;
- uint8_t rcv_settle_mode;
- uint8_t remote_snd_settle_mode;
- uint8_t remote_rcv_settle_mode;
+ void *context;
size_t unsettled_count;
pn_sequence_t available;
pn_sequence_t credit;
pn_sequence_t queued;
+ int drained; // number of drained credits
+ uint8_t snd_settle_mode;
+ uint8_t rcv_settle_mode;
+ uint8_t remote_snd_settle_mode;
+ uint8_t remote_rcv_settle_mode;
bool drain_flag_mode; // receiver only
bool drain;
- int drained; // number of drained credits
- void *context;
- pn_link_state_t state;
};
struct pn_disposition_t {
+ pn_condition_t condition;
uint64_t type;
pn_data_t *data;
pn_data_t *annotations;
- pn_condition_t condition;
- uint32_t section_number;
uint64_t section_offset;
+ uint32_t section_number;
bool failed;
bool undeliverable;
bool settled;
};
struct pn_delivery_t {
- pn_link_t *link; // reference counted
- pn_buffer_t *tag;
pn_disposition_t local;
pn_disposition_t remote;
- bool updated;
- bool settled; // tracks whether we're in the unsettled list or not
+ pn_link_t *link; // reference counted
+ pn_buffer_t *tag;
pn_delivery_t *unsettled_next;
pn_delivery_t *unsettled_prev;
pn_delivery_t *settled_next;
pn_delivery_t *settled_prev;
pn_delivery_t *work_next;
pn_delivery_t *work_prev;
- bool work;
pn_delivery_t *tpwork_next;
pn_delivery_t *tpwork_prev;
- bool tpwork;
+ pn_delivery_state_t state;
pn_buffer_t *bytes;
- bool done;
void *context;
- pn_delivery_state_t state;
+ bool updated;
+ bool settled; // tracks whether we're in the unsettled list or not
+ bool work;
+ bool tpwork;
+ bool done;
};
#define PN_SET_LOCAL(OLD, NEW) \
@@ -310,5 +310,6 @@ void pn_clear_tpwork(pn_delivery_t *delivery);
void pn_work_update(pn_connection_t *connection, pn_delivery_t *delivery);
void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint);
void pn_connection_unbound(pn_connection_t *conn);
+int pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...);
#endif /* engine-internal.h */
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/engine/engine.c
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/engine.c b/proton-c/src/engine/engine.c
index 718974a..02e5009 100644
--- a/proton-c/src/engine/engine.c
+++ b/proton-c/src/engine/engine.c
@@ -52,34 +52,18 @@ pn_connection_t *pn_ep_get_connection(pn_endpoint_t *endpoint)
return NULL;
}
-/* map the endpoint type to its local event type */
-static const pn_event_type_t endpoint_event_map[] = {
- PN_CONNECTION_LOCAL_STATE, /* CONNECTION */
- PN_SESSION_LOCAL_STATE, /* SESSION */
- PN_LINK_LOCAL_STATE, /* SENDER */
- PN_LINK_LOCAL_STATE}; /* RECEIVER */
-
-/* setup the event given the endpoint that generated the event */
-static void endpoint_init_event(pn_event_t *event,
- pn_endpoint_t *endpoint)
-{
- switch (endpoint->type) {
- case CONNECTION: {
- pn_connection_t *conn = (pn_connection_t *) endpoint;
- pn_event_init_connection(event, conn);
- }
- break;
- case SESSION: {
- pn_session_t *ssn = (pn_session_t *) endpoint;
- pn_event_init_session(event, ssn);
- }
- break;
+static pn_event_type_t endpoint_event(pn_endpoint_type_t type, bool open) {
+ switch (type) {
+ case CONNECTION:
+ return open ? PN_CONNECTION_OPEN : PN_CONNECTION_CLOSE;
+ case SESSION:
+ return open ? PN_SESSION_OPEN : PN_SESSION_CLOSE;
case SENDER:
- case RECEIVER: {
- pn_link_t *link = (pn_link_t*) endpoint;
- pn_event_init_link(event, link);
- }
- break;
+ case RECEIVER:
+ return open ? PN_LINK_OPEN : PN_LINK_CLOSE;
+ default:
+ assert(false);
+ return PN_EVENT_NONE;
}
}
@@ -88,11 +72,8 @@ static void pn_endpoint_open(pn_endpoint_t *endpoint)
// TODO: do we care about the current state?
PN_SET_LOCAL(endpoint->state, PN_LOCAL_ACTIVE);
pn_connection_t *conn = pn_ep_get_connection(endpoint);
- pn_event_t *event = pn_collector_put(conn->collector,
- endpoint_event_map[endpoint->type]);
- if (event) {
- endpoint_init_event(event, endpoint);
- }
+ pn_collector_put(conn->collector, endpoint_event(endpoint->type, true),
+ endpoint);
pn_modified(conn, endpoint, true);
}
@@ -101,11 +82,8 @@ static void pn_endpoint_close(pn_endpoint_t *endpoint)
// TODO: do we care about the current state?
PN_SET_LOCAL(endpoint->state, PN_LOCAL_CLOSED);
pn_connection_t *conn = pn_ep_get_connection(endpoint);
- pn_event_t *event = pn_collector_put(conn->collector,
- endpoint_event_map[endpoint->type]);
- if (event) {
- endpoint_init_event(event, endpoint);
- }
+ pn_collector_put(conn->collector, endpoint_event(endpoint->type, false),
+ endpoint);
pn_modified(conn, endpoint, true);
}
@@ -133,8 +111,6 @@ void pn_endpoint_tini(pn_endpoint_t *endpoint);
void pn_connection_free(pn_connection_t *connection)
{
assert(!connection->endpoint.freed);
- if (pn_connection_state(connection) & PN_LOCAL_ACTIVE)
- pn_connection_close(connection);
// free those endpoints that haven't been freed by the application
LL_REMOVE(connection, endpoint, &connection->endpoint);
while (connection->endpoint_head) {
@@ -200,7 +176,7 @@ void pn_condition_init(pn_condition_t *condition)
{
condition->name = pn_string(NULL);
condition->description = pn_string(NULL);
- condition->info = pn_data(16);
+ condition->info = pn_data(0);
}
void pn_condition_tini(pn_condition_t *condition)
@@ -214,7 +190,7 @@ void pn_add_session(pn_connection_t *conn, pn_session_t *ssn)
{
pn_list_add(conn->sessions, ssn);
ssn->connection = conn;
- pn_incref(conn); // keep around until finalized
+ pn_incref2(conn, ssn); // keep around until finalized
}
void pn_remove_session(pn_connection_t *conn, pn_session_t *ssn)
@@ -248,13 +224,11 @@ void pn_session_free(pn_session_t *session)
pn_link_t *link = (pn_link_t *)pn_list_get(session->links, 0);
pn_link_free(link);
}
- if (pn_session_state(session) & PN_LOCAL_ACTIVE)
- pn_session_close(session);
pn_remove_session(session->connection, session);
pn_endpoint_t *endpoint = (pn_endpoint_t *) session;
LL_REMOVE(pn_ep_get_connection(endpoint), endpoint, endpoint);
session->endpoint.freed = true;
- pn_decref(session);
+ pn_decref2(session, session->connection);
}
void *pn_session_get_context(pn_session_t *session)
@@ -304,8 +278,6 @@ void pn_terminus_free(pn_terminus_t *terminus)
void pn_link_free(pn_link_t *link)
{
assert(!link->endpoint.freed);
- if (pn_link_state(link) & PN_LOCAL_ACTIVE)
- pn_link_close(link);
pn_remove_link(link->session, link);
pn_endpoint_t *endpoint = (pn_endpoint_t *) link;
LL_REMOVE(pn_ep_get_connection(endpoint), endpoint, endpoint);
@@ -318,10 +290,10 @@ void pn_link_free(pn_link_t *link)
while (link->settled_head) {
delivery = link->settled_head;
LL_POP(link, settled, pn_delivery_t);
- pn_decref(delivery);
+ pn_decref2(delivery, link);
}
link->endpoint.freed = true;
- pn_decref(link);
+ pn_decref2(link, link->session);
}
void *pn_link_get_context(pn_link_t *link)
@@ -348,6 +320,7 @@ void pn_endpoint_init(pn_endpoint_t *endpoint, int type, pn_connection_t *conn)
endpoint->transport_prev = NULL;
endpoint->modified = false;
endpoint->freed = false;
+ endpoint->posted_final = false;
LL_ADD(conn, endpoint, endpoint);
}
@@ -359,16 +332,37 @@ void pn_endpoint_tini(pn_endpoint_t *endpoint)
pn_condition_tini(&endpoint->condition);
}
+#include "event.h"
+
+static bool pni_post_final(pn_endpoint_t *endpoint, pn_event_type_t type)
+{
+ pn_connection_t *conn = pn_ep_get_connection(endpoint);
+ if (!endpoint->posted_final) {
+ endpoint->posted_final = true;
+ pn_event_t *event = pn_collector_put(conn->collector, type, endpoint);
+ if (event) { return true; }
+ }
+
+ return false;
+}
+
static void pn_connection_finalize(void *object)
{
pn_connection_t *conn = (pn_connection_t *) object;
+
+ pn_endpoint_t *endpoint = &conn->endpoint;
+ if (pni_post_final(endpoint, PN_CONNECTION_FINAL)) {
+ return;
+ }
+
+ pn_decref2(conn->collector, conn);
pn_free(conn->sessions);
pn_free(conn->container);
pn_free(conn->hostname);
pn_free(conn->offered_capabilities);
pn_free(conn->desired_capabilities);
pn_free(conn->properties);
- pn_endpoint_tini(&conn->endpoint);
+ pn_endpoint_tini(endpoint);
}
#define pn_connection_initialize NULL
@@ -376,11 +370,9 @@ static void pn_connection_finalize(void *object)
#define pn_connection_compare NULL
#define pn_connection_inspect NULL
-#include "event.h"
-
pn_connection_t *pn_connection()
{
- static pn_class_t clazz = PN_CLASS(pn_connection);
+ static const pn_class_t clazz = PN_CLASS(pn_connection);
pn_connection_t *conn = (pn_connection_t *) pn_new(sizeof(pn_connection_t), &clazz);
if (!conn) return NULL;
@@ -398,17 +390,30 @@ pn_connection_t *pn_connection()
conn->tpwork_tail = NULL;
conn->container = pn_string(NULL);
conn->hostname = pn_string(NULL);
- conn->offered_capabilities = pn_data(16);
- conn->desired_capabilities = pn_data(16);
- conn->properties = pn_data(16);
+ conn->offered_capabilities = pn_data(0);
+ conn->desired_capabilities = pn_data(0);
+ conn->properties = pn_data(0);
conn->collector = NULL;
return conn;
}
+static const pn_event_type_t endpoint_init_event_map[] = {
+ PN_CONNECTION_INIT, /* CONNECTION */
+ PN_SESSION_INIT, /* SESSION */
+ PN_LINK_INIT, /* SENDER */
+ PN_LINK_INIT}; /* RECEIVER */
+
void pn_connection_collect(pn_connection_t *connection, pn_collector_t *collector)
{
+ pn_decref2(connection->collector, connection);
connection->collector = collector;
+ pn_incref2(connection->collector, connection);
+ pn_endpoint_t *endpoint = connection->endpoint_head;
+ while (endpoint) {
+ pn_collector_put(connection->collector, endpoint_init_event_map[endpoint->type], endpoint);
+ endpoint = endpoint->endpoint_next;
+ }
}
pn_state_t pn_connection_state(pn_connection_t *connection)
@@ -556,7 +561,7 @@ void pn_add_tpwork(pn_delivery_t *delivery)
{
LL_ADD(connection, tpwork, delivery);
delivery->tpwork = true;
- pn_incref(delivery);
+ pn_incref2(delivery, connection);
}
pn_modified(connection, &connection->endpoint, true);
}
@@ -568,7 +573,7 @@ void pn_clear_tpwork(pn_delivery_t *delivery)
{
LL_REMOVE(connection, tpwork, delivery);
delivery->tpwork = false;
- pn_decref(delivery); // may free delivery!
+ pn_decref2(delivery, connection); // may free delivery!
}
}
@@ -590,14 +595,12 @@ void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint, bool emit
if (!endpoint->modified) {
LL_ADD(connection, transport, endpoint);
endpoint->modified = true;
- pn_incref(endpoint);
+ pn_incref2(endpoint, connection);
}
- if (emit) {
- pn_event_t *event = pn_collector_put(connection->collector, PN_TRANSPORT);
- if (event) {
- pn_event_init_connection(event, connection);
- }
+ if (emit && connection->transport) {
+ pn_collector_put(connection->collector, PN_TRANSPORT,
+ connection->transport);
}
}
@@ -608,7 +611,7 @@ void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint)
endpoint->transport_next = NULL;
endpoint->transport_prev = NULL;
endpoint->modified = false;
- pn_decref(endpoint); // may free endpoint!
+ pn_decref2(endpoint, connection); // may free endpoint!
}
}
@@ -687,6 +690,7 @@ pn_link_t *pn_link_next(pn_link_t *link, pn_state_t state)
static void pn_session_finalize(void *object)
{
pn_session_t *session = (pn_session_t *) object;
+ pn_endpoint_t *endpoint = &session->endpoint;
//pn_transport_t *transport = session->connection->transport;
//if (transport) {
/* if ((int16_t)session->state.local_channel >= 0) // END not sent */
@@ -695,13 +699,17 @@ static void pn_session_finalize(void *object)
/* pn_unmap_channel(transport, session); */
/* } */
+ if (pni_post_final(endpoint, PN_SESSION_FINAL)) {
+ return;
+ }
+
pn_free(session->links);
- pn_endpoint_tini(&session->endpoint);
+ pn_endpoint_tini(endpoint);
pn_delivery_map_free(&session->state.incoming);
pn_delivery_map_free(&session->state.outgoing);
pn_free(session->state.local_handles);
pn_free(session->state.remote_handles);
- pn_decref(session->connection);
+ pn_decref2(session->connection, session);
}
#define pn_session_initialize NULL
@@ -712,8 +720,8 @@ static void pn_session_finalize(void *object)
pn_session_t *pn_session(pn_connection_t *conn)
{
assert(conn);
- static pn_class_t clazz = PN_CLASS(pn_session);
- pn_session_t *ssn = (pn_session_t *) pn_new(sizeof(pn_session_t), &clazz);
+ static const pn_class_t clazz = PN_CLASS(pn_session);
+ pn_session_t *ssn = (pn_session_t *) pn_new2(sizeof(pn_session_t), &clazz, conn);
if (!ssn) return NULL;
pn_endpoint_init(&ssn->endpoint, SESSION, conn);
@@ -736,6 +744,7 @@ pn_session_t *pn_session(pn_connection_t *conn)
ssn->state.remote_handles = pn_hash(0, 0.75, PN_REFCOUNT);
// end transport state
+ pn_collector_put(conn->collector, PN_SESSION_INIT, ssn);
return ssn;
}
@@ -779,31 +788,36 @@ void pn_terminus_init(pn_terminus_t *terminus, pn_terminus_type_t type)
terminus->type = type;
terminus->address = pn_string(NULL);
terminus->durability = PN_NONDURABLE;
- terminus->expiry_policy = PN_SESSION_CLOSE;
+ terminus->expiry_policy = PN_EXPIRE_WITH_SESSION;
terminus->timeout = 0;
terminus->dynamic = false;
terminus->distribution_mode = PN_DIST_MODE_UNSPECIFIED;
- terminus->properties = pn_data(16);
- terminus->capabilities = pn_data(16);
- terminus->outcomes = pn_data(16);
- terminus->filter = pn_data(16);
+ terminus->properties = pn_data(0);
+ terminus->capabilities = pn_data(0);
+ terminus->outcomes = pn_data(0);
+ terminus->filter = pn_data(0);
}
static void pn_link_finalize(void *object)
{
pn_link_t *link = (pn_link_t *) object;
+ pn_endpoint_t *endpoint = &link->endpoint;
// assumptions: all deliveries freed
assert(link->settled_head == NULL);
assert(link->unsettled_head == NULL);
+ if (pni_post_final(endpoint, PN_LINK_FINAL)) {
+ return;
+ }
+
pn_terminus_free(&link->source);
pn_terminus_free(&link->target);
pn_terminus_free(&link->remote_source);
pn_terminus_free(&link->remote_target);
pn_free(link->name);
- pn_endpoint_tini(&link->endpoint);
- pn_decref(link->session);
+ pn_endpoint_tini(endpoint);
+ pn_decref2(link->session, link);
}
#define pn_link_initialize NULL
@@ -813,12 +827,12 @@ static void pn_link_finalize(void *object)
pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name)
{
- static pn_class_t clazz = PN_CLASS(pn_link);
- pn_link_t *link = (pn_link_t *) pn_new(sizeof(pn_link_t), &clazz);
+ static const pn_class_t clazz = PN_CLASS(pn_link);
+ pn_link_t *link = (pn_link_t *) pn_new2(sizeof(pn_link_t), &clazz, session);
pn_endpoint_init(&link->endpoint, type, session->connection);
pn_add_link(session, link);
- pn_incref(session); // keep session until link finalized
+ pn_incref2(session, link); // keep session until link finalized
link->name = pn_string(name);
pn_terminus_init(&link->source, PN_SOURCE);
pn_terminus_init(&link->target, PN_TARGET);
@@ -844,8 +858,9 @@ pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name)
link->state.remote_handle = -1;
link->state.delivery_count = 0;
link->state.link_credit = 0;
- // end transport stat
+ // end transport state
+ pn_collector_put(session->connection->collector, PN_LINK_INIT, link);
return link;
}
@@ -1057,13 +1072,13 @@ static void pn_delivery_finalize(void *object)
pn_buffer_free(delivery->bytes);
pn_disposition_finalize(&delivery->local);
pn_disposition_finalize(&delivery->remote);
- pn_decref(delivery->link);
+ pn_decref2(delivery->link, delivery);
}
static void pn_disposition_init(pn_disposition_t *ds)
{
- ds->data = pn_data(16);
- ds->annotations = pn_data(16);
+ ds->data = pn_data(0);
+ ds->annotations = pn_data(0);
pn_condition_init(&ds->condition);
}
@@ -1091,11 +1106,11 @@ pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag)
pn_delivery_t *delivery = link->settled_head;
LL_POP(link, settled, pn_delivery_t);
if (!delivery) {
- static pn_class_t clazz = PN_CLASS(pn_delivery);
- delivery = (pn_delivery_t *) pn_new(sizeof(pn_delivery_t), &clazz);
+ static const pn_class_t clazz = PN_CLASS(pn_delivery);
+ delivery = (pn_delivery_t *) pn_new2(sizeof(pn_delivery_t), &clazz, link);
if (!delivery) return NULL;
delivery->link = link;
- pn_incref(delivery->link); // keep link until finalized
+ pn_incref2(delivery->link, delivery); // keep link until finalized
delivery->tag = pn_buffer(16);
delivery->bytes = pn_buffer(64);
pn_disposition_init(&delivery->local);
@@ -1442,6 +1457,7 @@ ssize_t pn_link_send(pn_link_t *sender, const char *bytes, size_t n)
{
pn_delivery_t *current = pn_link_current(sender);
if (!current) return PN_EOS;
+ if (!bytes || !n) return 0;
pn_buffer_append(current->bytes, bytes, n);
sender->session->outgoing_bytes += n;
pn_add_tpwork(current);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/engine/event.c
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/event.c b/proton-c/src/engine/event.c
index 9a3f220..07e3cb5 100644
--- a/proton-c/src/engine/event.c
+++ b/proton-c/src/engine/event.c
@@ -6,16 +6,13 @@ struct pn_collector_t {
pn_event_t *head;
pn_event_t *tail;
pn_event_t *free_head;
+ bool freed;
};
struct pn_event_t {
- pn_event_type_t type;
- pn_connection_t *connection;
- pn_session_t *session;
- pn_link_t *link;
- pn_delivery_t *delivery;
- pn_transport_t *transport;
+ void *context; // depends on type
pn_event_t *next;
+ pn_event_type_t type;
};
static void pn_collector_initialize(void *obj)
@@ -24,25 +21,36 @@ static void pn_collector_initialize(void *obj)
collector->head = NULL;
collector->tail = NULL;
collector->free_head = NULL;
+ collector->freed = false;
}
-static void pn_collector_finalize(void *obj)
+static void pn_collector_drain(pn_collector_t *collector)
{
- pn_collector_t *collector = (pn_collector_t *) obj;
-
while (pn_collector_peek(collector)) {
pn_collector_pop(collector);
}
assert(!collector->head);
assert(!collector->tail);
+}
+static void pn_collector_shrink(pn_collector_t *collector)
+{
pn_event_t *event = collector->free_head;
while (event) {
pn_event_t *next = event->next;
pn_free(event);
event = next;
}
+
+ collector->free_head = NULL;
+}
+
+static void pn_collector_finalize(void *obj)
+{
+ pn_collector_t *collector = (pn_collector_t *) obj;
+ pn_collector_drain(collector);
+ pn_collector_shrink(collector);
}
static int pn_collector_inspect(void *obj, pn_string_t *dst)
@@ -72,25 +80,39 @@ static int pn_collector_inspect(void *obj, pn_string_t *dst)
pn_collector_t *pn_collector(void)
{
- static pn_class_t clazz = PN_CLASS(pn_collector);
+ static const pn_class_t clazz = PN_CLASS(pn_collector);
pn_collector_t *collector = (pn_collector_t *) pn_new(sizeof(pn_collector_t), &clazz);
return collector;
}
void pn_collector_free(pn_collector_t *collector)
{
- pn_free(collector);
+ collector->freed = true;
+ pn_collector_drain(collector);
+ pn_collector_shrink(collector);
+ pn_decref(collector);
}
pn_event_t *pn_event(void);
static void pn_event_initialize(void *obj);
-pn_event_t *pn_collector_put(pn_collector_t *collector, pn_event_type_t type)
+pn_event_t *pn_collector_put(pn_collector_t *collector, pn_event_type_t type, void *context)
{
if (!collector) {
return NULL;
}
+ assert(context);
+
+ if (collector->freed) {
+ return NULL;
+ }
+
+ pn_event_t *tail = collector->tail;
+ if (tail && tail->type == type && tail->context == context) {
+ return NULL;
+ }
+
pn_event_t *event;
if (collector->free_head) {
@@ -101,8 +123,6 @@ pn_event_t *pn_collector_put(pn_collector_t *collector, pn_event_type_t type)
event = pn_event();
}
- pn_event_t *tail = collector->tail;
-
if (tail) {
tail->next = event;
collector->tail = event;
@@ -112,26 +132,16 @@ pn_event_t *pn_collector_put(pn_collector_t *collector, pn_event_type_t type)
}
event->type = type;
+ event->context = context;
+ pn_incref2(event->context, collector);
+
+ //printf("event %s on %p\n", pn_event_type_name(event->type), event->context);
return event;
}
pn_event_t *pn_collector_peek(pn_collector_t *collector)
{
- // discard any events for objects that no longer exist
- pn_event_t *event = collector->head;
- while (event && ((event->delivery && event->delivery->local.settled)
- ||
- (event->link && event->link->endpoint.freed)
- ||
- (event->session && event->session->endpoint.freed)
- ||
- (event->connection && event->connection->endpoint.freed)
- ||
- (event->transport && event->transport->freed))) {
- pn_collector_pop(collector);
- event = collector->head;
- }
return collector->head;
}
@@ -148,15 +158,15 @@ bool pn_collector_pop(pn_collector_t *collector)
collector->tail = NULL;
}
+ // decref before adding to the free list
+ if (event->context) {
+ pn_decref2(event->context, collector);
+ event->context = NULL;
+ }
+
event->next = collector->free_head;
collector->free_head = event;
- if (event->connection) pn_decref(event->connection);
- if (event->session) pn_decref(event->session);
- if (event->link) pn_decref(event->link);
- if (event->delivery) pn_decref(event->delivery);
- if (event->transport) pn_decref(event->transport);
-
return true;
}
@@ -164,11 +174,7 @@ static void pn_event_initialize(void *obj)
{
pn_event_t *event = (pn_event_t *) obj;
event->type = PN_EVENT_NONE;
- event->connection = NULL;
- event->session = NULL;
- event->link = NULL;
- event->delivery = NULL;
- event->transport = NULL;
+ event->context = NULL;
event->next = NULL;
}
@@ -178,16 +184,12 @@ static int pn_event_inspect(void *obj, pn_string_t *dst)
{
assert(obj);
pn_event_t *event = (pn_event_t *) obj;
- int err = pn_string_addf(dst, "(%d", event->type);
- void *objects[] = {event->connection, event->session, event->link,
- event->delivery, event->transport};
- for (int i = 0; i < 5; i++) {
- if (objects[i]) {
- err = pn_string_addf(dst, ", ");
- if (err) return err;
- err = pn_inspect(objects[i], dst);
- if (err) return err;
- }
+ int err = pn_string_addf(dst, "(0x%X", (unsigned int)event->type);
+ if (event->context) {
+ err = pn_string_addf(dst, ", ");
+ if (err) return err;
+ err = pn_inspect(event->context, dst);
+ if (err) return err;
}
return pn_string_addf(dst, ")");
@@ -198,45 +200,11 @@ static int pn_event_inspect(void *obj, pn_string_t *dst)
pn_event_t *pn_event(void)
{
- static pn_class_t clazz = PN_CLASS(pn_event);
+ static const pn_class_t clazz = PN_CLASS(pn_event);
pn_event_t *event = (pn_event_t *) pn_new(sizeof(pn_event_t), &clazz);
return event;
}
-void pn_event_init_transport(pn_event_t *event, pn_transport_t *transport)
-{
- event->transport = transport;
- pn_incref(event->transport);
-}
-
-void pn_event_init_connection(pn_event_t *event, pn_connection_t *connection)
-{
- event->connection = connection;
- pn_event_init_transport(event, event->connection->transport);
- pn_incref(event->connection);
-}
-
-void pn_event_init_session(pn_event_t *event, pn_session_t *session)
-{
- event->session = session;
- pn_event_init_connection(event, pn_session_connection(event->session));
- pn_incref(event->session);
-}
-
-void pn_event_init_link(pn_event_t *event, pn_link_t *link)
-{
- event->link = link;
- pn_event_init_session(event, pn_link_session(event->link));
- pn_incref(event->link);
-}
-
-void pn_event_init_delivery(pn_event_t *event, pn_delivery_t *delivery)
-{
- event->delivery = delivery;
- pn_event_init_link(event, pn_delivery_link(delivery));
- pn_incref(event->delivery);
-}
-
pn_event_type_t pn_event_type(pn_event_t *event)
{
return event->type;
@@ -247,29 +215,84 @@ pn_event_category_t pn_event_category(pn_event_t *event)
return (pn_event_category_t)(event->type & 0xFFFF0000);
}
+void *pn_event_context(pn_event_t *event)
+{
+ assert(event);
+ return event->context;
+}
+
pn_connection_t *pn_event_connection(pn_event_t *event)
{
- return event->connection;
+ pn_session_t *ssn;
+ pn_transport_t *transport;
+
+ switch (pn_event_category(event)) {
+ case PN_EVENT_CATEGORY_CONNECTION:
+ return (pn_connection_t *)event->context;
+ case PN_EVENT_CATEGORY_TRANSPORT:
+ transport = pn_event_transport(event);
+ if (transport)
+ return transport->connection;
+ return NULL;
+ default:
+ ssn = pn_event_session(event);
+ if (ssn)
+ return pn_session_connection(ssn);
+ }
+ return NULL;
}
pn_session_t *pn_event_session(pn_event_t *event)
{
- return event->session;
+ pn_link_t *link;
+ switch (pn_event_category(event)) {
+ case PN_EVENT_CATEGORY_SESSION:
+ return (pn_session_t *)event->context;
+ default:
+ link = pn_event_link(event);
+ if (link)
+ return pn_link_session(link);
+ }
+ return NULL;
}
pn_link_t *pn_event_link(pn_event_t *event)
{
- return event->link;
+ pn_delivery_t *dlv;
+ switch (pn_event_category(event)) {
+ case PN_EVENT_CATEGORY_LINK:
+ return (pn_link_t *)event->context;
+ default:
+ dlv = pn_event_delivery(event);
+ if (dlv)
+ return pn_delivery_link(dlv);
+ }
+ return NULL;
}
pn_delivery_t *pn_event_delivery(pn_event_t *event)
{
- return event->delivery;
+ switch (pn_event_category(event)) {
+ case PN_EVENT_CATEGORY_DELIVERY:
+ return (pn_delivery_t *)event->context;
+ default:
+ return NULL;
+ }
}
pn_transport_t *pn_event_transport(pn_event_t *event)
{
- return event->transport;
+ switch (pn_event_category(event)) {
+ case PN_EVENT_CATEGORY_TRANSPORT:
+ return (pn_transport_t *)event->context;
+ default:
+ {
+ pn_connection_t *conn = pn_event_connection(event);
+ if (conn)
+ return pn_connection_transport(conn);
+ return NULL;
+ }
+ }
}
const char *pn_event_type_name(pn_event_type_t type)
@@ -277,20 +300,44 @@ const char *pn_event_type_name(pn_event_type_t type)
switch (type) {
case PN_EVENT_NONE:
return "PN_EVENT_NONE";
- case PN_CONNECTION_REMOTE_STATE:
- return "PN_CONNECTION_REMOTE_STATE";
- case PN_CONNECTION_LOCAL_STATE:
- return "PN_CONNECTION_LOCAL_STATE";
- case PN_SESSION_REMOTE_STATE:
- return "PN_SESSION_REMOTE_STATE";
- case PN_SESSION_LOCAL_STATE:
- return "PN_SESSION_LOCAL_STATE";
- case PN_LINK_REMOTE_STATE:
- return "PN_LINK_REMOTE_STATE";
- case PN_LINK_LOCAL_STATE:
- return "PN_LINK_LOCAL_STATE";
+ case PN_CONNECTION_INIT:
+ return "PN_CONNECTION_INIT";
+ case PN_CONNECTION_REMOTE_OPEN:
+ return "PN_CONNECTION_REMOTE_OPEN";
+ case PN_CONNECTION_OPEN:
+ return "PN_CONNECTION_OPEN";
+ case PN_CONNECTION_REMOTE_CLOSE:
+ return "PN_CONNECTION_REMOTE_CLOSE";
+ case PN_CONNECTION_CLOSE:
+ return "PN_CONNECTION_CLOSE";
+ case PN_CONNECTION_FINAL:
+ return "PN_CONNECTION_FINAL";
+ case PN_SESSION_INIT:
+ return "PN_SESSION_INIT";
+ case PN_SESSION_REMOTE_OPEN:
+ return "PN_SESSION_REMOTE_OPEN";
+ case PN_SESSION_OPEN:
+ return "PN_SESSION_OPEN";
+ case PN_SESSION_REMOTE_CLOSE:
+ return "PN_SESSION_REMOTE_CLOSE";
+ case PN_SESSION_CLOSE:
+ return "PN_SESSION_CLOSE";
+ case PN_SESSION_FINAL:
+ return "PN_SESSION_FINAL";
+ case PN_LINK_INIT:
+ return "PN_LINK_INIT";
+ case PN_LINK_REMOTE_OPEN:
+ return "PN_LINK_REMOTE_OPEN";
+ case PN_LINK_OPEN:
+ return "PN_LINK_OPEN";
+ case PN_LINK_REMOTE_CLOSE:
+ return "PN_LINK_REMOTE_CLOSE";
+ case PN_LINK_CLOSE:
+ return "PN_LINK_CLOSE";
case PN_LINK_FLOW:
return "PN_LINK_FLOW";
+ case PN_LINK_FINAL:
+ return "PN_LINK_FINAL";
case PN_DELIVERY:
return "PN_DELIVERY";
case PN_TRANSPORT:
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/engine/event.h
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/event.h b/proton-c/src/engine/event.h
index 80f3422..b05f2d0 100644
--- a/proton-c/src/engine/event.h
+++ b/proton-c/src/engine/event.h
@@ -22,12 +22,7 @@
*
*/
-pn_event_t *pn_collector_put(pn_collector_t *collector, pn_event_type_t type);
-
-void pn_event_init_transport(pn_event_t *event, pn_transport_t *transport);
-void pn_event_init_connection(pn_event_t *event, pn_connection_t *connection);
-void pn_event_init_session(pn_event_t *event, pn_session_t *session);
-void pn_event_init_link(pn_event_t *event, pn_link_t *link);
-void pn_event_init_delivery(pn_event_t *event, pn_delivery_t *delivery);
+pn_event_t *pn_collector_put(pn_collector_t *collector, pn_event_type_t type,
+ void *context);
#endif /* event.h */
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/error.c
----------------------------------------------------------------------
diff --git a/proton-c/src/error.c b/proton-c/src/error.c
index 77a3dc2..c3cf36a 100644
--- a/proton-c/src/error.c
+++ b/proton-c/src/error.c
@@ -27,9 +27,9 @@
#include "platform.h"
struct pn_error_t {
- int code;
char *text;
pn_error_t *root;
+ int code;
};
pn_error_t *pn_error()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/message/message.c
----------------------------------------------------------------------
diff --git a/proton-c/src/message/message.c b/proton-c/src/message/message.c
index 6c1088a..d91ab63 100644
--- a/proton-c/src/message/message.c
+++ b/proton-c/src/message/message.c
@@ -49,11 +49,8 @@ ssize_t pn_message_data(char *dst, size_t available, const char *src, size_t siz
// message
struct pn_message_t {
- bool durable;
- uint8_t priority;
- pn_millis_t ttl;
- bool first_acquirer;
- uint32_t delivery_count;
+ pn_timestamp_t expiry_time;
+ pn_timestamp_t creation_time;
pn_data_t *id;
pn_string_t *user_id;
pn_string_t *address;
@@ -62,22 +59,28 @@ struct pn_message_t {
pn_data_t *correlation_id;
pn_string_t *content_type;
pn_string_t *content_encoding;
- pn_timestamp_t expiry_time;
- pn_timestamp_t creation_time;
pn_string_t *group_id;
- pn_sequence_t group_sequence;
pn_string_t *reply_to_group_id;
- bool inferred;
pn_data_t *data;
pn_data_t *instructions;
pn_data_t *annotations;
pn_data_t *properties;
pn_data_t *body;
- pn_format_t format;
pn_parser_t *parser;
pn_error_t *error;
+
+ pn_format_t format;
+ pn_sequence_t group_sequence;
+ pn_millis_t ttl;
+ uint32_t delivery_count;
+
+ uint8_t priority;
+
+ bool durable;
+ bool first_acquirer;
+ bool inferred;
};
void pn_message_finalize(void *obj)
@@ -318,7 +321,7 @@ int pn_message_inspect(void *obj, pn_string_t *dst)
pn_message_t *pn_message()
{
- static pn_class_t clazz = PN_CLASS(pn_message);
+ static const pn_class_t clazz = PN_CLASS(pn_message);
pn_message_t *msg = (pn_message_t *) pn_new(sizeof(pn_message_t), &clazz);
msg->durable = false;
msg->priority = PN_DEFAULT_PRIORITY;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org