You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/01/07 23:29:14 UTC
[04/15] qpid-proton git commit: PROTON-1089: C++ binding link options
part 1 without filters
PROTON-1089: C++ binding link options part 1 without filters
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/b1b85f6c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/b1b85f6c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/b1b85f6c
Branch: refs/heads/go1
Commit: b1b85f6c12e70ae70cb2d3df59cdd83d8833bcd5
Parents: 181bb4b
Author: Clifford Jansen <cl...@apache.org>
Authored: Tue Jan 5 14:41:25 2016 -0800
Committer: Clifford Jansen <cl...@apache.org>
Committed: Tue Jan 5 14:48:41 2016 -0800
----------------------------------------------------------------------
examples/cpp/CMakeLists.txt | 1 +
examples/cpp/client.cpp | 2 +-
examples/cpp/queue_browser.cpp | 59 +++++++++
proton-c/bindings/cpp/CMakeLists.txt | 1 +
.../bindings/cpp/include/proton/connection.hpp | 8 +-
.../bindings/cpp/include/proton/container.hpp | 21 +++-
proton-c/bindings/cpp/include/proton/link.hpp | 33 ++++-
.../cpp/include/proton/link_options.hpp | 107 ++++++++++++++++
proton-c/bindings/cpp/include/proton/sender.hpp | 9 ++
.../bindings/cpp/include/proton/session.hpp | 8 +-
.../bindings/cpp/include/proton/terminus.hpp | 17 ++-
proton-c/bindings/cpp/include/proton/types.hpp | 2 +-
proton-c/bindings/cpp/src/blocking_receiver.cpp | 4 +-
proton-c/bindings/cpp/src/connection.cpp | 8 +-
proton-c/bindings/cpp/src/container.cpp | 9 +-
proton-c/bindings/cpp/src/container_impl.cpp | 28 +++--
proton-c/bindings/cpp/src/container_impl.hpp | 9 +-
proton-c/bindings/cpp/src/link.cpp | 32 ++++-
proton-c/bindings/cpp/src/link_options.cpp | 123 +++++++++++++++++++
proton-c/bindings/cpp/src/sender.cpp | 4 +
proton-c/bindings/cpp/src/session.cpp | 11 +-
proton-c/bindings/cpp/src/terminus.cpp | 12 +-
22 files changed, 453 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/examples/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt
index 903294c..4a66925 100644
--- a/examples/cpp/CMakeLists.txt
+++ b/examples/cpp/CMakeLists.txt
@@ -36,6 +36,7 @@ set(examples
server_direct
recurring_timer
connection_options
+ queue_browser
ssl
ssl_client_cert
encode_decode)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/examples/cpp/client.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/client.cpp b/examples/cpp/client.cpp
index 3f5c4ab..f8186e0 100644
--- a/examples/cpp/client.cpp
+++ b/examples/cpp/client.cpp
@@ -40,7 +40,7 @@ class client : public proton::messaging_handler {
void on_start(proton::event &e) {
sender = e.container().open_sender(url);
// Create a receiver with a dynamically chosen unique address.
- receiver = sender.connection().open_receiver("", true/*dynamic*/);
+ receiver = sender.connection().open_receiver("", proton::link_options().dynamic_address(true));
}
void send_request() {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/examples/cpp/queue_browser.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/queue_browser.cpp b/examples/cpp/queue_browser.cpp
new file mode 100644
index 0000000..1206c71
--- /dev/null
+++ b/examples/cpp/queue_browser.cpp
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "proton/container.hpp"
+#include "proton/messaging_handler.hpp"
+#include "proton/url.hpp"
+#include "proton/link_options.hpp"
+
+#include <iostream>
+
+class browser : public proton::messaging_handler {
+ private:
+ proton::url url;
+
+ public:
+
+ browser(const proton::url& u) : url(u) {}
+
+ void on_start(proton::event &e) {
+ proton::connection conn = e.container().connect(url);
+ conn.open_receiver(url.path(), proton::link_options().browsing(true));
+ }
+
+ void on_message(proton::event &e) {
+ std::cout << e.message().body() << std::endl;
+ if (e.receiver().queued() == 0 && e.receiver().drained() > 0)
+ e.connection().close();
+ }
+};
+
+int main(int argc, char **argv) {
+ try {
+ std::string url = argc > 1 ? argv[1] : "127.0.0.1:5672/examples";
+ browser b(url);
+ proton::container(b).run();
+ return 0;
+ } catch (const std::exception& e) {
+ std::cerr << e.what() << std::endl;
+ }
+ return 1;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt
index 732a8c7..7a345c0 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -50,6 +50,7 @@ set(qpid-proton-cpp-source
src/event.cpp
src/handler.cpp
src/link.cpp
+ src/link_options.cpp
src/message.cpp
src/messaging_adapter.cpp
src/messaging_event.cpp
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/include/proton/connection.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection.hpp b/proton-c/bindings/cpp/include/proton/connection.hpp
index 7a9cf74..67091e3 100644
--- a/proton-c/bindings/cpp/include/proton/connection.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection.hpp
@@ -88,11 +88,11 @@ class connection : public object<pn_connection_t>, endpoint
/** Default session is created on first call and re-used for the lifetime of the connection */
PN_CPP_EXTERN session default_session();
- /** Create a sender on default_session() with target=addr and optional handler h */
- PN_CPP_EXTERN sender open_sender(const std::string &addr, handler *h=0);
+ /** Create a sender on default_session() with target=addr and link options=opts */
+ PN_CPP_EXTERN sender open_sender(const std::string &addr, const link_options &opts = link_options());
- /** Create a receiver on default_session() with target=addr and optional handler h */
- PN_CPP_EXTERN receiver open_receiver(const std::string &addr, bool dynamic=false, handler *h=0);
+ /** Create a receiver on default_session() with target=addr and optional link options opts */
+ PN_CPP_EXTERN receiver open_receiver(const std::string &addr, const link_options &opts = link_options());
/** Return links on this connection matching the state mask. */
PN_CPP_EXTERN link_range find_links(endpoint::state mask) const;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/include/proton/container.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/container.hpp b/proton-c/bindings/cpp/include/proton/container.hpp
index fbdafb9..44cc476 100644
--- a/proton-c/bindings/cpp/include/proton/container.hpp
+++ b/proton-c/bindings/cpp/include/proton/container.hpp
@@ -28,6 +28,7 @@
#include "proton/reactor.hpp"
#include "proton/url.hpp"
#include "proton/connection_options.hpp"
+#include "proton/link_options.hpp"
#include <string>
@@ -68,11 +69,17 @@ class container : public event_loop {
/** Run the event loop, return when all connections and acceptors are closed. */
PN_CPP_EXTERN void run();
- /** Open a connection to url and create a sender with target=url.path() */
- PN_CPP_EXTERN sender open_sender(const proton::url &);
+ /** Open a new connection to url and create a sender with target=url.path().
+ Any supplied link or connection options will override the container's
+ template options. */
+ PN_CPP_EXTERN sender open_sender(const proton::url &, const proton::link_options &l = proton::link_options(),
+ const connection_options &c = connection_options());
- /** Create a receiver on connection with source=url.path() */
- PN_CPP_EXTERN receiver open_receiver(const url &);
+ /** Create a receiver on a new connection with source=url.path(). Any
+ supplied link or connection options will override the container's
+ template options. */
+ PN_CPP_EXTERN receiver open_receiver(const url &, const proton::link_options &l = proton::link_options(),
+ const connection_options &c = connection_options());
/// Identifier for the container
PN_CPP_EXTERN std::string id() const;
@@ -94,6 +101,12 @@ class container : public event_loop {
first open event on the connection. */
PN_CPP_EXTERN void server_connection_options(const connection_options &);
+ /** Copy the link options to a template applied to new links created and
+ opened by this container. They are applied before the open event on the
+ link and may be overriden by link options in other methods. */
+ PN_CPP_EXTERN void link_options(const link_options &);
+
+
private:
pn_unique_ptr<container_impl> impl_;
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/include/proton/link.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/link.hpp b/proton-c/bindings/cpp/include/proton/link.hpp
index b775341..aafaf77 100644
--- a/proton-c/bindings/cpp/include/proton/link.hpp
+++ b/proton-c/bindings/cpp/include/proton/link.hpp
@@ -27,6 +27,7 @@
#include "proton/terminus.hpp"
#include "proton/types.h"
#include "proton/object.hpp"
+#include "proton/link_options.hpp"
#include <string>
@@ -39,12 +40,25 @@ class receiver;
class link : public object<pn_link_t> , public endpoint
{
public:
+ /// Sender settlement behaviour for a link
+ enum sender_settle_mode_t {
+ UNSETTLED = PN_SND_UNSETTLED,
+ SETTLED = PN_SND_SETTLED,
+ MIXED = PN_SND_MIXED
+ };
+
+ /// Receiver settlement behaviour for a link
+ enum receiver_settle_mode_t {
+ SETTLE_ALWAYS = PN_RCV_FIRST,
+ SETTLE_SECOND= PN_RCV_SECOND
+ };
+
link(pn_link_t* l=0) : object<pn_link_t>(l) {}
/** Locally open the link, not complete till messaging_handler::on_link_opened or
* proton_handler::link_remote_open
*/
- PN_CPP_EXTERN void open();
+ PN_CPP_EXTERN void open(const link_options &opts = link_options());
/** Locally close the link, not complete till messaging_handler::on_link_closed or
* proton_handler::link_remote_close
@@ -62,8 +76,14 @@ class link : public object<pn_link_t> , public endpoint
/** Credit available on the link */
PN_CPP_EXTERN int credit() const;
- /** Grant credit to the link */
- PN_CPP_EXTERN void flow(int credit);
+ /** The number of queued deliveries for the link */
+ PN_CPP_EXTERN int queued();
+
+ /** The number of unsettled deliveries on the link */
+ PN_CPP_EXTERN int unsettled();
+
+ /** The count of credit returned. */
+ PN_CPP_EXTERN int drained();
/** Local source of the link. */
PN_CPP_EXTERN terminus source() const;
@@ -100,6 +120,13 @@ class link : public object<pn_link_t> , public endpoint
/** Navigate the links in a connection - get next link with state */
PN_CPP_EXTERN link next(endpoint::state) const;
+
+ PN_CPP_EXTERN sender_settle_mode_t sender_settle_mode();
+ PN_CPP_EXTERN void sender_settle_mode(sender_settle_mode_t);
+ PN_CPP_EXTERN receiver_settle_mode_t receiver_settle_mode();
+ PN_CPP_EXTERN void receiver_settle_mode(receiver_settle_mode_t);
+ PN_CPP_EXTERN sender_settle_mode_t remote_sender_settle_mode();
+ PN_CPP_EXTERN receiver_settle_mode_t remote_receiver_settle_mode();
};
/// An iterator for links.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/include/proton/link_options.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/link_options.hpp b/proton-c/bindings/cpp/include/proton/link_options.hpp
new file mode 100644
index 0000000..98790db
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/link_options.hpp
@@ -0,0 +1,107 @@
+#ifndef PROTON_CPP_LINK_OPTIONS_H
+#define PROTON_CPP_LINK_OPTIONS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/config.hpp"
+#include "proton/export.hpp"
+#include "proton/pn_unique_ptr.hpp"
+#include "proton/types.hpp"
+#include "proton/terminus.hpp"
+
+#include <vector>
+#include <string>
+
+namespace proton {
+
+/** The message delivery policy to establish when opening the link. */
+enum link_delivery_mode_t {
+ // No set policy. The application must settle messages itself according to its own policy.
+ NONE = 0,
+ // Outgoing messages are settled immediately by the link. There are no duplicates.
+ AT_MOST_ONCE,
+ // The receiver settles the delivery first with an accept/reject/release disposition.
+ // The sender waits to settle until after the disposition notification is received.
+ AT_LEAST_ONCE
+};
+
+class handler;
+class link;
+
+/** Options for creating a link.
+ *
+ * Options can be "chained" like this:
+ *
+ * l = container.create_sender(url, link_options().handler(h).browsing(true));
+ *
+ * You can also create an options object with common settings and use it as a base
+ * for different connections that have mostly the same settings:
+ *
+ * link_options opts;
+ * opts.browsing(true);
+ * l1 = container.open_sender(url1, opts.handler(h1));
+ * c2 = container.open_receiver(url2, opts.handler(h2));
+ *
+ * Normal value semantics, copy or assign creates a separate copy of the options.
+ */
+class link_options {
+ public:
+ PN_CPP_EXTERN link_options();
+ PN_CPP_EXTERN link_options(const link_options&);
+ PN_CPP_EXTERN ~link_options();
+ PN_CPP_EXTERN link_options& operator=(const link_options&);
+
+ /// Override with options from other.
+ PN_CPP_EXTERN void override(const link_options& other);
+
+ /** Set a handler for events scoped to the link. If NULL, link-scoped events on the link are discarded. */
+ PN_CPP_EXTERN link_options& handler(class handler *);
+ /** Receiver-only option to specify whether messages are browsed or
+ consumed. Setting browsing to true is Equivalent to setting
+ distribution_mode(COPY). Setting browsing to false is equivalent to
+ setting distribution_mode(MOVE). */
+ PN_CPP_EXTERN link_options& browsing(bool);
+ /** Set the distribution mode for message transfer. See terminus::distribution_mode_t. */
+ PN_CPP_EXTERN link_options& distribution_mode(terminus::distribution_mode_t);
+ /* Receiver-only option to create a durable subsription on the receiver.
+ Equivalent to setting the terminus durability to termins::DELIVERIES and
+ the expiry policy to terminus::EXPIRE_NEVER. */
+ PN_CPP_EXTERN link_options& durable_subscription(bool);
+ /* Set the delivery mode on the link. */
+ PN_CPP_EXTERN link_options& delivery_mode(link_delivery_mode_t);
+ /* Receiver-only option to request a dynamically generated node at the peer. */
+ PN_CPP_EXTERN link_options& dynamic_address(bool);
+ /* Set the local address for the link. */
+ PN_CPP_EXTERN link_options& local_address(const std::string &addr);
+ // TODO: selector/filter, dynamic node properties
+
+ private:
+ friend class link;
+ void apply(link&) const;
+ class handler* handler() const;
+
+ class impl;
+ pn_unique_ptr<impl> impl_;
+};
+
+} // namespace
+
+#endif /*!PROTON_CPP_LINK_OPTIONS_H*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/include/proton/sender.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/sender.hpp b/proton-c/bindings/cpp/include/proton/sender.hpp
index af8b9d5..a5835fd 100644
--- a/proton-c/bindings/cpp/include/proton/sender.hpp
+++ b/proton-c/bindings/cpp/include/proton/sender.hpp
@@ -41,6 +41,15 @@ class sender : public link
/// Send a message on the link.
PN_CPP_EXTERN delivery send(const message &m);
+
+ /** The number of deliveries that might be able to be sent if sufficient credit were
+ issued on the link. See sender::offered(). Maintained by the application. */
+ PN_CPP_EXTERN int available();
+
+ /** Set the availability of deliveries for a sender. */
+ PN_CPP_EXTERN void offered(int c);
+
+
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/include/proton/session.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/session.hpp b/proton-c/bindings/cpp/include/proton/session.hpp
index 7873775..0f7c946 100644
--- a/proton-c/bindings/cpp/include/proton/session.hpp
+++ b/proton-c/bindings/cpp/include/proton/session.hpp
@@ -71,11 +71,11 @@ class session : public object<pn_session_t>, public endpoint
*/
PN_CPP_EXTERN sender create_sender(const std::string& name=std::string());
- /** Create and open a sender with target=addr and optional handler h */
- PN_CPP_EXTERN sender open_sender(const std::string &addr, handler *h=0);
+ /** Create and open a sender with target=addr and optional link options opts*/
+ PN_CPP_EXTERN sender open_sender(const std::string &addr, const link_options &opts = link_options());
- /** Create and open a receiver with target=addr and optional handler h */
- PN_CPP_EXTERN receiver open_receiver(const std::string &addr, bool dynamic=false, handler *h=0);
+ /** Create and open a receiver with target=addr and optional link options opts */
+ PN_CPP_EXTERN receiver open_receiver(const std::string &addr, const link_options &opts = link_options());
/** Get the endpoint state */
PN_CPP_EXTERN endpoint::state state() const;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/include/proton/terminus.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/terminus.hpp b/proton-c/bindings/cpp/include/proton/terminus.hpp
index 0a1d4b8..3aeda23 100644
--- a/proton-c/bindings/cpp/include/proton/terminus.hpp
+++ b/proton-c/bindings/cpp/include/proton/terminus.hpp
@@ -32,7 +32,7 @@ namespace proton {
class link;
/** A terminus represents one end of a link.
- * The source terminus is where originate, the target terminus is where they go.
+ * The source terminus is where messages originate, the target terminus is where they go.
*/
class terminus
{
@@ -46,13 +46,21 @@ class terminus
COORDINATOR = PN_COORDINATOR ///< Transaction co-ordinator
};
- /// Expiry policy
- enum expiry_policy_t {
+ /// Durability
+ enum durability_t {
NONDURABLE = PN_NONDURABLE,
CONFIGURATION = PN_CONFIGURATION,
DELIVERIES = PN_DELIVERIES
};
+ /// Expiry policy
+ enum expiry_policy_t {
+ EXPIRE_WITH_LINK = PN_EXPIRE_WITH_LINK,
+ EXPIRE_WITH_SESSION = PN_EXPIRE_WITH_SESSION,
+ EXPIRE_WITH_CONNECTION = PN_EXPIRE_WITH_CONNECTION,
+ EXPIRE_NEVER = PN_EXPIRE_NEVER
+ };
+
/// Distribution mode
enum distribution_mode_t {
MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED,
@@ -66,10 +74,13 @@ class terminus
PN_CPP_EXTERN void expiry_policy(expiry_policy_t);
PN_CPP_EXTERN distribution_mode_t distribution_mode() const;
PN_CPP_EXTERN void distribution_mode(distribution_mode_t);
+ PN_CPP_EXTERN durability_t durability();
+ PN_CPP_EXTERN void durability(durability_t);
PN_CPP_EXTERN std::string address() const;
PN_CPP_EXTERN void address(const std::string &);
PN_CPP_EXTERN bool dynamic() const;
PN_CPP_EXTERN void dynamic(bool);
+ // TODO: filter + related selector
private:
pn_terminus_t* object_;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/include/proton/types.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/types.hpp b/proton-c/bindings/cpp/include/proton/types.hpp
index 062566a..d0f3da6 100644
--- a/proton-c/bindings/cpp/include/proton/types.hpp
+++ b/proton-c/bindings/cpp/include/proton/types.hpp
@@ -78,7 +78,7 @@ struct type_error : public decode_error {
type_id want; ///< Expected type_id
type_id got; ///< Actual type_id
};
-
+
///@cond INTERNAL
/// Provide a full set of comparison operators for proton:: types that have < and ==.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/src/blocking_receiver.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking_receiver.cpp b/proton-c/bindings/cpp/src/blocking_receiver.cpp
index 00d3dce..ed70634 100644
--- a/proton-c/bindings/cpp/src/blocking_receiver.cpp
+++ b/proton-c/bindings/cpp/src/blocking_receiver.cpp
@@ -46,7 +46,7 @@ blocking_receiver::blocking_receiver(
class blocking_connection &c, const std::string& addr, int credit, bool dynamic) :
blocking_link(c), fetcher_(new blocking_fetcher(credit))
{
- open(c.impl_->connection_.open_receiver(addr, dynamic, fetcher_.get()));
+ open(c.impl_->connection_.open_receiver(addr, link_options().dynamic_address(dynamic).handler(fetcher_.get())));
std::string sa = link_.source().address();
std::string rsa = link_.remote_source().address();
if (!sa.empty() && sa.compare(rsa) != 0) {
@@ -56,7 +56,7 @@ blocking_receiver::blocking_receiver(
throw error(MSG(txt));
}
if (credit)
- link_.flow(credit);
+ link_.receiver().flow(credit);
}
blocking_receiver::~blocking_receiver() { link_.detach_handler(); }
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/src/connection.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection.cpp b/proton-c/bindings/cpp/src/connection.cpp
index 41c56f6..7469954 100644
--- a/proton-c/bindings/cpp/src/connection.cpp
+++ b/proton-c/bindings/cpp/src/connection.cpp
@@ -98,13 +98,13 @@ session connection::default_session() {
return ctx.default_session;
}
-sender connection::open_sender(const std::string &addr, handler *h) {
- return default_session().open_sender(addr, h);
+sender connection::open_sender(const std::string &addr, const link_options &opts) {
+ return default_session().open_sender(addr, opts);
}
-receiver connection::open_receiver(const std::string &addr, bool dynamic, handler *h)
+receiver connection::open_receiver(const std::string &addr, const link_options &opts)
{
- return default_session().open_receiver(addr, dynamic, h);
+ return default_session().open_receiver(addr, opts);
}
endpoint::state connection::state() const { return pn_connection_state(pn_object()); }
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/src/container.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container.cpp b/proton-c/bindings/cpp/src/container.cpp
index 1924cc0..2eaad07 100644
--- a/proton-c/bindings/cpp/src/container.cpp
+++ b/proton-c/bindings/cpp/src/container.cpp
@@ -21,6 +21,7 @@
#include "proton/container.hpp"
#include "messaging_event.hpp"
#include "proton/connection.hpp"
+#include "proton/link_options.hpp"
#include "proton/session.hpp"
#include "proton/messaging_adapter.hpp"
#include "proton/acceptor.hpp"
@@ -58,12 +59,12 @@ std::string container::id() const { return impl_->id_; }
void container::run() { impl_->reactor_.run(); }
-sender container::open_sender(const proton::url &url) {
- return impl_->open_sender(url);
+sender container::open_sender(const proton::url &url, const proton::link_options &lo, const connection_options &co) {
+ return impl_->open_sender(url, lo, co);
}
-receiver container::open_receiver(const proton::url &url) {
- return impl_->open_receiver(url);
+receiver container::open_receiver(const proton::url &url, const proton::link_options &lo, const connection_options &co) {
+ return impl_->open_receiver(url, lo, co);
}
acceptor container::listen(const proton::url &url, const connection_options &opts) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/src/container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container_impl.cpp b/proton-c/bindings/cpp/src/container_impl.cpp
index b6e057e..5a50cdd 100644
--- a/proton-c/bindings/cpp/src/container_impl.cpp
+++ b/proton-c/bindings/cpp/src/container_impl.cpp
@@ -167,21 +167,29 @@ connection container_impl::connect(const proton::url &url, const connection_opti
return conn;
}
-sender container_impl::open_sender(const proton::url &url) {
- connection conn = connect(url, connection_options());
+sender container_impl::open_sender(const proton::url &url, const proton::link_options &o1, const connection_options &o2) {
+ proton::link_options lopts(link_options_);
+ lopts.override(o1);
+ connection_options copts(client_connection_options_);
+ copts.override(o2);
+ connection conn = connect(url, copts);
std::string path = url.path();
- sender snd = conn.default_session().open_sender(id_ + '-' + path);
+ sender snd = conn.default_session().create_sender(id_ + '-' + path);
snd.target().address(path);
- snd.open();
+ snd.open(lopts);
return snd;
}
-receiver container_impl::open_receiver(const proton::url &url) {
- connection conn = connect(url, connection_options());
+receiver container_impl::open_receiver(const proton::url &url, const proton::link_options &o1, const connection_options &o2) {
+ proton::link_options lopts(link_options_);
+ lopts.override(o1);
+ connection_options copts(client_connection_options_);
+ copts.override(o2);
+ connection conn = connect(url, copts);
std::string path = url.path();
- receiver rcv = conn.default_session().open_receiver(id_ + '-' + path);
+ receiver rcv = conn.default_session().create_receiver(id_ + '-' + path);
rcv.source().address(path);
- rcv.open();
+ rcv.open(lopts);
return rcv;
}
@@ -225,6 +233,10 @@ void container_impl::server_connection_options(const connection_options &opts) {
server_connection_options_ = opts;
}
+void container_impl::link_options(const proton::link_options &opts) {
+ link_options_ = opts;
+}
+
void container_impl::configure_server_connection(connection &c) {
pn_acceptor_t *pnp = pn_connection_acceptor(connection_options::pn_connection(c));
listener_context &lc(listener_context::get(pnp));
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/src/container_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container_impl.hpp b/proton-c/bindings/cpp/src/container_impl.hpp
index 31ec0cf..66c3aa6 100644
--- a/proton-c/bindings/cpp/src/container_impl.hpp
+++ b/proton-c/bindings/cpp/src/container_impl.hpp
@@ -48,10 +48,8 @@ class container_impl
PN_CPP_EXTERN container_impl(container&, handler *, const std::string& id);
PN_CPP_EXTERN ~container_impl();
PN_CPP_EXTERN connection connect(const url&, const connection_options&);
- PN_CPP_EXTERN sender open_sender(connection &connection, const std::string &addr, handler *h);
- PN_CPP_EXTERN sender open_sender(const url&);
- PN_CPP_EXTERN receiver open_receiver(connection &connection, const std::string &addr, bool dynamic, handler *h);
- PN_CPP_EXTERN receiver open_receiver(const url&);
+ PN_CPP_EXTERN sender open_sender(const url&, const proton::link_options &, const connection_options &);
+ PN_CPP_EXTERN receiver open_receiver(const url&, const proton::link_options &, const connection_options &);
PN_CPP_EXTERN class acceptor listen(const url&, const connection_options &);
PN_CPP_EXTERN duration timeout();
PN_CPP_EXTERN void timeout(duration timeout);
@@ -59,6 +57,8 @@ class container_impl
const connection_options& client_connection_options() { return client_connection_options_; }
void server_connection_options(const connection_options &);
const connection_options& server_connection_options() { return server_connection_options_; }
+ void link_options(const proton::link_options&);
+ const proton::link_options& link_options() { return link_options_; }
void configure_server_connection(connection &c);
task schedule(int delay, handler *h);
@@ -78,6 +78,7 @@ class container_impl
uint64_t link_id_;
connection_options client_connection_options_;
connection_options server_connection_options_;
+ proton::link_options link_options_;
friend class container;
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/src/link.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/link.cpp b/proton-c/bindings/cpp/src/link.cpp
index cb66933..37337c0 100644
--- a/proton-c/bindings/cpp/src/link.cpp
+++ b/proton-c/bindings/cpp/src/link.cpp
@@ -31,7 +31,8 @@
namespace proton {
-void link::open() {
+void link::open(const link_options &lo) {
+ lo.apply(*this);
pn_link_open(pn_object());
}
@@ -59,9 +60,9 @@ int link::credit() const {
return pn_link_credit(pn_object());
}
-void link::flow(int credit) {
- pn_link_flow(pn_object(), credit);
-}
+int link::queued() { return pn_link_queued(pn_object()); }
+int link::unsettled() { return pn_link_unsettled(pn_object()); }
+int link::drained() { return pn_link_drained(pn_object()); }
terminus link::source() const { return pn_link_source(pn_object()); }
terminus link::target() const { return pn_link_target(pn_object()); }
@@ -107,5 +108,28 @@ link link::next(endpoint::state s) const
return pn_link_next(pn_object(), s);
}
+link::sender_settle_mode_t link::sender_settle_mode() {
+ return (sender_settle_mode_t) pn_link_snd_settle_mode(pn_object());
+}
+
+void link::sender_settle_mode(sender_settle_mode_t mode) {
+ pn_link_set_snd_settle_mode(pn_object(), (pn_snd_settle_mode_t) mode);
+}
+
+link::receiver_settle_mode_t link::receiver_settle_mode() {
+ return (receiver_settle_mode_t) pn_link_rcv_settle_mode(pn_object());
+}
+
+void link::receiver_settle_mode(receiver_settle_mode_t mode) {
+ pn_link_set_rcv_settle_mode(pn_object(), (pn_rcv_settle_mode_t) mode);
+}
+
+link::sender_settle_mode_t link::remote_sender_settle_mode() {
+ return (sender_settle_mode_t) pn_link_remote_snd_settle_mode(pn_object());
}
+link::receiver_settle_mode_t link::remote_receiver_settle_mode() {
+ return (receiver_settle_mode_t) pn_link_remote_rcv_settle_mode(pn_object());
+}
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/src/link_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/link_options.cpp b/proton-c/bindings/cpp/src/link_options.cpp
new file mode 100644
index 0000000..1eabdf4
--- /dev/null
+++ b/proton-c/bindings/cpp/src/link_options.cpp
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/link_options.hpp"
+#include "proton/link.hpp"
+#include "msg.hpp"
+
+
+namespace proton {
+
+template <class T> struct option {
+ T value;
+ bool set;
+
+ option() : value(), set(false) {}
+ option& operator=(const T& x) { value = x; set = true; return *this; }
+ void override(const option<T>& x) { if (x.set) *this = x.value; }
+};
+
+class link_options::impl {
+ public:
+ option<class handler*> handler;
+ option<terminus::distribution_mode_t> distribution_mode;
+ option<bool> durable_subscription;
+ option<link_delivery_mode_t> delivery_mode;
+ option<bool> dynamic_address;
+ option<std::string> local_address;
+
+ void apply(link& l) {
+ if (l.state() & endpoint::LOCAL_UNINIT) {
+ bool sender = !l.receiver();
+ if (local_address.set) {
+ const char *addr = local_address.value.empty() ? NULL : local_address.value.c_str();
+ if (sender)
+ l.target().address(addr);
+ else
+ l.source().address(addr);
+ }
+ if (delivery_mode.set) {
+ switch (delivery_mode.value) {
+ case AT_MOST_ONCE:
+ l.sender_settle_mode(link::SETTLED);
+ break;
+ case AT_LEAST_ONCE:
+ l.sender_settle_mode(link::UNSETTLED);
+ l.receiver_settle_mode(link::SETTLE_ALWAYS);
+ break;
+ default:
+ break;
+ }
+ }
+ if (handler.set) {
+ if (handler.value)
+ l.handler(*handler.value);
+ else
+ l.detach_handler();
+ }
+ if (!sender) {
+ // receiver only options
+ if (distribution_mode.set) l.source().distribution_mode(distribution_mode.value);
+ if (durable_subscription.set && durable_subscription.value) {
+ l.source().durability(terminus::DELIVERIES);
+ l.source().expiry_policy(terminus::EXPIRE_NEVER);
+ }
+ if (dynamic_address.set)
+ l.source().dynamic(dynamic_address.value);
+ }
+ }
+ }
+
+ void override(const impl& x) {
+ handler.override(x.handler);
+ distribution_mode.override(x.distribution_mode);
+ durable_subscription.override(x.durable_subscription);
+ delivery_mode.override(x.delivery_mode);
+ dynamic_address.override(x.dynamic_address);
+ local_address.override(x.local_address);
+ }
+
+};
+
+link_options::link_options() : impl_(new impl()) {}
+link_options::link_options(const link_options& x) : impl_(new impl()) {
+ *this = x;
+}
+link_options::~link_options() {}
+
+link_options& link_options::operator=(const link_options& x) {
+ *impl_ = *x.impl_;
+ return *this;
+}
+
+void link_options::override(const link_options& x) { impl_->override(*x.impl_); }
+
+link_options& link_options::handler(class handler *h) { impl_->handler = h; return *this; }
+link_options& link_options::browsing(bool b) { distribution_mode(b ? terminus::COPY : terminus::MOVE); return *this; }
+link_options& link_options::distribution_mode(terminus::distribution_mode_t m) { impl_->distribution_mode = m; return *this; }
+link_options& link_options::durable_subscription(bool b) {impl_->durable_subscription = b; return *this; }
+link_options& link_options::delivery_mode(link_delivery_mode_t m) {impl_->delivery_mode = m; return *this; }
+link_options& link_options::dynamic_address(bool b) {impl_->dynamic_address = b; return *this; }
+link_options& link_options::local_address(const std::string &addr) {impl_->local_address = addr; return *this; }
+
+void link_options::apply(link& l) const { impl_->apply(l); }
+handler* link_options::handler() const { return impl_->handler.value; }
+
+} // namespace proton
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/src/sender.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/sender.cpp b/proton-c/bindings/cpp/src/sender.cpp
index 177e6f0..56f737b 100644
--- a/proton-c/bindings/cpp/src/sender.cpp
+++ b/proton-c/bindings/cpp/src/sender.cpp
@@ -54,4 +54,8 @@ delivery sender::send(const message &message) {
return dlv;
}
+int sender::available() { return pn_link_available(pn_object()); }
+void sender::offered(int c) { pn_link_offered(pn_object(), c); }
+
+
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/src/session.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/session.cpp b/proton-c/bindings/cpp/src/session.cpp
index c6c2233..c851ef7 100644
--- a/proton-c/bindings/cpp/src/session.cpp
+++ b/proton-c/bindings/cpp/src/session.cpp
@@ -53,21 +53,18 @@ sender session::create_sender(const std::string& name) {
return pn_sender(pn_object(), set_name(name, this).c_str());
}
-sender session::open_sender(const std::string &addr, handler *h) {
+sender session::open_sender(const std::string &addr, const link_options &lo) {
sender snd = create_sender();
snd.target().address(addr);
- if (h) snd.handler(*h);
- snd.open();
+ snd.open(lo);
return snd;
}
-receiver session::open_receiver(const std::string &addr, bool dynamic, handler *h)
+receiver session::open_receiver(const std::string &addr, const link_options &lo)
{
receiver rcv = create_receiver();
rcv.source().address(addr);
- if (dynamic) rcv.source().dynamic(true);
- if (h) rcv.handler(*h);
- rcv.open();
+ rcv.open(lo);
return rcv;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b1b85f6c/proton-c/bindings/cpp/src/terminus.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/terminus.cpp b/proton-c/bindings/cpp/src/terminus.cpp
index 1a8ed8a..cae148a 100644
--- a/proton-c/bindings/cpp/src/terminus.cpp
+++ b/proton-c/bindings/cpp/src/terminus.cpp
@@ -33,7 +33,7 @@ void terminus::type(type_t type) {
}
terminus::expiry_policy_t terminus::expiry_policy() const {
- return expiry_policy_t(pn_terminus_get_type(object_));
+ return expiry_policy_t(pn_terminus_get_expiry_policy(object_));
}
void terminus::expiry_policy(expiry_policy_t policy) {
@@ -41,13 +41,21 @@ void terminus::expiry_policy(expiry_policy_t policy) {
}
terminus::distribution_mode_t terminus::distribution_mode() const {
- return distribution_mode_t(pn_terminus_get_type(object_));
+ return distribution_mode_t(pn_terminus_get_distribution_mode(object_));
}
void terminus::distribution_mode(distribution_mode_t mode) {
pn_terminus_set_distribution_mode(object_, pn_distribution_mode_t(mode));
}
+terminus::durability_t terminus::durability() {
+ return (durability_t) pn_terminus_get_durability(object_);
+}
+
+void terminus::durability(durability_t mode) {
+ pn_terminus_set_durability(object_, (pn_durability_t) mode);
+}
+
std::string terminus::address() const {
const char *addr = pn_terminus_get_address(object_);
return addr ? std::string(addr) : std::string();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org