You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/11/17 18:18:52 UTC
[05/11] qpid-proton git commit: PROTON-1344: C proactor for
multi-threaded proton applications
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/examples/engine/c/psend.c
----------------------------------------------------------------------
diff --git a/examples/engine/c/psend.c b/examples/engine/c/psend.c
deleted file mode 100644
index 2ad0edb..0000000
--- a/examples/engine/c/psend.c
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-/*################################################################
- This program is half of a pair. Precv and Psend are meant to
- be simple-as-possible examples of how to use the proton-c
- engine interface to send and receive messages over a single
- connection and a single session.
-
- In addition to being examples, these programs or their
- descendants will be used in performance regression testing
- for both throughput and latency, and long-term soak testing.
-
- This program, psend, is highly similar to its peer precv.
- I put all the good comments in precv.
-*################################################################*/
-
-
-#include <unistd.h>
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
-#include <ctype.h>
-#include <time.h>
-#include <sys/time.h>
-#define __STDC_FORMAT_MACROS
-#include <inttypes.h>
-
-
-#include <proton/connection.h>
-#include <proton/delivery.h>
-#include <proton/driver.h>
-#include <proton/event.h>
-#include <proton/terminus.h>
-#include <proton/link.h>
-#include <proton/message.h>
-#include <proton/session.h>
-
-
-#define MY_BUF_SIZE 1000
-
-
-
-void
-print_timestamp ( FILE * fp, char const * label )
-{
- struct timeval tv;
- struct tm * timeinfo;
-
- gettimeofday ( & tv, 0 );
- timeinfo = localtime ( & tv.tv_sec );
-
- int seconds_today = 3600 * timeinfo->tm_hour +
- 60 * timeinfo->tm_min +
- timeinfo->tm_sec;
-
- fprintf ( fp, "time : %d.%.6ld : %s\n", seconds_today, tv.tv_usec, label );
-}
-
-
-
-
-
-static
-double
-get_time ( )
-{
- struct timeval tv;
- struct tm * timeinfo;
-
- gettimeofday ( & tv, 0 );
- timeinfo = localtime ( & tv.tv_sec );
-
- double time_now = 3600 * timeinfo->tm_hour +
- 60 * timeinfo->tm_min +
- timeinfo->tm_sec;
-
- time_now += ((double)(tv.tv_usec) / 1000000.0);
- return time_now;
-}
-
-
-
-
-
-int
-main ( int argc, char ** argv )
-{
- char addr [ 1000 ];
- char host [ 1000 ];
- char port [ 1000 ];
- char output_file_name[1000];
-
-
- uint64_t messages = 2000000,
- delivery_count = 0;
-
- int message_length = 100;
-
- bool done = false;
- int sent_count = 0;
- int n_links = 5;
- int const max_links = 100;
-
- strcpy ( addr, "queue" );
- strcpy ( host, "0.0.0.0" );
- strcpy ( port, "5672" );
-
- FILE * output_fp;
-
-
- for ( int i = 1; i < argc; ++ i )
- {
- if ( ! strcmp ( "--host", argv[i] ) )
- {
- strcpy ( host, argv[i+1] );
- ++ i;
- }
- else
- if ( ! strcmp ( "--port", argv[i] ) )
- {
- strcpy ( port, argv[i+1] );
- ++ i;
- }
- else
- if ( ! strcmp ( "--messages", argv[i] ) )
- {
- sscanf ( argv [ i+1 ], "%" SCNd64 , & messages );
- ++ i;
- }
- else
- if ( ! strcmp ( "--message_length", argv[i] ) )
- {
- sscanf ( argv [ i+1 ], "%d", & message_length );
- ++ i;
- }
- else
- if ( ! strcmp ( "--n_links", argv[i] ) )
- {
- sscanf ( argv [ i+1 ], "%d", & n_links );
- ++ i;
- }
- if ( ! strcmp ( "--output", argv[i] ) )
- {
- if ( ! strcmp ( "stderr", argv[i+1] ) )
- {
- output_fp = stderr;
- strcpy ( output_file_name, "stderr");
- }
- else
- if ( ! strcmp ( "stdout", argv[i+1] ) )
- {
- output_fp = stdout;
- strcpy ( output_file_name, "stdout");
- }
- else
- {
- output_fp = fopen ( argv[i+1], "w" );
- strcpy ( output_file_name, argv[i+1] );
- if ( ! output_fp )
- {
- fprintf ( stderr, "Can't open |%s| for writing.\n", argv[i+1] );
- exit ( 1 );
- }
- }
- ++ i;
- }
- else
- {
- fprintf ( output_fp, "unknown arg %s", argv[i] );
- }
- }
-
-
- fprintf ( output_fp, "host %s\n", host );
- fprintf ( output_fp, "port %s\n", port );
- fprintf ( output_fp, "messages %" PRId64 "\n", messages );
- fprintf ( output_fp, "message_length %d\n", message_length );
- fprintf ( output_fp, "n_links %d\n", n_links );
- fprintf ( output_fp, "output %s\n", output_file_name );
-
-
- if ( n_links > max_links )
- {
- fprintf ( output_fp, "You can't have more than %d links.\n", max_links );
- exit ( 1 );
- }
-
- pn_driver_t * driver;
- pn_connector_t * connector;
- pn_connector_t * driver_connector;
- pn_connection_t * connection;
- pn_collector_t * collector;
- pn_link_t * links [ max_links ];
- pn_session_t * session;
- pn_event_t * event;
- pn_delivery_t * delivery;
-
-
- char * message = (char *) malloc(message_length);
- memset ( message, 13, message_length );
-
- /*----------------------------------------------------
- Get everything set up.
- We will have a single connector, a single
- connection, a single session, and a single link.
- ----------------------------------------------------*/
- driver = pn_driver ( );
- connector = pn_connector ( driver, host, port, 0 );
-
- connection = pn_connection();
- collector = pn_collector ( );
- pn_connection_collect ( connection, collector );
- pn_connector_set_connection ( connector, connection );
-
- session = pn_session ( connection );
- pn_connection_open ( connection );
- pn_session_open ( session );
-
- for ( int i = 0; i < n_links; ++ i )
- {
- char name[100];
- sprintf ( name, "tvc_15_%d", i );
- links[i] = pn_sender ( session, name );
- pn_terminus_set_address ( pn_link_target(links[i]), addr );
- pn_link_open ( links[i] );
- }
-
- /*-----------------------------------------------------------
- For my speed tests, I do not want to count setup time.
- Start timing here. The receiver will print out a similar
- timestamp when he receives the final message.
- -----------------------------------------------------------*/
- fprintf ( output_fp, "psend: sending %llu messages.\n", messages );
-
- // Just before we start sending, print the start timestamp.
- fprintf ( output_fp, "psend_start %.3lf\n", get_time() );
-
- while ( 1 )
- {
- pn_driver_wait ( driver, -1 );
-
- int event_count = 1;
- while ( event_count > 0 )
- {
- event_count = 0;
- pn_connector_process ( connector );
-
- event = pn_collector_peek(collector);
- while ( event )
- {
- ++ event_count;
- pn_event_type_t event_type = pn_event_type ( event );
- //fprintf ( output_fp, "event: %s\n", pn_event_type_name ( event_type ) );
-
- switch ( event_type )
- {
- case PN_LINK_FLOW:
- {
- if ( delivery_count < messages )
- {
- /*---------------------------------------------------
- We may have opened multiple links.
- The event will tell us which one this flow-event
- happened on. If the flow event gave us some
- credit, we will greedily send messages until it
- is all used up.
- ---------------------------------------------------*/
- pn_link_t * link = pn_event_link ( event );
- int credit = pn_link_credit ( link );
-
- while ( credit > 0 )
- {
- // Every delivery we create needs a unique tag.
- char str [ 100 ];
- sprintf ( str, "%x", delivery_count ++ );
- delivery = pn_delivery ( link, pn_dtag(str, strlen(str)) );
-
- // If you settle the delivery before sending it,
- // you will spend some time wondering why your
- // messages don't have any content when they arrive
- // at the receiver.
- pn_link_send ( link, message, message_length );
- pn_delivery_settle ( delivery );
- pn_link_advance ( link );
- credit = pn_link_credit ( link );
- }
-
- if ( delivery_count >= messages )
- {
- fprintf ( output_fp,
- "I have sent all %d messages.\n" ,
- delivery_count
- );
- /*
- I'm still kind of hazy on how to shut down the
- psend / precv system properly ....
- I can't go to all_done here, or precv will never
- get all its messages and terminate.
- So I let precv terminate properly ... which means
- that this program, psend, dies with an error.
- Hmm.
- */
- // goto all_done;
- }
- }
- }
- break;
-
-
- case PN_TRANSPORT:
- // I don't know what this means here, either.
- break;
-
-
- case PN_TRANSPORT_TAIL_CLOSED:
- goto all_done;
- break;
-
-
- default:
- /*
- fprintf ( output_fp,
- "precv unhandled event: %s\n",
- pn_event_type_name(event_type)
- );
- */
- break;
-
- }
-
- pn_collector_pop ( collector );
- event = pn_collector_peek(collector);
- }
- }
- }
-
- all_done:
-
- for ( int i = 0; i < n_links; ++ i )
- {
- pn_link_close ( links[i] );
- }
-
- pn_session_close ( session );
- pn_connection_close ( connection );
-
- return 0;
-}
-
-
-
-
-
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/examples/exampletest.py
----------------------------------------------------------------------
diff --git a/examples/exampletest.py b/examples/exampletest.py
new file mode 100644
index 0000000..d40b9cb
--- /dev/null
+++ b/examples/exampletest.py
@@ -0,0 +1,183 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License
+#
+
+# A test library to make it easy to run unittest tests that start,
+# monitor, and report output from sub-processes. In particular
+# it helps with starting processes that listen on random ports.
+
+import unittest
+import os, sys, socket, time, re, inspect, errno, threading
+from random import randrange
+from subprocess import Popen, PIPE, STDOUT
+from copy import copy
+import platform
+from os.path import dirname as dirname
+
+def pick_port():
+ """Pick a random port."""
+ p = randrange(10000, 20000)
+ return p
+
+class ProcError(Exception):
+ """An exception that captures failed process output"""
+ def __init__(self, proc, what="bad exit status"):
+ out = proc.out.strip()
+ if out:
+ out = "\nvvvvvvvvvvvvvvvv\n%s\n^^^^^^^^^^^^^^^^\n" % out
+ else:
+ out = ", no output)"
+ super(Exception, self, ).__init__(
+ "%s %s, code=%s%s" % (proc.args, what, proc.returncode, out))
+
+class NotFoundError(ProcError):
+ pass
+
+class Proc(Popen):
+ """A example process that stores its output, optionally run with valgrind."""
+
+ if "VALGRIND" in os.environ and os.environ["VALGRIND"]:
+ env_args = [os.environ["VALGRIND"], "--error-exitcode=42", "--quiet", "--leak-check=full"]
+ else:
+ env_args = []
+
+ @property
+ def out(self):
+ self._out.seek(0)
+ return self._out.read()
+
+ def __init__(self, args, **kwargs):
+ """Start an example process"""
+ args = list(args)
+ self.args = args
+ self._out = os.tmpfile()
+ try:
+ Popen.__init__(self, self.env_args + self.args, stdout=self._out, stderr=STDOUT, **kwargs)
+ except OSError, e:
+ if e.errno == errno.ENOENT:
+ raise NotFoundError(self, str(e))
+ raise ProcError(self, str(e))
+ except Exception, e:
+ raise ProcError(self, str(e))
+
+ def kill(self):
+ try:
+ if self.poll() is None:
+ Popen.kill(self)
+ except:
+ pass # Already exited.
+ return self.out
+
+ def wait_out(self, timeout=10, expect=0):
+ """Wait for process to exit, return output. Raise ProcError on failure."""
+ t = threading.Thread(target=self.wait)
+ t.start()
+ t.join(timeout)
+ if self.poll() is None: # Still running
+ self.kill()
+ raise ProcError(self, "timeout")
+ if expect is not None and self.poll() != expect:
+ raise ProcError(self)
+ return self.out
+
+# Work-around older python unittest that lacks setUpClass.
+if hasattr(unittest.TestCase, 'setUpClass') and hasattr(unittest.TestCase, 'tearDownClass'):
+ TestCase = unittest.TestCase
+else:
+ class TestCase(unittest.TestCase):
+ """
+ Roughly provides setUpClass and tearDownClass functionality for older python
+ versions in our test scenarios. If subclasses override setUp or tearDown
+ they *must* call the superclass.
+ """
+ def setUp(self):
+ if not hasattr(type(self), '_setup_class_count'):
+ type(self)._setup_class_count = len(
+ inspect.getmembers(
+ type(self),
+ predicate=lambda(m): inspect.ismethod(m) and m.__name__.startswith('test_')))
+ type(self).setUpClass()
+
+ def tearDown(self):
+ self.assertTrue(self._setup_class_count > 0)
+ self._setup_class_count -= 1
+ if self._setup_class_count == 0:
+ type(self).tearDownClass()
+
+class ExampleTestCase(TestCase):
+ """TestCase that manages started processes"""
+ def setUp(self):
+ super(ExampleTestCase, self).setUp()
+ self.procs = []
+
+ def tearDown(self):
+ for p in self.procs:
+ p.kill()
+ super(ExampleTestCase, self).tearDown()
+
+ def proc(self, *args, **kwargs):
+ p = Proc(*args, **kwargs)
+ self.procs.append(p)
+ return p
+
+def wait_port(port, timeout=10):
+ """Wait up to timeout for port to be connectable."""
+ if timeout:
+ deadline = time.time() + timeout
+ while (timeout is None or time.time() < deadline):
+ try:
+ s = socket.create_connection((None, port), timeout) # Works for IPv6 and v4
+ s.close()
+ return
+ except socket.error, e:
+ if e.errno != errno.ECONNREFUSED: # Only retry on connection refused error.
+ raise
+ raise socket.timeout()
+
+
+class BrokerTestCase(ExampleTestCase):
+ """
+ ExampleTest that starts a broker in setUpClass and kills it in tearDownClass.
+ Subclass must set `broker_exe` class variable with the name of the broker executable.
+ """
+
+ @classmethod
+ def setUpClass(cls):
+ cls.port = pick_port()
+ cls.addr = "127.0.0.1:%s/examples" % (cls.port)
+ cls.broker = None # In case Proc throws, create the attribute.
+ cls.broker = Proc(cls.broker_exe + ["-a", cls.addr])
+ try:
+ wait_port(cls.port)
+ except Exception, e:
+ cls.broker.kill()
+ raise ProcError(cls.broker, "timed out waiting for port")
+
+ @classmethod
+ def tearDownClass(cls):
+ if cls.broker: cls.broker.kill()
+
+ def tearDown(self):
+ b = type(self).broker
+ if b and b.poll() != None: # Broker crashed
+ type(self).setUpClass() # Start another for the next test.
+ raise ProcError(b, "broker crash")
+ super(BrokerTestCase, self).tearDown()
+
+if __name__ == "__main__":
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp b/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
index 70e6754..d9825c2 100644
--- a/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
+++ b/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
@@ -121,8 +121,8 @@ PN_CPP_CLASS_EXTERN connection_engine {
/// Configure a connection by applying exactly the options in opts (including proton::messaging_handler)
/// Does not apply any default options, to apply container defaults use connect() or accept()
- /// instead.
- void configure(const connection_options& opts=connection_options());
+ /// instead. If server==true, configure a server connection.
+ void configure(const connection_options& opts=connection_options(), bool server=false);
/// Call configure() with client options and call connection::open()
/// Options applied: container::id(), container::client_connection_options(), opts.
@@ -200,12 +200,13 @@ PN_CPP_CLASS_EXTERN connection_engine {
PN_CPP_EXTERN proton::container* container() const;
private:
+ void init();
connection_engine(const connection_engine&);
connection_engine& operator=(const connection_engine&);
messaging_handler* handler_;
proton::container* container_;
- pn_connection_engine_t c_engine_;
+ pn_connection_engine_t engine_;
};
} // io
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/bindings/cpp/include/proton/sender_options.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/sender_options.hpp b/proton-c/bindings/cpp/include/proton/sender_options.hpp
index 64fea2f..0a618ff 100644
--- a/proton-c/bindings/cpp/include/proton/sender_options.hpp
+++ b/proton-c/bindings/cpp/include/proton/sender_options.hpp
@@ -90,10 +90,10 @@ class sender_options {
PN_CPP_EXTERN sender_options& auto_settle(bool);
/// Options for the source node of the sender.
- PN_CPP_EXTERN sender_options& source(source_options &);
+ PN_CPP_EXTERN sender_options& source(const source_options &);
/// Options for the receiver node of the receiver.
- PN_CPP_EXTERN sender_options& target(target_options &);
+ PN_CPP_EXTERN sender_options& target(const target_options &);
/// @cond INTERNAL
private:
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/bindings/cpp/src/io/connection_engine.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/connection_engine.cpp b/proton-c/bindings/cpp/src/io/connection_engine.cpp
index 4712b3e..5e6483f 100644
--- a/proton-c/bindings/cpp/src/io/connection_engine.cpp
+++ b/proton-c/bindings/cpp/src/io/connection_engine.cpp
@@ -40,35 +40,34 @@
namespace proton {
namespace io {
-connection_engine::connection_engine() :
- container_(0)
-{
- int err;
- if ((err = pn_connection_engine_init(&c_engine_)))
- throw proton::error(std::string("connection_engine init failed: ")+pn_code(err));
-}
-
-connection_engine::connection_engine(class container& cont, event_loop* loop) :
- container_(&cont)
-{
- int err;
- if ((err = pn_connection_engine_init(&c_engine_)))
- throw proton::error(std::string("connection_engine init failed: ")+pn_code(err));
+void connection_engine::init() {
+ if (pn_connection_engine_init(&engine_, pn_connection(), pn_transport()) != 0) {
+ this->~connection_engine(); // Dtor won't be called on throw from ctor.
+ throw proton::error(std::string("connection_engine allocation failed"));
+ }
+}
+
+connection_engine::connection_engine() : handler_(0), container_(0) { init(); }
+
+connection_engine::connection_engine(class container& cont, event_loop* loop) : handler_(0), container_(&cont) {
+ init();
connection_context& ctx = connection_context::get(connection());
ctx.container = container_;
ctx.event_loop.reset(loop);
}
-void connection_engine::configure(const connection_options& opts) {
- proton::connection c = connection();
+connection_engine::~connection_engine() {
+ pn_connection_engine_destroy(&engine_);
+}
+
+void connection_engine::configure(const connection_options& opts, bool server) {
+ proton::connection c(connection());
opts.apply_unbound(c);
+ if (server) pn_transport_set_server(engine_.transport);
+ pn_connection_engine_bind(&engine_);
opts.apply_bound(c);
handler_ = opts.handler();
- connection_context::get(connection()).collector = c_engine_.collector;
-}
-
-connection_engine::~connection_engine() {
- pn_connection_engine_final(&c_engine_);
+ connection_context::get(connection()).collector = engine_.collector;
}
void connection_engine::connect(const connection_options& opts) {
@@ -78,7 +77,7 @@ void connection_engine::connect(const connection_options& opts) {
all.update(container_->client_connection_options());
}
all.update(opts);
- configure(all);
+ configure(all, false);
connection().open();
}
@@ -89,12 +88,12 @@ void connection_engine::accept(const connection_options& opts) {
all.update(container_->server_connection_options());
}
all.update(opts);
- configure(all);
+ configure(all, true);
}
bool connection_engine::dispatch() {
pn_event_t* c_event;
- while ((c_event = pn_connection_engine_dispatch(&c_engine_)) != NULL) {
+ while ((c_event = pn_connection_engine_event(&engine_)) != NULL) {
proton_event cpp_event(c_event, container_);
try {
if (handler_ != 0) {
@@ -102,51 +101,56 @@ bool connection_engine::dispatch() {
cpp_event.dispatch(adapter);
}
} catch (const std::exception& e) {
- disconnected(error_condition("exception", e.what()));
+ pn_condition_t *cond = pn_transport_condition(engine_.transport);
+ if (!pn_condition_is_set(cond)) {
+ pn_condition_format(cond, "exception", "%s", e.what());
+ }
}
+ pn_connection_engine_pop_event(&engine_);
}
- return !pn_connection_engine_finished(&c_engine_);
+ return !pn_connection_engine_finished(&engine_);
}
mutable_buffer connection_engine::read_buffer() {
- pn_rwbytes_t buffer = pn_connection_engine_read_buffer(&c_engine_);
+ pn_rwbytes_t buffer = pn_connection_engine_read_buffer(&engine_);
return mutable_buffer(buffer.start, buffer.size);
}
void connection_engine::read_done(size_t n) {
- return pn_connection_engine_read_done(&c_engine_, n);
+ return pn_connection_engine_read_done(&engine_, n);
}
void connection_engine::read_close() {
- pn_connection_engine_read_close(&c_engine_);
+ pn_connection_engine_read_close(&engine_);
}
const_buffer connection_engine::write_buffer() {
- pn_bytes_t buffer = pn_connection_engine_write_buffer(&c_engine_);
+ pn_bytes_t buffer = pn_connection_engine_write_buffer(&engine_);
return const_buffer(buffer.start, buffer.size);
}
void connection_engine::write_done(size_t n) {
- return pn_connection_engine_write_done(&c_engine_, n);
+ return pn_connection_engine_write_done(&engine_, n);
}
void connection_engine::write_close() {
- pn_connection_engine_write_close(&c_engine_);
+ pn_connection_engine_write_close(&engine_);
}
void connection_engine::disconnected(const proton::error_condition& err) {
- pn_condition_t* condition = pn_connection_engine_condition(&c_engine_);
- if (!pn_condition_is_set(condition)) // Don't overwrite existing condition
+ pn_condition_t* condition = pn_transport_condition(engine_.transport);
+ if (!pn_condition_is_set(condition)) {
set_error_condition(err, condition);
- pn_connection_engine_disconnected(&c_engine_);
+ }
+ pn_connection_engine_close(&engine_);
}
proton::connection connection_engine::connection() const {
- return make_wrapper(c_engine_.connection);
+ return make_wrapper(engine_.connection);
}
proton::transport connection_engine::transport() const {
- return make_wrapper(c_engine_.transport);
+ return make_wrapper(engine_.transport);
}
proton::container* connection_engine::container() const {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/bindings/cpp/src/sender_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/sender_options.cpp b/proton-c/bindings/cpp/src/sender_options.cpp
index f5d5525..bed4a69 100644
--- a/proton-c/bindings/cpp/src/sender_options.cpp
+++ b/proton-c/bindings/cpp/src/sender_options.cpp
@@ -108,8 +108,8 @@ void sender_options::update(const sender_options& x) { impl_->update(*x.impl_);
sender_options& sender_options::handler(class messaging_handler &h) { impl_->handler = &h; return *this; }
sender_options& sender_options::delivery_mode(proton::delivery_mode m) {impl_->delivery_mode = m; return *this; }
sender_options& sender_options::auto_settle(bool b) {impl_->auto_settle = b; return *this; }
-sender_options& sender_options::source(source_options &s) {impl_->source = s; return *this; }
-sender_options& sender_options::target(target_options &s) {impl_->target = s; return *this; }
+sender_options& sender_options::source(const source_options &s) {impl_->source = s; return *this; }
+sender_options& sender_options::target(const target_options &s) {impl_->target = s; return *this; }
void sender_options::apply(sender& s) const { impl_->apply(s); }
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/docs/api/index.md
----------------------------------------------------------------------
diff --git a/proton-c/docs/api/index.md b/proton-c/docs/api/index.md
index 10aea84..ccd679d 100644
--- a/proton-c/docs/api/index.md
+++ b/proton-c/docs/api/index.md
@@ -1,5 +1,39 @@
Proton Documentation {#index}
====================
-The proton library contains two APIs: The [Engine API](@ref engine),
-and the [Messenger API](@ref messenger).
+## The Protocol Engine
+
+The [Engine API](@ref engine) is a "pure AMQP" toolkit: it decodes AMQP bytes
+into proton [events](@ref event) and generates AMQP bytes from application
+calls.
+
+The [connection engine](@ref connection_engine) provides a simple bytes in/bytes
+out, event-driven interface so you can read AMQP data from any source, process
+the resulting [events](@ref event) and write AMQP output to any destination.
+
+There is no IO or threading code in this part of the library, so it can be
+embedded in many different environments. The proton project provides language
+bindings (Python, Ruby, Go etc.) that embed it into the standard IO and
+threading facilities of the bound language.
+
+## Integrating with IO
+
+The [Proactor API](@ref proactor) is a proactive, asynchronous framework to
+build single or multi-threaded Proton C applications.
+
+For advanced use-cases it is possible to write your own implementation of the
+proactor API for an unusual IO or threading framework. Any proton application
+written to the proactor API will be able to use your implementation.
+
+## Messenger and Reactor APIs
+
+The [Messenger](@ref messenger) and [Reactor](@ref reactor) APIs were intended
+to be simple APIs that included IO support directly out of the box.
+
+They both had good points but were both based on the assumption of a single-threaded
+environment using a POSIX-like poll() call. This was a problem for performance on some
+platforms and did not support multi-threaded applications.
+
+Note, however, that application code which interacts with the AMQP @ref engine and
+processes AMQP @ref event "events" is the same for the proactor and reactor APIs,
+so it is quite easy to convert. The main difference is in how connections are set up.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/include/proton/cid.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/cid.h b/proton-c/include/proton/cid.h
index 1e4715f..cd68de4 100644
--- a/proton-c/include/proton/cid.h
+++ b/proton-c/include/proton/cid.h
@@ -56,7 +56,10 @@ typedef enum {
CID_pn_selector,
CID_pn_selectable,
- CID_pn_url
+ CID_pn_url,
+
+ CID_pn_listener,
+ CID_pn_proactor
} pn_cid_t;
#endif /* cid.h */
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/include/proton/condition.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/condition.h b/proton-c/include/proton/condition.h
index cf2f8aa..ae2beff 100644
--- a/proton-c/include/proton/condition.h
+++ b/proton-c/include/proton/condition.h
@@ -165,6 +165,10 @@ PN_EXTERN const char *pn_condition_redirect_host(pn_condition_t *condition);
*/
PN_EXTERN int pn_condition_redirect_port(pn_condition_t *condition);
+PN_EXTERN int pn_condition_copy(pn_condition_t *dest, pn_condition_t *src);
+PN_EXTERN pn_condition_t *pn_condition(void);
+PN_EXTERN void pn_condition_free(pn_condition_t *);
+
/** @}
*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/include/proton/connection.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/connection.h b/proton-c/include/proton/connection.h
index c5b5490..0ed23b0 100644
--- a/proton-c/include/proton/connection.h
+++ b/proton-c/include/proton/connection.h
@@ -82,6 +82,7 @@ extern "C" {
*/
#define PN_REMOTE_MASK (PN_REMOTE_UNINIT | PN_REMOTE_ACTIVE | PN_REMOTE_CLOSED)
+PN_EXTERN pn_connection_t *pn_connection(void);
/**
* Factory to construct a new Connection.
*
@@ -148,6 +149,13 @@ PN_EXTERN pn_error_t *pn_connection_error(pn_connection_t *connection);
PN_EXTERN void pn_connection_collect(pn_connection_t *connection, pn_collector_t *collector);
/**
+ * Get the collector set with pn_connection_collect()
+ * @return NULL if pn_connection_collect() has not been called.
+*/
+PN_EXTERN pn_collector_t* pn_connection_collector(pn_connection_t *connection);
+
+
+/**
* @deprecated
* Get the application context that is associated with a connection
* object.
@@ -477,6 +485,16 @@ PN_EXTERN pn_data_t *pn_connection_remote_properties(pn_connection_t *connection
*/
PN_EXTERN pn_transport_t *pn_connection_transport(pn_connection_t *connection);
+/**
+ * Create a connection with `size` bytes of extra aligned storage in the same heap block.
+ */
+PN_EXTERN pn_connection_t* pn_connection_with_extra(size_t size);
+
+/**
+ * Get the start and size of extra storage allocated by pn_connection_with_extra()
+ */
+PN_EXTERN pn_rwbytes_t pn_connection_get_extra(pn_connection_t *connection);
+
/** @}
*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/include/proton/connection_engine.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/connection_engine.h b/proton-c/include/proton/connection_engine.h
index d9df77b..b7022a9 100644
--- a/proton-c/include/proton/connection_engine.h
+++ b/proton-c/include/proton/connection_engine.h
@@ -20,160 +20,289 @@
* under the License.
*/
-///@file
-///
-/// **Experimental** The Connection Engine API wraps up the proton engine
-/// objects associated with a single connection: pn_connection_t, pn_transport_t
-/// and pn_collector_t. It provides a simple bytes-in/bytes-out interface for IO
-/// and generates pn_event_t events to be handled by the application.
-///
-/// The connection engine can be fed with raw AMQP bytes from any source, and it
-/// generates AMQP byte output to be written to any destination. You can use the
-/// engine to integrate proton AMQP with any IO library, or native IO on any
-/// platform.
-///
-/// The engine is not thread safe but each engine is independent. Separate
-/// engines can be used concurrently. For example a multi-threaded application
-/// can process connections in multiple threads, but serialize work for each
-/// connection to the corresponding engine.
-///
-/// The engine is designed to be thread and IO neutral so it can be integrated with
-/// single or multi-threaded code in reactive or proactive IO frameworks.
-///
-/// Summary of use:
-///
-/// - while !pn_connection_engine_finished()
-/// - Call pn_connection_engine_dispatch() to dispatch events until it returns NULL.
-/// - Read data from your source into pn_connection_engine_read_buffer()
-/// - Call pn_connection_engine_read_done() when complete.
-/// - Write data from pn_connection_engine_write_buffer() to your destination.
-/// - Call pn_connection_engine_write_done() to indicate how much was written.
-///
-/// Note on blocking: the _read/write_buffer and _read/write_done functions can
-/// all generate events that may cause the engine to finish. Before you wait for
-/// IO, always drain pn_connection_engine_dispatch() till it returns NULL and
-/// check pn_connection_engine_finished() in case there is nothing more to do..
-///
-/// Note on error handling: the pn_connection_engine_*() functions do not return
-/// an error code. If an error occurs it will be reported as a
-/// PN_TRANSPORT_ERROR event and pn_connection_engine_finished() will return
-/// true once all final events have been processed.
-///
-/// @defgroup connection_engine The Connection Engine
-/// @{
-///
-
-#include <proton/condition.h>
-#include <proton/event.h>
+/**
+ * @file
+ *
+ * **Experimental** The connection IO API is a set of functions to simplify
+ * integrating proton with different IO and concurrency platforms. The portable
+ * parts of a Proton application should use the @ref engine types. We will
+ * use "application" to mean the portable part of the application and
+ * "integration" to mean code that integrates with a particular IO platform.
+ *
+ * The connection_engine functions take a @ref pn_connection_t\*, and perform common
+ * tasks involving the @ref pn_connection_t and its @ref pn_transport_t and
+ * @ref pn_collector_t so you can treat them as a unit. You can also work with
+ * these types directly for features not available via @ref connection_engine API.
+ *
+ * @defgroup connection_engine Connection Engine
+ *
+ * **Experimental**: Toolkit for integrating proton with arbitrary network or IO
+ * transports. Provides a single point of control for an AMQP connection and
+ * a simple bytes-in/bytes-out interface that lets you:
+ *
+ * - process AMQP-encoded bytes from some input byte stream
+ * - generate @ref pn_event_t events for your application to handle
+ * - encode resulting AMQP output bytes for some output byte stream
+ *
+ * The engine contains a @ref pn_connection_t, @ref pn_transport_t and @ref
+ * pn_collector_t and provides functions to operate on all three as a unit for
+ * IO integration. You can also use them directly for anything not covered by
+ * this API
+ *
+ * For example a simple blocking IO integration with the imaginary "my_io" library:
+ *
+ * pn_connection_engine_t ce;
+ * pn_connection_engine_init(&ce, NULL, NULL);
+ * while (!pn_connection_engine_finished(&ce)) {
+ * // Dispatch events to be handled by the application.
+ * pn_event_t *e;
+ * while ((e = pn_connection_engine_event(&ce))!= NULL) {
+ * my_app_handle(e); // Pass to the application handler
+ * switch (pn_event_type(e)) {
+ * case PN_CONNECTION_INIT: pn_connection_engine_bind(&ce); break;
+ * // Only for full-duplex IO where read/write can shutdown separately.
+ * case PN_TRANSPORT_READ_CLOSED: my_io_shutdown_read(...); break;
+ * case PN_TRANSPORT_WRITE_CLOSED: my_io_shutdown_write(...); break;
+ * default: break;
+ * };
+ * pn_connection_engine_pop_event(&ce);
+ * }
+ * // Read from my_io into the connection buffer
+ * pn_rwbytes_t readbuf = pn_connection_engine_read_buffer(&ce);
+ * if (readbuf.size) {
+ * int n = my_io_read(readbuf.start, readbuf.size, ...);
+ * if (n > 0) {
+ * pn_connection_engine_read_done(&ce, n);
+ * } else if (n < 0) {
+ * pn_connection_engine_errorf(&ce, "read-err", "something-bad (%d): %s", n, ...);
+ * pn_connection_engine_read_close(&ce);
+ * }
+ * }
+ * // Write from connection buffer to my_io
+ * pn_bytes_t writebuf = pn_connection_engine_write_buffer(&ce);
+ * if (writebuf.size) {
+ * int n = my_io_write_data(writebuf.start, writebuf.size, ...);
+ * if (n < 0) {
+ * pn_connection_engine_errorf(&ce, "write-err", "something-bad (%d): %s", n, ...);
+ * pn_connection_engine_write_close(&ce);
+ * } else {
+ * pn_connection_engine_write_done(&ce, n);
+ * }
+ * }
+ * }
+ * // If my_io doesn't have separate read/write shutdown, then we should close it now.
+ * my_io_close(...);
+ *
+ * AMQP is a full-duplex, asynchronous protocol. The "read" and "write" sides of
+ * an AMQP connection can close separately, the example shows how to handle this
+ * for full-duplex IO or IO with a simple close.
+ *
+ * The engine buffers events, you must keep processing till
+ * pn_connection_engine_finished() is true, to ensure all reading, writing and event
+ * handling (including ERROR and FINAL events) is completely finished.
+ *
+ * ## Error handling
+ *
+ * The pn_connection_engine_*() functions do not return an error code. IO errors set
+ * the transport condition and are returned as a PN_TRANSPORT_ERROR. The integration
+ * code can set errors using pn_connection_engine_errorf()
+ *
+ * ## Other IO patterns
+ *
+ * This API supports asynchronous, proactive, non-blocking and reactive IO. An
+ * integration does not have to follow the dispatch-read-write sequence above,
+ * but note that you should handle all available events before calling
+ * pn_connection_engine_read_buffer() and check that `size` is non-zero before
+ * starting a blocking or asynchronous read call. A `read` started while there
+ * are unprocessed CLOSE events in the buffer may never complete.
+ *
+ * ## Thread safety
+ *
+ * The @ref engine types are not thread safe, but each connection and its
+ * associated types forms an independent unit. Different connections can be
+ * processed concurrently by different threads.
+ *
+ * @addtogroup connection_engine
+ * @{
+ */
+
#include <proton/import_export.h>
+#include <proton/event.h>
#include <proton/types.h>
+#include <stdarg.h>
+
#ifdef __cplusplus
extern "C" {
#endif
-/// A connection engine is a trio of pn_connection_t, pn_transport_t and pn_collector_t.
-/// Use the pn_connection_engine_*() functions to operate on it.
-/// It is a plain struct, not a proton object. Use pn_connection_engine_init to set up
-/// the initial objects and pn_connection_engine_final to release them.
-///
+/**
+ * Struct containing a connection, transport and collector. See
+ * pn_connection_engine_init(), pn_connection_engine_destroy() and pn_connection_engine()
+ */
typedef struct pn_connection_engine_t {
- pn_connection_t* connection;
- pn_transport_t* transport;
- pn_collector_t* collector;
- pn_event_t* event;
+ pn_connection_t *connection; /**< Supplied to or created by pn_connection_engine_init() */
+ pn_transport_t *transport; /**< Supplied to or created by pn_connection_engine_init() */
+ pn_collector_t *collector; /**< Created by pn_connection_engine_init() */
} pn_connection_engine_t;
-/// Initialize a pn_connection_engine_t struct with a new connection and
-/// transport.
-///
-/// Call pn_connection_engine_final to free resources when you are done.
-///
-///@return 0 on success, a proton error code on failure (@see error.h)
-///
-PN_EXTERN int pn_connection_engine_init(pn_connection_engine_t* engine);
-
-/// Free resources used by the engine, set the connection and transport pointers
-/// to NULL.
-PN_EXTERN void pn_connection_engine_final(pn_connection_engine_t* engine);
-
-/// Get the engine's read buffer. Read data from your IO source to buf.start, up
-/// to a max of buf.size. Then call pn_connection_engine_read_done().
-///
-/// buf.size==0 means the engine cannot read presently, calling
-/// pn_connection_engine_dispatch() may create more buffer space.
-///
-PN_EXTERN pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t*);
-
-/// Consume the first n bytes of data in pn_connection_engine_read_buffer() and
-/// update the buffer.
-PN_EXTERN void pn_connection_engine_read_done(pn_connection_engine_t*, size_t n);
-
-/// Close the read side of the transport when no more data is available.
-/// Note there may still be events for pn_connection_engine_dispatch() or data
-/// in pn_connection_engine_write_buffer()
-PN_EXTERN void pn_connection_engine_read_close(pn_connection_engine_t*);
-
-/// Get the engine's write buffer. Write data from buf.start to your IO destination,
-/// up to a max of buf.size. Then call pn_connection_engine_write_done().
-///
-/// buf.size==0 means the engine has nothing to write presently. Calling
-/// pn_connection_engine_dispatch() may generate more data.
-PN_EXTERN pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t*);
-
-/// Call when the first n bytes of pn_connection_engine_write_buffer() have been
-/// written to IO and can be re-used for new data. Updates the buffer.
-PN_EXTERN void pn_connection_engine_write_done(pn_connection_engine_t*, size_t n);
-
-/// Call when the write side of IO has closed and no more data can be written.
-/// Note that there may still be events for pn_connection_engine_dispatch() or
-/// data to read into pn_connection_engine_read_buffer().
-PN_EXTERN void pn_connection_engine_write_close(pn_connection_engine_t*);
-
-/// Close both sides of the transport, equivalent to
-/// pn_connection_engine_read_close(); pn_connection_engine_write_close()
-///
-/// You must still call pn_connection_engine_dispatch() to process final
-/// events.
-///
-/// To provide transport error information to the handler, set it on
-/// pn_connection_engine_condition()
-/// *before* calling pn_connection_engine_disconnected(). This sets
-/// the error on the pn_transport_t object.
-///
-/// Note this does *not* modify the pn_connection_t, so you can distinguish
-/// between a connection close error sent by the remote peer (which will set the
-/// connection condition) and a transport error (which sets the transport
-/// condition.)
-///
-PN_EXTERN void pn_connection_engine_disconnected(pn_connection_engine_t*);
-
-/// Get the next available event.
-/// Call in a loop until it returns NULL to dispatch all available events.
-/// Note this call may modify the read and write buffers.
-///
-/// @return Pointer to the next event, or NULL if there are none available.
-///
-PN_EXTERN pn_event_t* pn_connection_engine_dispatch(pn_connection_engine_t*);
-
-/// Return true if the engine is finished - all data has been written, all
-/// events have been handled and the transport is closed.
-PN_EXTERN bool pn_connection_engine_finished(pn_connection_engine_t*);
-
-/// Get the AMQP connection, owned by the pn_connection_engine_t.
-PN_EXTERN pn_connection_t* pn_connection_engine_connection(pn_connection_engine_t*);
-
-/// Get the proton transport, owned by the pn_connection_engine_t.
-PN_EXTERN pn_transport_t* pn_connection_engine_transport(pn_connection_engine_t*);
-
-/// Get the condition object for the engine's transport.
-///
-/// Note that IO errors should be set on this, the transport condition, not on
-/// the pn_connection_t condition. The connection's condition is for errors
-/// received via the AMQP protocol, the transport condition is for errors in the
-/// the IO layer such as a socket read or disconnect errors.
-///
-PN_EXTERN pn_condition_t* pn_connection_engine_condition(pn_connection_engine_t*);
+/**
+ * Set #connection and #transport to the provided values, or create a new
+ * @ref pn_connection_t or @ref pn_transport_t if either is NULL.
+ * The provided values belong to the connection engine and will be freed by
+ * pn_connection_engine_destroy()
+ *
+ * Create a new @ref pn_collector_t and set as #collector.
+ *
+ * The transport and connection are *not* bound at this point. You should
+ * configure them as needed and let the application handle the
+ * PN_CONNECTION_INIT from pn_connection_engine_event() before calling
+ * pn_connection_engine_bind().
+ *
+ * @return PN_OUT_OF_MEMORY if any allocation fails, after calling pn_connection_engine_destroy()
+ */
+PN_EXTERN int pn_connection_engine_init(pn_connection_engine_t*, pn_connection_t*, pn_transport_t*);
+
+/**
+ * Bind the connection to the transport when the external IO is ready.
+ *
+ * The following functions (if called at all) must be called *before* bind:
+ * pn_connection_set_username(), pn_connection_set_password(), pn_transport_set_server()
+ *
+ * If there is an external IO error during setup, set a transport error, close
+ * the transport and then bind. The error events are reported to the application
+ * via pn_connection_engine_event().
+ *
+ * @return an error code if the bind fails.
+ */
+PN_EXTERN int pn_connection_engine_bind(pn_connection_engine_t *);
+
+/**
+ * Unbind, release and free #connection, #transport and #collector. Set all pointers to NULL.
+ * Does not free the @ref pn_connection_engine_t struct itself.
+ */
+PN_EXTERN void pn_connection_engine_destroy(pn_connection_engine_t *);
+
+/**
+ * Get the read buffer.
+ *
+ * Copy data from your input byte source to buf.start, up to buf.size.
+ * Call pn_connection_engine_read_done() when reading is complete.
+ *
+ * buf.size==0 means reading is not possible: no buffer space or the read side is closed.
+ */
+PN_EXTERN pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t *);
+
+/**
+ * Process the first n bytes of data in pn_connection_engine_read_buffer() and
+ * reclaim the buffer space.
+ */
+PN_EXTERN void pn_connection_engine_read_done(pn_connection_engine_t *, size_t n);
+
+/**
+ * Close the read side. Call when the IO can no longer be read.
+ */
+PN_EXTERN void pn_connection_engine_read_close(pn_connection_engine_t *);
+
+/**
+ * True if read side is closed.
+ */
+PN_EXTERN bool pn_connection_engine_read_closed(pn_connection_engine_t *);
+
+/**
+ * Get the write buffer.
+ *
+ * Write data from buf.start to your IO destination, up to a max of buf.size.
+ * Call pn_connection_engine_write_done() when writing is complete.
+ *
+ * buf.size==0 means there is nothing to write.
+ */
+ PN_EXTERN pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t *);
+
+/**
+ * Call when the first n bytes of pn_connection_engine_write_buffer() have been
+ * written to IO. Reclaims the buffer space and resets the write buffer.
+ */
+PN_EXTERN void pn_connection_engine_write_done(pn_connection_engine_t *, size_t n);
+
+/**
+ * Close the write side. Call when IO can no longer be written to.
+ */
+PN_EXTERN void pn_connection_engine_write_close(pn_connection_engine_t *);
+
+/**
+ * True if write side is closed.
+ */
+PN_EXTERN bool pn_connection_engine_write_closed(pn_connection_engine_t *);
+
+/**
+ * Close both the read and write sides.
+ */
+PN_EXTERN void pn_connection_engine_close(pn_connection_engine_t * c);
+
+/**
+ * Get the current event. Call pn_connection_engine_pop_event() when done handling it.
+ * Note that if PN_TRACE_EVT is enabled this will log the event, so you should
+ * avoid calling it more than once per event. Use pn_connection_engine_has_event()
+ * to silently test if any events are available.
+ *
+ * @return NULL if there are no more events ready. Reading/writing data may produce more.
+ */
+PN_EXTERN pn_event_t* pn_connection_engine_event(pn_connection_engine_t *);
+
+/**
+ * True if pn_connection_engine_event() will return a non-NULL event.
+ */
+PN_EXTERN bool pn_connection_engine_has_event(pn_connection_engine_t *);
+
+/**
+ * Drop the current event, advance pn_connection_engine_event() to the next event.
+ */
+PN_EXTERN void pn_connection_engine_pop_event(pn_connection_engine_t *);
+
+/**
+ * Return true if the engine is closed for reading and writing and there are
+ * no more events.
+ *
+ * Call pn_connection_engine_destroy() to free all related memory.
+ */
+PN_EXTERN bool pn_connection_engine_finished(pn_connection_engine_t *);
+
+/**
+ * Set IO error information.
+ *
+ * The name and formatted description are set on the transport condition, and
+ * returned as a PN_TRANSPORT_ERROR event from pn_connection_engine_event().
+ *
+ * You must call this *before* pn_connection_engine_read_close() or
+ * pn_connection_engine_write_close() to ensure the error is processed.
+ *
+ * If there is already a transport condition set, this call does nothing. For
+ * more complex cases, you can work with the transport condition directly using:
+ *
+ * pn_condition_t *cond = pn_transport_condition(pn_connection_transport(conn));
+ */
+PN_EXTERN void pn_connection_engine_errorf(pn_connection_engine_t *ce, const char *name, const char *fmt, ...);
+
+/**
+ * Set IO error information via a va_list, see pn_connection_engine_errorf()
+ */
+PN_EXTERN void pn_connection_engine_verrorf(pn_connection_engine_t *ce, const char *name, const char *fmt, va_list);
+
+/**
+ * Log a string message using the connection's transport log.
+ */
+PN_EXTERN void pn_connection_engine_log(pn_connection_engine_t *ce, const char *msg);
+
+/**
+ * Log a printf formatted message using the connection's transport log.
+ */
+PN_EXTERN void pn_connection_engine_logf(pn_connection_engine_t *ce, char *fmt, ...); /* NOTE(review): fmt is not const-qualified, unlike pn_connection_engine_vlogf() -- confirm whether intended. */
+
+/**
+ * Log a printf formatted message via a va_list, see pn_connection_engine_logf()
+ */
+PN_EXTERN void pn_connection_engine_vlogf(pn_connection_engine_t *ce, const char *fmt, va_list ap);
///@}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/include/proton/event.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/event.h b/proton-c/include/proton/event.h
index d10927b..4dca2d5 100644
--- a/proton-c/include/proton/event.h
+++ b/proton-c/include/proton/event.h
@@ -277,13 +277,19 @@ typedef enum {
PN_TRANSPORT_ERROR,
/**
- * Indicates that the head of the transport has been closed. This
+ * Indicates that the "head" or writing end of the transport has been closed. This
* means the transport will never produce more bytes for output to
* the network. Events of this type point to the relevant transport.
*/
PN_TRANSPORT_HEAD_CLOSED,
/**
+ * The write side of the transport is closed, it will no longer produce bytes
+ * to write to external IO. Synonym for PN_TRANSPORT_HEAD_CLOSED
+ */
+ PN_TRANSPORT_WRITE_CLOSED = PN_TRANSPORT_HEAD_CLOSED,
+
+ /**
* Indicates that the tail of the transport has been closed. This
* means the transport will never be able to process more bytes from
* the network. Events of this type point to the relevant transport.
@@ -291,6 +297,12 @@ typedef enum {
PN_TRANSPORT_TAIL_CLOSED,
/**
+ * The read side of the transport is closed, it will no longer read bytes
+ * from external IO. Synonym for PN_TRANSPORT_TAIL_CLOSED
+ */
+ PN_TRANSPORT_READ_CLOSED = PN_TRANSPORT_TAIL_CLOSED,
+
+ /**
* Indicates that the both the head and tail of the transport are
* closed. Events of this type point to the relevant transport.
*/
@@ -302,7 +314,39 @@ typedef enum {
PN_SELECTABLE_WRITABLE,
PN_SELECTABLE_ERROR,
PN_SELECTABLE_EXPIRED,
- PN_SELECTABLE_FINAL
+ PN_SELECTABLE_FINAL,
+
+ /**
+ * pn_connection_wake() was called.
+ * Events of this type point to the @ref pn_connection_t.
+ */
+ PN_CONNECTION_WAKE,
+
+ /**
+ * pn_listener_close() was called or an error occurred, see pn_listener_condition()
+ * Events of this type point to the @ref pn_listener_t.
+ */
+ PN_LISTENER_CLOSE,
+
+ /**
+ * pn_proactor_interrupt() was called to interrupt a proactor thread
+ * Events of this type point to the @ref pn_proactor_t.
+ */
+ PN_PROACTOR_INTERRUPT,
+
+ /**
+ * pn_proactor_set_timeout() time limit expired.
+ * Events of this type point to the @ref pn_proactor_t.
+ */
+ PN_PROACTOR_TIMEOUT,
+
+ /**
+ * The proactor became inactive: all listeners and connections are closed and
+ * their events processed, the timeout is expired.
+ *
+ * Events of this type point to the @ref pn_proactor_t.
+ */
+ PN_PROACTOR_INACTIVE
} pn_event_type_t;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/include/proton/extra.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/extra.h b/proton-c/include/proton/extra.h
new file mode 100644
index 0000000..ea2e1ef
--- /dev/null
+++ b/proton-c/include/proton/extra.h
@@ -0,0 +1,69 @@
+#ifndef EXTRA_H
+#define EXTRA_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/type_compat.h>
+#include <proton/types.h>
+#include <stddef.h>
+#include <stdlib.h>
+
+/**
+ * @cond INTERNAL
+ * Support for allocating extra aligned memory after a type.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * extra_t contains a size and is maximally aligned so the memory immediately
+ * after it can store any type of value.
+ */
+typedef union pn_extra_t {
+ size_t size; /* byte count of the extra storage; pn_extra_rwbytes() uses it as the buffer size */
+#if __STDC_VERSION__ >= 201112
+ max_align_t max; /* C11: gives the union the strictest fundamental alignment */
+#else
+/* Not standard but fairly safe: pre-C11 fallback, a union of types presumed to cover the strictest alignment */
+ uint64_t i;
+ long double d;
+ void *v;
+ void (*fp)(void);
+#endif
+} pn_extra_t;
+
+static inline pn_rwbytes_t pn_extra_rwbytes(pn_extra_t *x) { /* Buffer of x->size bytes starting right after the header *x */
+ return pn_rwbytes(x->size, (char*)(x+1)); /* x+1 is the first byte past the header */
+}
+
+/* Declare private helper struct for T: T base followed by a pn_extra_t header. PN_EXTRA_SIZEOF adds N bytes to its size; PN_EXTRA_GET views P as that struct and returns the storage after the header. */
+#define PN_EXTRA_DECLARE(T) typedef struct T##__extra { T base; pn_extra_t extra; } T##__extra
+#define PN_EXTRA_SIZEOF(T, N) (sizeof(T##__extra)+(N))
+#define PN_EXTRA_GET(T, P) pn_extra_rwbytes(&((T##__extra*)(P))->extra)
+
+#ifdef __cplusplus
+}
+#endif
+
+/** @endcond */
+
+#endif // EXTRA_H
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/include/proton/listener.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/listener.h b/proton-c/include/proton/listener.h
new file mode 100644
index 0000000..f55479b
--- /dev/null
+++ b/proton-c/include/proton/listener.h
@@ -0,0 +1,76 @@
+#ifndef PROTON_LISTENER_H
+#define PROTON_LISTENER_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/types.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * @file
+ *
+ * Listener API for the proton @ref proactor
+ *
+ * @defgroup listener Listener
+ * @ingroup proactor
+ * @{
+ */
+
+typedef struct pn_proactor_t pn_proactor_t;
+typedef struct pn_condition_t pn_condition_t;
+
+/**
+ * Listener accepts connections, see pn_proactor_listen()
+ */
+typedef struct pn_listener_t pn_listener_t;
+
+/**
+ * The proactor that created the listener.
+ */
+pn_proactor_t *pn_listener_proactor(pn_listener_t *c);
+
+/**
+ * Get the error condition for a listener.
+ */
+pn_condition_t *pn_listener_condition(pn_listener_t *l);
+
+/**
+ * Get the user-provided value associated with the listener in pn_proactor_listen()
+ * The start address is aligned so you can cast it to any type.
+ */
+pn_rwbytes_t pn_listener_get_extra(pn_listener_t*);
+
+/**
+ * Close the listener (thread safe).
+ */
+void pn_listener_close(pn_listener_t *l);
+
+/**
+ *@}
+ */
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // PROTON_LISTENER_H
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/include/proton/object.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/object.h b/proton-c/include/proton/object.h
index bafdcf4..0433b58 100644
--- a/proton-c/include/proton/object.h
+++ b/proton-c/include/proton/object.h
@@ -159,6 +159,29 @@ PREFIX ## _t *PREFIX ## _new(void) { \
PREFIX ## _inspect \
}
+/* Class to identify a plain C struct in a pn_event_t. No refcounting or memory management. Defines PREFIX__class(), which returns a static pn_class_t built from the pn_void_* functions, with a reify that returns that same class. */
+#define PN_STRUCT_CLASSDEF(PREFIX, CID) \
+ const pn_class_t *PREFIX ## __class(void); \
+ static const pn_class_t *PREFIX ## _reify(void *p) { return PREFIX ## __class(); } \
+ const pn_class_t *PREFIX ## __class(void) { \
+ static const pn_class_t clazz = { \
+ #PREFIX, \
+ CID, \
+ NULL, /*_new*/ \
+ NULL, /*_initialize*/ \
+ pn_void_incref, \
+ pn_void_decref, \
+ pn_void_refcount, \
+ NULL, /* _finalize */ \
+ NULL, /* _free */ \
+ PREFIX ## _reify, \
+ pn_void_hashcode, \
+ pn_void_compare, \
+ pn_void_inspect \
+ }; \
+ return &clazz; \
+ }
+
PN_EXTERN pn_cid_t pn_class_id(const pn_class_t *clazz);
PN_EXTERN const char *pn_class_name(const pn_class_t *clazz);
PN_EXTERN void *pn_class_new(const pn_class_t *clazz, size_t size);
@@ -181,6 +204,10 @@ PN_EXTERN intptr_t pn_class_compare(const pn_class_t *clazz, void *a, void *b);
PN_EXTERN bool pn_class_equals(const pn_class_t *clazz, void *a, void *b);
PN_EXTERN int pn_class_inspect(const pn_class_t *clazz, void *object, pn_string_t *dst);
+PN_EXTERN void *pn_void_new(const pn_class_t *clazz, size_t size);
+PN_EXTERN void pn_void_incref(void *object);
+PN_EXTERN void pn_void_decref(void *object);
+PN_EXTERN int pn_void_refcount(void *object);
PN_EXTERN uintptr_t pn_void_hashcode(void *object);
PN_EXTERN intptr_t pn_void_compare(void *a, void *b);
PN_EXTERN int pn_void_inspect(void *object, pn_string_t *dst);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
new file mode 100644
index 0000000..49d7b6a
--- /dev/null
+++ b/proton-c/include/proton/proactor.h
@@ -0,0 +1,174 @@
+#ifndef PROTON_PROACTOR_H
+#define PROTON_PROACTOR_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/types.h>
+#include <proton/import_export.h>
+#include <proton/listener.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct pn_condition_t pn_condition_t;
+
+/**
+ * @file
+ *
+ * **Experimental**: Proactor API for the proton @ref engine.
+ *
+ * @defgroup proactor Proactor
+ *
+ * **Experimental**: Proactor API for portable, multi-threaded, asynchronous applications.
+ *
+ * The proactor establishes and listens for connections. It creates the
+ * @ref transport that sends and receives data over the network and
+ * delivers @ref event "events" to application threads for processing.
+ *
+ * ## Multi-threading
+ *
+ * The @ref proactor is thread-safe, but the @ref engine "protocol engine" is not. The
+ * proactor ensures that each @ref connection and its associated
+ * values (@ref session, @ref link etc.) are processed sequentially, even if there
+ * are multiple application threads. See pn_proactor_wait()
+ *
+ * @{
+ */
+
+/**
+ * The proactor creates and manages @ref transport "transports" and delivers
+ * @ref event "events" to the application.
+ *
+ */
+typedef struct pn_proactor_t pn_proactor_t;
+
+/**
+ * Create a proactor. Must be freed with pn_proactor_free()
+ */
+pn_proactor_t *pn_proactor(void);
+
+/**
+ * Free the proactor.
+ */
+void pn_proactor_free(pn_proactor_t*);
+
+/* FIXME aconway 2016-11-12: connect and listen need options to enable
+ things like websockets, alternate encryption or other features.
+ The "extra" parameter will be replaced by an "options" parameter
+ that will include providing extra data and other manipulators
+ to affect how the connection is processed.
+*/
+
+/**
+ * Asynchronous connect: a connection and transport will be created, the
+ * relevant events will be returned by pn_proactor_wait()
+ *
+ * Errors are indicated by PN_TRANSPORT_ERROR/PN_TRANSPORT_CLOSED events.
+ *
+ * @param extra bytes to copy to pn_connection_get_extra() on the new connection, @ref
+ * pn_bytes_null for nothing.
+ *
+ * @return error if the connect cannot be initiated e.g. an allocation failure.
+ * IO errors will be returned as transport events via pn_proactor_wait()
+ */
+int pn_proactor_connect(pn_proactor_t*, const char *host, const char *port, pn_bytes_t extra);
+
+/**
+ * Asynchronous listen: start listening, connections will be returned by pn_proactor_wait()
+ * Errors are indicated by a PN_LISTENER_CLOSE event, see pn_listener_condition()
+ *
+ * @param extra bytes to copy to pn_listener_get_extra() on the new listener, @ref
+ * pn_bytes_null for nothing.
+ *
+ * @return the new listener. Listen errors are reported asynchronously by a
+ * PN_LISTENER_CLOSE event via pn_proactor_wait()
+ */
+pn_listener_t *pn_proactor_listen(pn_proactor_t *, const char *host, const char *port, int backlog, pn_bytes_t extra);
+
+/**
+ * Wait for an event. Can be called in multiple threads concurrently.
+ * You must call pn_event_done() when the event has been handled.
+ *
+ * The proactor ensures that events that cannot be handled concurrently
+ * (e.g. events for the same connection) are never returned concurrently.
+ */
+pn_event_t *pn_proactor_wait(pn_proactor_t* d);
+
+/**
+ * Cause PN_PROACTOR_INTERRUPT to be returned to exactly one thread calling wait()
+ * for each call to pn_proactor_interrupt(). Thread safe.
+ */
+void pn_proactor_interrupt(pn_proactor_t* d);
+
+/**
+ * Cause PN_PROACTOR_TIMEOUT to be returned to a thread calling wait() after
+ * timeout milliseconds. Thread safe.
+ *
+ * Note calling pn_proactor_set_timeout() again before the PN_PROACTOR_TIMEOUT is
+ * delivered will cancel the previous timeout and deliver an event only after
+ * the new timeout.
+ */
+void pn_proactor_set_timeout(pn_proactor_t* d, pn_millis_t timeout);
+
+/**
+ * Cause a PN_CONNECTION_WAKE event to be returned by the proactor, even if
+ * there are no IO events pending for the connection.
+ *
+ * Thread safe: this is the only pn_connection_ function that can be
+ * called concurrently.
+ *
+ * Wakes can be "coalesced" - if several pn_connection_wake() calls happen
+ * concurrently, there may be only one PN_CONNECTION_WAKE event.
+ */
+void pn_connection_wake(pn_connection_t *c);
+
+/**
+ * The proactor that created the connection.
+ */
+pn_proactor_t *pn_connection_proactor(pn_connection_t *c);
+
+/**
+ * Call when a proactor event has been handled. Does nothing if not a proactor event.
+ *
+ * Thread safe: May be called from any thread but must be called exactly once
+ * for each event returned by pn_proactor_wait()
+ */
+void pn_event_done(pn_event_t *);
+
+/**
+ * Get the proactor that created the event or NULL.
+ */
+pn_proactor_t *pn_event_proactor(pn_event_t *);
+
+/**
+ * Get the listener for the event or NULL.
+ */
+pn_listener_t *pn_event_listener(pn_event_t *);
+
+/**
+ * @}
+ */
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // PROTON_PROACTOR_H
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/include/proton/types.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/types.h b/proton-c/include/proton/types.h
index 176af47..378719c 100644
--- a/proton-c/include/proton/types.h
+++ b/proton-c/include/proton/types.h
@@ -67,6 +67,7 @@ typedef struct pn_bytes_t {
} pn_bytes_t;
PN_EXTERN pn_bytes_t pn_bytes(size_t size, const char *start);
+static const pn_bytes_t pn_bytes_null = { 0, NULL };
/** A non-const byte buffer. */
typedef struct pn_rwbytes_t {
@@ -75,6 +76,7 @@ typedef struct pn_rwbytes_t {
} pn_rwbytes_t;
PN_EXTERN pn_rwbytes_t pn_rwbytes(size_t size, char *start);
+/** A null non-const byte buffer: size 0, start NULL. */
+static const pn_rwbytes_t pn_rwbytes_null = { 0, NULL };
/** @}
*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/src/core/connection_driver.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/connection_driver.c b/proton-c/src/core/connection_driver.c
new file mode 100644
index 0000000..f31ddb0
--- /dev/null
+++ b/proton-c/src/core/connection_driver.c
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "engine-internal.h"
+#include <proton/condition.h>
+#include <proton/connection.h>
+#include <proton/connection_engine.h>
+#include <proton/transport.h>
+#include <string.h>
+
+/* Set up ce with connection c and transport t, creating a fresh one for
+ * whichever is NULL, plus a new collector that is attached to the connection.
+ * Returns 0 on success. If any of the three objects is missing, ce is
+ * destroyed and PN_OUT_OF_MEMORY is returned.
+ */
+int pn_connection_engine_init(pn_connection_engine_t* ce, pn_connection_t *c, pn_transport_t *t) {
+  if (!c) c = pn_connection();
+  if (!t) t = pn_transport();
+  ce->connection = c;
+  ce->transport = t;
+  ce->collector = pn_collector();
+  if (ce->connection && ce->transport && ce->collector) {
+    pn_connection_collect(ce->connection, ce->collector);
+    return 0;
+  }
+  pn_connection_engine_destroy(ce);
+  return PN_OUT_OF_MEMORY;
+}
+
+/* Bind ce's transport to ce's connection and return pn_transport_bind()'s result. */
+int pn_connection_engine_bind(pn_connection_engine_t *ce) {
+  int rc = pn_transport_bind(ce->transport, ce->connection);
+  return rc;
+}
+
+/* Free whichever of transport, collector and connection ce holds, in that
+ * order, then zero the whole struct. The transport is unbound before it is
+ * freed.
+ */
+void pn_connection_engine_destroy(pn_connection_engine_t *ce) {
+  pn_transport_t *t = ce->transport;
+  if (t) {
+    pn_transport_unbind(t);
+    pn_transport_free(t);
+  }
+  if (ce->collector) {
+    pn_collector_free(ce->collector);
+  }
+  if (ce->connection) {
+    pn_connection_free(ce->connection);
+  }
+  memset(ce, 0, sizeof(*ce));
+}
+
+/* Return the transport's tail buffer sized to its current capacity, or an
+ * empty (0, 0) buffer when the capacity is zero or negative.
+ */
+pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t *ce) {
+  ssize_t capacity = pn_transport_capacity(ce->transport);
+  if (capacity <= 0) {
+    return pn_rwbytes(0, 0);
+  }
+  return pn_rwbytes(capacity, pn_transport_tail(ce->transport));
+}
+
+/* Tell the transport to process n bytes of input; does nothing when n is 0. */
+void pn_connection_engine_read_done(pn_connection_engine_t *ce, size_t n) {
+  if (n == 0) return;
+  pn_transport_process(ce->transport, n);
+}
+
+/* True when the transport reports a negative read capacity. */
+bool pn_connection_engine_read_closed(pn_connection_engine_t *ce) {
+  ssize_t capacity = pn_transport_capacity(ce->transport);
+  return capacity < 0;
+}
+
+/* Close the transport's tail unless the read side is already closed. */
+void pn_connection_engine_read_close(pn_connection_engine_t *ce) {
+  if (pn_connection_engine_read_closed(ce)) return;
+  pn_transport_close_tail(ce->transport);
+}
+
+/* Return the transport's head buffer sized to the number of pending bytes,
+ * or pn_bytes_null when the pending count is zero or negative.
+ */
+pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t *ce) {
+  ssize_t pending = pn_transport_pending(ce->transport);
+  if (pending <= 0) {
+    return pn_bytes_null;
+  }
+  return pn_bytes(pending, pn_transport_head(ce->transport));
+}
+
+/* Pop n written bytes from the transport's output; does nothing when n is 0. */
+void pn_connection_engine_write_done(pn_connection_engine_t *ce, size_t n) {
+  if (n == 0) return;
+  pn_transport_pop(ce->transport, n);
+}
+
+/* True when the transport reports a negative pending-output count. */
+bool pn_connection_engine_write_closed(pn_connection_engine_t *ce) {
+  ssize_t pending = pn_transport_pending(ce->transport);
+  return pending < 0;
+}
+
+/* Close the transport's head unless the write side is already closed. */
+void pn_connection_engine_write_close(pn_connection_engine_t *ce) {
+  if (pn_connection_engine_write_closed(ce)) return;
+  pn_transport_close_head(ce->transport);
+}
+
+/* Close both directions of the engine: the read side first, then the write side. */
+void pn_connection_engine_close(pn_connection_engine_t *ce)
+{
+  pn_connection_engine_read_close(ce);
+  pn_connection_engine_write_close(ce);
+}
+
+/* Peek at the next event in ce's collector without removing it.
+ * Returns NULL when ce has no collector or no event is queued. If the
+ * transport has PN_TRACE_EVT set, the event is logged through the transport.
+ */
+pn_event_t* pn_connection_engine_event(pn_connection_engine_t *ce) {
+  if (!ce->collector) return NULL;
+  pn_event_t *next = pn_collector_peek(ce->collector);
+  if (!next) return NULL;
+
+  pn_transport_t *transport = ce->transport;
+  if (transport && (transport->trace & PN_TRACE_EVT)) {
+    /* Calling this function twice logs the same event twice, but for
+     * debugging it is much better to log before handling than after.
+     */
+    pn_string_clear(transport->scratch);
+    pn_inspect(next, transport->scratch);
+    pn_transport_log(transport, pn_string_get(transport->scratch));
+  }
+  return next;
+}
+
+/* True when ce has a collector and that collector has an event at its head. */
+bool pn_connection_engine_has_event(pn_connection_engine_t *ce) {
+  if (!ce->collector) return false;
+  return pn_collector_peek(ce->collector) != NULL;
+}
+
+/* Discard the event at the head of ce's collector, if ce has a collector.
+ * When that event is PN_TRANSPORT_CLOSED the whole collector is released
+ * instead of popping a single event.
+ */
+void pn_connection_engine_pop_event(pn_connection_engine_t *ce) {
+  if (!ce->collector) return;
+  /* NOTE(review): head may be NULL if nothing is queued; this relies on
+   * pn_event_type() accepting NULL -- confirm.
+   */
+  pn_event_t *head = pn_collector_peek(ce->collector);
+  if (pn_event_type(head) != PN_TRANSPORT_CLOSED) {
+    pn_collector_pop(ce->collector);
+    return;
+  }
+  /* PN_TRANSPORT_CLOSED is the last event ever. Events can accumulate behind
+   * it before it is handled; they can never be processed, so release them.
+   */
+  pn_collector_release(ce->collector);
+}
+
+/* True once the transport is closed and no event remains to be handled. */
+bool pn_connection_engine_finished(pn_connection_engine_t *ce) {
+  if (!pn_transport_closed(ce->transport)) return false;
+  return !pn_connection_engine_has_event(ce);
+}
+
+/* Set the transport's condition to the given name and a printf-style
+ * description built from fmt and ap. The description is formatted into the
+ * transport's scratch string, overwriting what was there.
+ */
+void pn_connection_engine_verrorf(pn_connection_engine_t *ce, const char *name, const char *fmt, va_list ap) {
+  pn_transport_t *transport = ce->transport;
+  pn_condition_t *condition = pn_transport_condition(transport);
+  pn_string_vformat(transport->scratch, fmt, ap);
+  pn_condition_set_name(condition, name);
+  const char *description = pn_string_get(transport->scratch);
+  pn_condition_set_description(condition, description);
+}
+
+/* Variadic front end for pn_connection_engine_verrorf(): packages the
+ * arguments after fmt into a va_list and forwards them.
+ */
+void pn_connection_engine_errorf(pn_connection_engine_t *ce, const char *name, const char *fmt, ...)
+{
+  va_list args;
+  va_start(args, fmt);
+  pn_connection_engine_verrorf(ce, name, fmt, args);
+  va_end(args);
+}
+
+/* Log msg through ce's transport. */
+void pn_connection_engine_log(pn_connection_engine_t *ce, const char *msg) {
+  pn_transport_t *transport = ce->transport;
+  pn_transport_log(transport, msg);
+}
+
+/* Log a printf-style message, given as fmt plus a va_list, through ce's transport. */
+void pn_connection_engine_vlogf(pn_connection_engine_t *ce, const char *fmt, va_list ap) {
+  pn_transport_t *transport = ce->transport;
+  pn_transport_vlogf(transport, fmt, ap);
+}
+
+/* Log msg through ce's transport.
+ * NOTE(review): despite the "v" in the name this takes a plain string and
+ * does the same thing as pn_connection_engine_log() -- confirm it is intended.
+ */
+void pn_connection_engine_vlog(pn_connection_engine_t *ce, const char *msg) {
+  pn_transport_t *transport = ce->transport;
+  pn_transport_log(transport, msg);
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/src/core/connection_engine.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/connection_engine.c b/proton-c/src/core/connection_engine.c
index 5d184a1..f31ddb0 100644
--- a/proton-c/src/core/connection_engine.c
+++ b/proton-c/src/core/connection_engine.c
@@ -16,109 +16,148 @@
* specific language governing permissions and limitations
* under the License.
*/
-#include "engine-internal.h"
+#include "engine-internal.h"
+#include <proton/condition.h>
#include <proton/connection.h>
#include <proton/connection_engine.h>
#include <proton/transport.h>
#include <string.h>
-int pn_connection_engine_init(pn_connection_engine_t* e) {
- memset(e, 0, sizeof(*e));
- e->connection = pn_connection();
- e->transport = pn_transport();
- e->collector = pn_collector();
- if (!e->connection || !e->transport || !e->collector) {
- pn_connection_engine_final(e);
- return PN_OUT_OF_MEMORY;
- }
- pn_connection_collect(e->connection, e->collector);
- return PN_OK;
+int pn_connection_engine_init(pn_connection_engine_t* ce, pn_connection_t *c, pn_transport_t *t) {
+ ce->connection = c ? c : pn_connection();
+ ce->transport = t ? t : pn_transport();
+ ce->collector = pn_collector();
+ if (!ce->connection || !ce->transport || !ce->collector) {
+ pn_connection_engine_destroy(ce);
+ return PN_OUT_OF_MEMORY;
+ }
+ pn_connection_collect(ce->connection, ce->collector);
+ return 0;
}
-void pn_connection_engine_final(pn_connection_engine_t* e) {
- if (e->transport && e->connection) {
- pn_transport_unbind(e->transport);
- pn_decref(e->transport);
- }
- if (e->collector)
- pn_collector_free(e->collector); /* Break cycle with connection */
- if (e->connection)
- pn_decref(e->connection);
- memset(e, 0, sizeof(*e));
+int pn_connection_engine_bind(pn_connection_engine_t *ce) {
+ return pn_transport_bind(ce->transport, ce->connection);
}
-pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t* e) {
- ssize_t cap = pn_transport_capacity(e->transport);
- if (cap > 0)
- return pn_rwbytes(cap, pn_transport_tail(e->transport));
- else
- return pn_rwbytes(0, 0);
+void pn_connection_engine_destroy(pn_connection_engine_t *ce) {
+ if (ce->transport) {
+ pn_transport_unbind(ce->transport);
+ pn_transport_free(ce->transport);
+ }
+ if (ce->collector) pn_collector_free(ce->collector);
+ if (ce->connection) pn_connection_free(ce->connection);
+ memset(ce, 0, sizeof(*ce));
}
-void pn_connection_engine_read_done(pn_connection_engine_t* e, size_t n) {
- if (n > 0)
- pn_transport_process(e->transport, n);
+pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t *ce) {
+ ssize_t cap = pn_transport_capacity(ce->transport);
+ return (cap > 0) ? pn_rwbytes(cap, pn_transport_tail(ce->transport)) : pn_rwbytes(0, 0);
}
-void pn_connection_engine_read_close(pn_connection_engine_t* e) {
- pn_transport_close_tail(e->transport);
+void pn_connection_engine_read_done(pn_connection_engine_t *ce, size_t n) {
+ if (n > 0) pn_transport_process(ce->transport, n);
}
-pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t* e) {
- ssize_t pending = pn_transport_pending(e->transport);
- if (pending > 0)
- return pn_bytes(pending, pn_transport_head(e->transport));
- else
- return pn_bytes(0, 0);
+bool pn_connection_engine_read_closed(pn_connection_engine_t *ce) {
+ return pn_transport_capacity(ce->transport) < 0;
}
-void pn_connection_engine_write_done(pn_connection_engine_t* e, size_t n) {
- if (n > 0)
- pn_transport_pop(e->transport, n);
+void pn_connection_engine_read_close(pn_connection_engine_t *ce) {
+ if (!pn_connection_engine_read_closed(ce)) {
+ pn_transport_close_tail(ce->transport);
+ }
}
-void pn_connection_engine_write_close(pn_connection_engine_t* e){
- pn_transport_close_head(e->transport);
+pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t *ce) {
+ ssize_t pending = pn_transport_pending(ce->transport);
+ return (pending > 0) ?
+ pn_bytes(pending, pn_transport_head(ce->transport)) : pn_bytes_null;
}
-void pn_connection_engine_disconnected(pn_connection_engine_t* e) {
- pn_connection_engine_read_close(e);
- pn_connection_engine_write_close(e);
+void pn_connection_engine_write_done(pn_connection_engine_t *ce, size_t n) {
+ if (n > 0)
+ pn_transport_pop(ce->transport, n);
}
-static void log_event(pn_connection_engine_t *engine, pn_event_t* event) {
- if (event && engine->transport->trace & PN_TRACE_EVT) {
- pn_string_t *str = pn_string(NULL);
- pn_inspect(event, str);
- pn_transport_log(engine->transport, pn_string_get(str));
- pn_free(str);
+bool pn_connection_engine_write_closed(pn_connection_engine_t *ce) {
+ return pn_transport_pending(ce->transport) < 0;
+}
+
+void pn_connection_engine_write_close(pn_connection_engine_t *ce) {
+ if (!pn_connection_engine_write_closed(ce)) {
+ pn_transport_close_head(ce->transport);
+ }
+}
+
+void pn_connection_engine_close(pn_connection_engine_t *ce) {
+ pn_connection_engine_read_close(ce);
+ pn_connection_engine_write_close(ce);
+}
+
+pn_event_t* pn_connection_engine_event(pn_connection_engine_t *ce) {
+ pn_event_t *e = ce->collector ? pn_collector_peek(ce->collector) : NULL;
+ if (e) {
+ pn_transport_t *t = ce->transport;
+ if (t && t->trace & PN_TRACE_EVT) {
+ /* This can log the same event twice if pn_connection_engine_event is called
+ * twice but for debugging it is much better to log before handling than after.
+ */
+ pn_string_clear(t->scratch);
+ pn_inspect(e, t->scratch);
+ pn_transport_log(t, pn_string_get(t->scratch));
}
+ }
+ return e;
+}
+
+bool pn_connection_engine_has_event(pn_connection_engine_t *ce) {
+ return ce->collector && pn_collector_peek(ce->collector);
}
-pn_event_t* pn_connection_engine_dispatch(pn_connection_engine_t* e) {
- if (e->event) { /* Already returned */
- if (pn_event_type(e->event) == PN_CONNECTION_INIT)
- pn_transport_bind(e->transport, e->connection);
- pn_collector_pop(e->collector);
+void pn_connection_engine_pop_event(pn_connection_engine_t *ce) {
+ if (ce->collector) {
+ pn_event_t *e = pn_collector_peek(ce->collector);
+ if (pn_event_type(e) == PN_TRANSPORT_CLOSED) { /* The last event ever */
+ /* Events can accumulate behind the TRANSPORT_CLOSED before the
+ * PN_TRANSPORT_CLOSED event is handled. They can never be processed
+ * so release them.
+ */
+ pn_collector_release(ce->collector);
+ } else {
+ pn_collector_pop(ce->collector);
}
- e->event = pn_collector_peek(e->collector);
- log_event(e, e->event);
- return e->event;
+
+ }
+}
+
+bool pn_connection_engine_finished(pn_connection_engine_t *ce) {
+ return pn_transport_closed(ce->transport) && !pn_connection_engine_has_event(ce);
+}
+
+void pn_connection_engine_verrorf(pn_connection_engine_t *ce, const char *name, const char *fmt, va_list ap) {
+ pn_transport_t *t = ce->transport;
+ pn_condition_t *cond = pn_transport_condition(t);
+ pn_string_vformat(t->scratch, fmt, ap);
+ pn_condition_set_name(cond, name);
+ pn_condition_set_description(cond, pn_string_get(t->scratch));
}
-bool pn_connection_engine_finished(pn_connection_engine_t* e) {
- return pn_transport_closed(e->transport) && (pn_collector_peek(e->collector) == NULL);
+void pn_connection_engine_errorf(pn_connection_engine_t *ce, const char *name, const char *fmt, ...) {
+ va_list ap;
+ va_start(ap, fmt);
+ pn_connection_engine_verrorf(ce, name, fmt, ap);
+ va_end(ap);
}
-pn_connection_t* pn_connection_engine_connection(pn_connection_engine_t* e) {
- return e->connection;
+void pn_connection_engine_log(pn_connection_engine_t *ce, const char *msg) {
+ pn_transport_log(ce->transport, msg);
}
-pn_transport_t* pn_connection_engine_transport(pn_connection_engine_t* e) {
- return e->transport;
+void pn_connection_engine_vlogf(pn_connection_engine_t *ce, const char *fmt, va_list ap) {
+ pn_transport_vlogf(ce->transport, fmt, ap);
}
-pn_condition_t* pn_connection_engine_condition(pn_connection_engine_t* e) {
- return pn_transport_condition(e->transport);
+void pn_connection_engine_vlog(pn_connection_engine_t *ce, const char *msg) {
+ pn_transport_log(ce->transport, msg);
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/src/core/engine.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/engine.c b/proton-c/src/core/engine.c
index e238d5c..2836a43 100644
--- a/proton-c/src/core/engine.c
+++ b/proton-c/src/core/engine.c
@@ -32,6 +32,8 @@
#include "platform/platform_fmt.h"
#include "transport.h"
+#include <proton/extra.h>
+
static void pni_session_bound(pn_session_t *ssn);
static void pni_link_bound(pn_link_t *link);
@@ -208,6 +210,12 @@ void pn_condition_init(pn_condition_t *condition)
condition->info = pn_data(0);
}
+/* Allocate a condition on the heap and initialize it with pn_condition_init().
+ * Returns NULL if the allocation fails; the result is released with
+ * pn_condition_free().
+ */
+pn_condition_t *pn_condition(void) {
+  pn_condition_t *c = (pn_condition_t*)malloc(sizeof(pn_condition_t));
+  /* pn_condition_init() writes through its argument, so never hand it NULL. */
+  if (c) pn_condition_init(c);
+  return c;
+}
+
void pn_condition_tini(pn_condition_t *condition)
{
pn_data_free(condition->info);
@@ -215,6 +223,14 @@ void pn_condition_tini(pn_condition_t *condition)
pn_free(condition->name);
}
+/* Clear, finalize and free a heap-allocated condition. NULL is accepted and ignored. */
+void pn_condition_free(pn_condition_t *c) {
+  if (!c) return;
+  pn_condition_clear(c);
+  pn_condition_tini(c);
+  free(c);
+}
+
static void pni_add_session(pn_connection_t *conn, pn_session_t *ssn)
{
pn_list_add(conn->sessions, ssn);
@@ -495,10 +511,15 @@ static void pn_connection_finalize(void *object)
#define pn_connection_compare NULL
#define pn_connection_inspect NULL
-pn_connection_t *pn_connection(void)
+PN_EXTRA_DECLARE(pn_connection_t);
+
+/* Return the result of PN_EXTRA_GET for connection c.
+ * NOTE(review): presumably the extra bytes requested through
+ * pn_connection_with_extra() -- confirm against proton/extra.h.
+ */
+pn_rwbytes_t pn_connection_get_extra(pn_connection_t *c) {
+  return PN_EXTRA_GET(pn_connection_t, c);
+}
+
+pn_connection_t *pn_connection_with_extra(size_t extra)
{
static const pn_class_t clazz = PN_CLASS(pn_connection);
- pn_connection_t *conn = (pn_connection_t *) pn_class_new(&clazz, sizeof(pn_connection_t));
+ size_t size = PN_EXTRA_SIZEOF(pn_connection_t, extra);
+ pn_connection_t *conn = (pn_connection_t *) pn_class_new(&clazz, size);
if (!conn) return NULL;
conn->endpoint_head = NULL;
@@ -527,6 +548,10 @@ pn_connection_t *pn_connection(void)
return conn;
}
+/* Create a connection with no extra bytes: pn_connection_with_extra(0). */
+pn_connection_t *pn_connection(void)
+{
+  pn_connection_t *conn = pn_connection_with_extra(0);
+  return conn;
+}
+
static const pn_event_type_t endpoint_init_event_map[] = {
PN_CONNECTION_INIT, /* CONNECTION */
PN_SESSION_INIT, /* SESSION */
@@ -545,6 +570,10 @@ void pn_connection_collect(pn_connection_t *connection, pn_collector_t *collecto
}
}
+/* Return the collector stored on the connection.
+ * The connection pointer is dereferenced without a NULL check.
+ */
+pn_collector_t* pn_connection_collector(pn_connection_t *connection)
+{
+  pn_collector_t *collector = connection->collector;
+  return collector;
+}
+
pn_state_t pn_connection_state(pn_connection_t *connection)
{
return connection ? connection->endpoint.state : 0;
@@ -2229,3 +2258,15 @@ pn_transport_t *pn_event_transport(pn_event_t *event)
}
}
}
+
+/* Copy the name, description and info of src into dest.
+ * Copying a condition onto itself is a no-op. Returns 0 on success, otherwise
+ * the first non-zero error code from the underlying copy calls; fields after
+ * the failing one are not copied.
+ */
+int pn_condition_copy(pn_condition_t *dest, pn_condition_t *src) {
+  assert(dest);
+  assert(src);
+  if (src == dest) return 0;
+  /* One function-scope err, so that a failure in any step reaches the caller. */
+  int err = pn_string_copy(dest->name, src->name);
+  if (!err) err = pn_string_copy(dest->description, src->description);
+  if (!err) err = pn_data_copy(dest->info, src->info);
+  return err;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/src/core/event.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/event.c b/proton-c/src/core/event.c
index c13f287..7882327 100644
--- a/proton-c/src/core/event.c
+++ b/proton-c/src/core/event.c
@@ -371,7 +371,18 @@ const char *pn_event_type_name(pn_event_type_t type)
return "PN_SELECTABLE_EXPIRED";
case PN_SELECTABLE_FINAL:
return "PN_SELECTABLE_FINAL";
+ case PN_CONNECTION_WAKE:
+ return "PN_CONNECTION_WAKE";
+ case PN_LISTENER_CLOSE:
+ return "PN_LISTENER_CLOSE";
+ case PN_PROACTOR_INTERRUPT:
+ return "PN_PROACTOR_INTERRUPT";
+ case PN_PROACTOR_TIMEOUT:
+ return "PN_PROACTOR_TIMEOUT";
+ case PN_PROACTOR_INACTIVE:
+ return "PN_PROACTOR_INACTIVE";
+ default:
+ return "PN_UNKNOWN";
}
-
return NULL;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ca454180/proton-c/src/core/object/object.c
----------------------------------------------------------------------
diff --git a/proton-c/src/core/object/object.c b/proton-c/src/core/object/object.c
index b0c1b33..a6952b6 100644
--- a/proton-c/src/core/object/object.c
+++ b/proton-c/src/core/object/object.c
@@ -32,10 +32,10 @@ intptr_t pn_object_compare(void *a, void *b) { return (intptr_t) a - (intptr_t)
const pn_class_t PN_OBJECT[] = {PN_CLASS(pn_object)};
#define pn_void_initialize NULL
-static void *pn_void_new(const pn_class_t *clazz, size_t size) { return malloc(size); }
-static void pn_void_incref(void *object) {}
-static void pn_void_decref(void *object) {}
-static int pn_void_refcount(void *object) { return -1; }
+void *pn_void_new(const pn_class_t *clazz, size_t size) { return malloc(size); } /* plain malloc of size bytes; clazz is unused */
+void pn_void_incref(void* p) {} /* no-op */
+void pn_void_decref(void* p) {} /* no-op */
+int pn_void_refcount(void *object) { return -1; } /* always -1 */
#define pn_void_finalize NULL
static void pn_void_free(void *object) { free(object); }
static const pn_class_t *pn_void_reify(void *object) { return PN_VOID; }
@@ -199,7 +199,7 @@ typedef struct {
void *pn_object_new(const pn_class_t *clazz, size_t size)
{
void *object = NULL;
- pni_head_t *head = (pni_head_t *) malloc(sizeof(pni_head_t) + size);
+ pni_head_t *head = (pni_head_t *) calloc(1, sizeof(pni_head_t) + size);
if (head != NULL) {
object = head + 1;
head->clazz = clazz;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org