You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/07/03 22:12:54 UTC
[05/89] [abbrv] [partial] qpid-proton git commit: PROTON-1728:
Reorganize the source tree
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/flow_control.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/flow_control.cpp b/cpp/examples/flow_control.cpp
new file mode 100644
index 0000000..c74070c
--- /dev/null
+++ b/cpp/examples/flow_control.cpp
@@ -0,0 +1,261 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/container.hpp>
+#include <proton/delivery.hpp>
+#include <proton/listen_handler.hpp>
+#include <proton/listener.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/receiver_options.hpp>
+#include <proton/sender.hpp>
+#include <proton/tracker.hpp>
+
+#include <iostream>
+#include <sstream>
+
+#include "fake_cpp11.hpp"
+
+namespace {
+
+bool verbose = true;
+
+void verify(bool success, const std::string &msg) {
+ if (!success)
+ throw std::runtime_error("example failure:" + msg);
+ else {
+ std::cout << "success: " << msg << std::endl;
+ if (verbose) std::cout << std::endl;
+ }
+}
+
+}
+
+// flow_sender manages the incoming connection and acts as the message sender.
+class flow_sender : public proton::messaging_handler {
+ private:
+ int available; // Number of messages the sender may send assuming sufficient credit.
+ int sequence;
+
+ public:
+ flow_sender() : available(0), sequence(0) {}
+
+ void send_available_messages(proton::sender &s) {
+ for (int i = sequence; available && s.credit() > 0; i++) {
+ std::ostringstream mbody;
+ mbody << "flow_sender message " << sequence++;
+ proton::message m(mbody.str());
+ s.send(m);
+ available--;
+ }
+ }
+
+ void on_sendable(proton::sender &s) OVERRIDE {
+ if (verbose)
+ std::cout << "flow_sender in \"on_sendable\" with credit " << s.credit()
+ << " and " << available << " available messages" << std::endl;
+ send_available_messages(s);
+ }
+
+ void on_sender_drain_start(proton::sender &s) OVERRIDE {
+ if (verbose)
+ std::cout << "flow_sender in \"on_drain_start\" with credit " << s.credit()
+ << " and " << available << " available messages" << std::endl;
+ send_available_messages(s);
+ if (s.credit()) {
+ s.return_credit(); // return the rest
+ }
+ }
+
+ void set_available(int n) { available = n; }
+};
+
+class flow_receiver : public proton::messaging_handler {
+ public:
+ int stage;
+ int received;
+ flow_sender &sender;
+
+ flow_receiver(flow_sender &s) : stage(0), received(0), sender(s) {}
+
+ void example_setup(int n) {
+ received = 0;
+ sender.set_available(n);
+ }
+
+ void run_stage(proton::receiver &r, const std::string &caller) {
+ // Serialize the progression of the flow control examples.
+ switch (stage) {
+ case 0:
+ if (verbose) std::cout << "Example 1. Simple use of credit." << std::endl;
+ // TODO: add timeout callbacks, show no messages until credit.
+ example_setup(2);
+ r.add_credit(2);
+ break;
+ case 1:
+ if (r.credit() > 0) return;
+ verify(received == 2, "Example 1: simple credit");
+
+ if (verbose) std::cout << "Example 2. Use basic drain, sender has 3 \"immediate\" messages." << std::endl;
+ example_setup(3);
+ r.add_credit(5); // ask for up to 5
+ r.drain(); // but only use what's available
+ break;
+ case 2:
+ if (caller == "on_message") return;
+ if (caller == "on_receiver_drain_finish") {
+ // Note that unused credit of 2 at sender is returned and is now 0.
+ verify(received == 3 && r.credit() == 0, "Example 2: basic drain");
+
+ if (verbose) std::cout << "Example 3. Drain use with no credit." << std::endl;
+ example_setup(0);
+ r.drain();
+ break;
+ }
+ verify(false, "example 2 run_stage");
+ return;
+
+ case 3:
+ verify(caller == "on_receiver_drain_finish" && received == 0, "Example 3: drain without credit");
+
+ if (verbose) std::cout << "Example 4. Show using high(10)/low(3) watermark for 25 messages." << std::endl;
+ example_setup(25);
+ r.add_credit(10);
+ break;
+
+ case 4:
+ if (received < 25) {
+ // Top up credit as needed.
+ uint32_t credit = r.credit();
+ if (credit <= 3) {
+ uint32_t new_credit = 10;
+ uint32_t remaining = 25 - received;
+ if (new_credit > remaining)
+ new_credit = remaining;
+ if (new_credit > credit) {
+ r.add_credit(new_credit - credit);
+ if (verbose)
+ std::cout << "flow_receiver adding credit for " << new_credit - credit
+ << " messages" << std::endl;
+ }
+ }
+ return;
+ }
+
+ verify(received == 25 && r.credit() == 0, "Example 4: high/low watermark");
+ r.connection().close();
+ break;
+
+ default:
+ throw std::runtime_error("run_stage sequencing error");
+ }
+ stage++;
+ }
+
+ void on_receiver_open(proton::receiver &r) OVERRIDE {
+ run_stage(r, "on_receiver_open");
+ }
+
+ void on_message(proton::delivery &d, proton::message &m) OVERRIDE {
+ if (verbose)
+ std::cout << "flow_receiver in \"on_message\" with " << m.body() << std::endl;
+ proton::receiver r(d.receiver());
+ received++;
+ run_stage(r, "on_message");
+ }
+
+ void on_receiver_drain_finish(proton::receiver &r) OVERRIDE {
+ if (verbose)
+ std::cout << "flow_receiver in \"on_receiver_drain_finish\"" << std::endl;
+ run_stage(r, "on_receiver_drain_finish");
+ }
+};
+
+class flow_listener : public proton::listen_handler {
+ proton::connection_options opts;
+ public:
+ flow_listener(flow_sender& sh) {
+ opts.handler(sh);
+ }
+
+ void on_open(proton::listener& l) OVERRIDE {
+ std::ostringstream url;
+ url << "//:" << l.port() << "/example"; // Connect to the actual listening port
+ l.container().connect(url.str());
+ }
+
+ proton::connection_options on_accept(proton::listener&) OVERRIDE { return opts; }
+};
+
+class flow_control : public proton::messaging_handler {
+ private:
+ proton::listener listener;
+ flow_sender send_handler;
+ flow_receiver receive_handler;
+ flow_listener listen_handler;
+
+ public:
+ flow_control() : receive_handler(send_handler), listen_handler(send_handler) {}
+
+ void on_container_start(proton::container &c) OVERRIDE {
+ // Listen on a dynamic port on the local host.
+ listener = c.listen("//:0", listen_handler);
+ }
+
+ void on_connection_open(proton::connection &c) OVERRIDE {
+ if (c.active()) {
+ // outbound connection
+ c.open_receiver("flow_example", proton::receiver_options().handler(receive_handler).credit_window(0));
+ }
+ }
+
+ void on_connection_close(proton::connection &) OVERRIDE {
+ listener.stop();
+ }
+};
+
+int main(int argc, char **argv) {
+ // Pick an "unusual" port since we are going to be talking to
+ // ourselves, not a broker.
+ bool quiet = false;
+
+ example::options opts(argc, argv);
+ opts.add_flag(quiet, 'q', "quiet", "suppress additional commentary of credit allocation and consumption");
+
+ try {
+ opts.parse();
+ if (quiet)
+ verbose = false;
+
+ flow_control fc;
+ proton::container(fc).run();
+
+ return 0;
+ } catch (const std::exception& e) {
+ std::cerr << e.what() << std::endl;
+ }
+
+ return 1;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/helloworld.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/helloworld.cpp b/cpp/examples/helloworld.cpp
new file mode 100644
index 0000000..5962826
--- /dev/null
+++ b/cpp/examples/helloworld.cpp
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/delivery.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/tracker.hpp>
+
+#include <iostream>
+
+#include "fake_cpp11.hpp"
+
+class hello_world : public proton::messaging_handler {
+ std::string conn_url_;
+ std::string addr_;
+
+ public:
+ hello_world(const std::string& u, const std::string& a) :
+ conn_url_(u), addr_(a) {}
+
+ void on_container_start(proton::container& c) OVERRIDE {
+ c.connect(conn_url_);
+ }
+
+ void on_connection_open(proton::connection& c) OVERRIDE {
+ c.open_receiver(addr_);
+ c.open_sender(addr_);
+ }
+
+ void on_sendable(proton::sender &s) OVERRIDE {
+ proton::message m("Hello World!");
+ s.send(m);
+ s.close();
+ }
+
+ void on_message(proton::delivery &d, proton::message &m) OVERRIDE {
+ std::cout << m.body() << std::endl;
+ d.connection().close();
+ }
+};
+
+int main(int argc, char **argv) {
+ try {
+ std::string conn_url = argc > 1 ? argv[1] : "//127.0.0.1:5672";
+ std::string addr = argc > 2 ? argv[2] : "examples";
+
+ hello_world hw(conn_url, addr);
+ proton::container(hw).run();
+
+ return 0;
+ } catch (const std::exception& e) {
+ std::cerr << e.what() << std::endl;
+ }
+
+ return 1;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/message_properties.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/message_properties.cpp b/cpp/examples/message_properties.cpp
new file mode 100644
index 0000000..cb5c6b8
--- /dev/null
+++ b/cpp/examples/message_properties.cpp
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/message.hpp>
+#include <proton/types.hpp>
+#include <iostream>
+#include <map>
+
+int main(int argc, char **argv) {
+ try {
+ proton::message m;
+
+ // Setting properties: legal types are converted automatically to their
+ // AMQP counterpart.
+ m.properties().put("short", int16_t(123));
+ m.properties().put("string", "foo");
+ m.properties().put("symbol", proton::symbol("sym"));
+
+ // Examining properties using proton::get()
+
+ // 1 argument get<>() template specifies expected type of property.
+ std::string s = proton::get<std::string>(m.properties().get("string"));
+
+ // 2 argument get, property must have matching type to output argument.
+ int16_t i;
+ proton::get(m.properties().get("short"), i);
+
+ // Checking property types
+ proton::type_id type = m.properties().get("symbol").type();
+ if (type != proton::SYMBOL) {
+ throw std::logic_error("wrong type!");
+ }
+
+ // proton::scalar has its own ostream <<
+ std::cout << "using put/get:"
+ << " short=" << i
+ << " string=" << s
+ << " symbol=" << m.properties().get("symbol")
+ << std::endl;
+
+ // Converting properties to a convertible type
+ std::cout << "using coerce:"
+ << " short(as long)="
+ << proton::coerce<long>(m.properties().get("short"))
+ << std::endl;
+
+ // Extract the properties as a std::map for more complex map operations.
+ // You can use other map and sequence types to hold a map, see @ref types_page
+ typedef std::map<std::string, proton::scalar> property_map;
+ property_map props;
+ proton::get(m.properties(), props);
+ for (property_map::iterator i = props.begin(); i != props.end(); ++i) {
+ std::cout << "props[" << i->first << "]=" << i->second << std::endl;
+ }
+ props["string"] = "bar";
+ props["short"] = 42;
+ // Update the properties in the message from the modified props map
+ m.properties() = props;
+
+ std::cout << "short=" << m.properties().get("short")
+ << " string=" << m.properties().get("string")
+ << std::endl;
+
+ // proton::get throws an exception if types do not match exactly.
+ try {
+ proton::get<uint32_t>(m.properties().get("short")); // bad: uint32_t != int16_t
+ throw std::logic_error("expected exception");
+ } catch (const proton::conversion_error& e) {
+ std::cout << "expected conversion_error: \"" << e.what() << '"' << std::endl;
+ }
+
+ // proton::coerce throws an exception if types are not convertible.
+ try {
+ proton::get<uint32_t>(m.properties().get("string")); // bad: string to uint32_t
+ throw std::logic_error("expected exception");
+ } catch (const proton::conversion_error& e) {
+ std::cout << "expected conversion_error: \"" << e.what() << '"' << std::endl;
+ }
+
+ return 0;
+ } catch (const std::exception& e) {
+ std::cerr << "unexpected exception: " << e.what() << std::endl;
+ return 1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/multithreaded_client.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/multithreaded_client.cpp b/cpp/examples/multithreaded_client.cpp
new file mode 100644
index 0000000..78085e2
--- /dev/null
+++ b/cpp/examples/multithreaded_client.cpp
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//
+// C++11 or greater
+//
+// A multi-threaded client that calls proton::container::run() in one thread, sends
+// messages in another and receives messages in a third.
+//
+// Note this client does not deal with flow-control. If the sender is faster
+// than the receiver, messages will build up in memory on the sending side.
+// See @ref multithreaded_client_flow_control.cpp for a more complex example with
+// flow control.
+//
+// NOTE: no proper error handling
+
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/receiver.hpp>
+#include <proton/sender.hpp>
+#include <proton/work_queue.hpp>
+
+#include <condition_variable>
+#include <iostream>
+#include <mutex>
+#include <queue>
+#include <sstream>
+#include <string>
+#include <thread>
+
+// Lock output from threads to avoid scrambling
+std::mutex out_lock;
+#define OUT(x) do { std::lock_guard<std::mutex> l(out_lock); x; } while (false)
+
+// Handler for a single thread-safe sending and receiving connection.
+class client : public proton::messaging_handler {
+ // Invariant
+ const std::string url_;
+ const std::string address_;
+
+ // Only used in proton handler thread
+ proton::sender sender_;
+
+ // Shared by proton and user threads, protected by lock_
+ std::mutex lock_;
+ proton::work_queue *work_queue_;
+ std::condition_variable sender_ready_;
+ std::queue<proton::message> messages_;
+ std::condition_variable messages_ready_;
+
+ public:
+ client(const std::string& url, const std::string& address) : url_(url), address_(address), work_queue_(0) {}
+
+ // Thread safe
+ void send(const proton::message& msg) {
+ // Use [=] to copy the message, we cannot pass it by reference since it
+ // will be used in another thread.
+ work_queue()->add([=]() { sender_.send(msg); });
+ }
+
+ // Thread safe
+ proton::message receive() {
+ std::unique_lock<std::mutex> l(lock_);
+ while (messages_.empty()) messages_ready_.wait(l);
+ auto msg = std::move(messages_.front());
+ messages_.pop();
+ return msg;
+ }
+
+ // Thread safe
+ void close() {
+ work_queue()->add([=]() { sender_.connection().close(); });
+ }
+
+ private:
+
+ proton::work_queue* work_queue() {
+ // Wait till work_queue_ and sender_ are initialized.
+ std::unique_lock<std::mutex> l(lock_);
+ while (!work_queue_) sender_ready_.wait(l);
+ return work_queue_;
+ }
+
+ // == messaging_handler overrides, only called in proton handler thread
+
+ // Note: this example creates a connection when the container starts.
+ // To create connections after the container has started, use
+ // container::connect().
+ // See @ref multithreaded_client_flow_control.cpp for an example.
+ void on_container_start(proton::container& cont) override {
+ cont.connect(url_);
+ }
+
+ void on_connection_open(proton::connection& conn) override {
+ conn.open_sender(address_);
+ conn.open_receiver(address_);
+ }
+
+ void on_sender_open(proton::sender& s) override {
+ // sender_ and work_queue_ must be set atomically
+ std::lock_guard<std::mutex> l(lock_);
+ sender_ = s;
+ work_queue_ = &s.work_queue();
+ sender_ready_.notify_all();
+ }
+
+ void on_message(proton::delivery& dlv, proton::message& msg) override {
+ std::lock_guard<std::mutex> l(lock_);
+ messages_.push(msg);
+ messages_ready_.notify_all();
+ }
+
+ void on_error(const proton::error_condition& e) override {
+ OUT(std::cerr << "unexpected error: " << e << std::endl);
+ exit(1);
+ }
+};
+
+int main(int argc, const char** argv) {
+ try {
+ if (argc != 4) {
+ std ::cerr <<
+ "Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT\n"
+ "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n"
+ "AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"
+ "MESSAGE-COUNT: number of messages to send\n";
+ return 1;
+ }
+ const char *url = argv[1];
+ const char *address = argv[2];
+ int n_messages = atoi(argv[3]);
+
+ client cl(url, address);
+ proton::container container(cl);
+ std::thread container_thread([&]() { container.run(); });
+
+ std::thread sender([&]() {
+ for (int i = 0; i < n_messages; ++i) {
+ proton::message msg(std::to_string(i + 1));
+ cl.send(msg);
+ OUT(std::cout << "sent \"" << msg.body() << '"' << std::endl);
+ }
+ });
+
+ int received = 0;
+ std::thread receiver([&]() {
+ for (int i = 0; i < n_messages; ++i) {
+ auto msg = cl.receive();
+ OUT(std::cout << "received \"" << msg.body() << '"' << std::endl);
+ ++received;
+ }
+ });
+
+ sender.join();
+ receiver.join();
+ cl.close();
+ container_thread.join();
+ std::cout << received << " messages sent and received" << std::endl;
+
+ return 0;
+ } catch (const std::exception& e) {
+ std::cerr << e.what() << std::endl;
+ }
+
+ return 1;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/multithreaded_client_flow_control.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/multithreaded_client_flow_control.cpp b/cpp/examples/multithreaded_client_flow_control.cpp
new file mode 100644
index 0000000..93c6b3d
--- /dev/null
+++ b/cpp/examples/multithreaded_client_flow_control.cpp
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//
+// C++11 or greater
+//
+// A multi-threaded client that sends and receives messages from multiple AMQP
+// addresses.
+//
+// Demonstrates how to:
+//
+// - implement proton handlers that interact with user threads safely
+// - block sender threads to respect AMQP flow control
+// - use AMQP flow control to limit message buffering for receivers threads
+//
+// We define sender and receiver classes with simple, thread-safe blocking
+// send() and receive() functions.
+//
+// These classes are also privately proton::message_handler instances. They use
+// the thread-safe proton::work_queue and standard C++ synchronization (std::mutex
+// etc.) to pass messages between user and proton::container threads.
+//
+// NOTE: no proper error handling
+
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/receiver.hpp>
+#include <proton/receiver_options.hpp>
+#include <proton/sender.hpp>
+#include <proton/work_queue.hpp>
+
+#include <atomic>
+#include <condition_variable>
+#include <iostream>
+#include <mutex>
+#include <queue>
+#include <sstream>
+#include <string>
+#include <thread>
+
+// Lock output from threads to avoid scrambling
+std::mutex out_lock;
+#define OUT(x) do { std::lock_guard<std::mutex> l(out_lock); x; } while (false)
+
+// A thread-safe sending connection that blocks sending threads when there
+// is no AMQP credit to send messages.
+class sender : private proton::messaging_handler {
+ // Only used in proton handler thread
+ proton::sender sender_;
+
+ // Shared by proton and user threads, protected by lock_
+ std::mutex lock_;
+ proton::work_queue *work_queue_;
+ std::condition_variable sender_ready_;
+ int queued_; // Queued messages waiting to be sent
+ int credit_; // AMQP credit - number of messages we can send
+
+ public:
+ sender(proton::container& cont, const std::string& url, const std::string& address)
+ : work_queue_(0), queued_(0), credit_(0)
+ {
+ cont.open_sender(url+"/"+address, proton::connection_options().handler(*this));
+ }
+
+ // Thread safe
+ void send(const proton::message& m) {
+ {
+ std::unique_lock<std::mutex> l(lock_);
+ // Don't queue up more messages than we have credit for
+ while (!work_queue_ || queued_ >= credit_) sender_ready_.wait(l);
+ ++queued_;
+ }
+ work_queue_->add([=]() { this->do_send(m); }); // work_queue_ is thread safe
+ }
+
+ // Thread safe
+ void close() {
+ work_queue()->add([=]() { sender_.connection().close(); });
+ }
+
+ private:
+
+ proton::work_queue* work_queue() {
+ // Wait till work_queue_ and sender_ are initialized.
+ std::unique_lock<std::mutex> l(lock_);
+ while (!work_queue_) sender_ready_.wait(l);
+ return work_queue_;
+ }
+
+ // == messaging_handler overrides, only called in proton handler thread
+
+ void on_sender_open(proton::sender& s) override {
+ // Make sure sender_ and work_queue_ are set atomically
+ std::lock_guard<std::mutex> l(lock_);
+ sender_ = s;
+ work_queue_ = &s.work_queue();
+ }
+
+ void on_sendable(proton::sender& s) override {
+ std::lock_guard<std::mutex> l(lock_);
+ credit_ = s.credit();
+ sender_ready_.notify_all(); // Notify senders we have credit
+ }
+
+ // work_queue work items is are automatically dequeued and called by proton
+ // This function is called because it was queued by send()
+ void do_send(const proton::message& m) {
+ sender_.send(m);
+ std::lock_guard<std::mutex> l(lock_);
+ --queued_; // work item was consumed from the work_queue
+ credit_ = sender_.credit(); // update credit
+ sender_ready_.notify_all(); // Notify senders we have space on queue
+ }
+
+ void on_error(const proton::error_condition& e) override {
+ OUT(std::cerr << "unexpected error: " << e << std::endl);
+ exit(1);
+ }
+};
+
+// A thread safe receiving connection that blocks receiving threads when there
+// are no messages available, and maintains a bounded buffer of incoming
+// messages by issuing AMQP credit only when there is space in the buffer.
+class receiver : private proton::messaging_handler {
+ static const size_t MAX_BUFFER = 100; // Max number of buffered messages
+
+ // Used in proton threads only
+ proton::receiver receiver_;
+
+ // Used in proton and user threads, protected by lock_
+ std::mutex lock_;
+ proton::work_queue* work_queue_;
+ std::queue<proton::message> buffer_; // Messages not yet returned by receive()
+ std::condition_variable can_receive_; // Notify receivers of messages
+
+ public:
+
+ // Connect to url
+ receiver(proton::container& cont, const std::string& url, const std::string& address)
+ : work_queue_()
+ {
+ // NOTE:credit_window(0) disables automatic flow control.
+ // We will use flow control to match AMQP credit to buffer capacity.
+ cont.open_receiver(url+"/"+address, proton::receiver_options().credit_window(0),
+ proton::connection_options().handler(*this));
+ }
+
+ // Thread safe receive
+ proton::message receive() {
+ std::unique_lock<std::mutex> l(lock_);
+ // Wait for buffered messages
+ while (!work_queue_ || buffer_.empty())
+ can_receive_.wait(l);
+ proton::message m = std::move(buffer_.front());
+ buffer_.pop();
+ // Add a lambda to the work queue to call receive_done().
+ // This will tell the handler to add more credit.
+ work_queue_->add([=]() { this->receive_done(); });
+ return m;
+ }
+
+ void close() {
+ std::lock_guard<std::mutex> l(lock_);
+ if (work_queue_) work_queue_->add([this]() { this->receiver_.connection().close(); });
+ }
+
+ private:
+ // ==== The following are called by proton threads only.
+
+ void on_receiver_open(proton::receiver& r) override {
+ receiver_ = r;
+ std::lock_guard<std::mutex> l(lock_);
+ work_queue_ = &receiver_.work_queue();
+ receiver_.add_credit(MAX_BUFFER); // Buffer is empty, initial credit is the limit
+ }
+
+ void on_message(proton::delivery &d, proton::message &m) override {
+ // Proton automatically reduces credit by 1 before calling on_message
+ std::lock_guard<std::mutex> l(lock_);
+ buffer_.push(m);
+ can_receive_.notify_all();
+ }
+
+ // called via work_queue
+ void receive_done() {
+ // Add 1 credit, a receiver has taken a message out of the buffer.
+ receiver_.add_credit(1);
+ }
+
+ void on_error(const proton::error_condition& e) override {
+ OUT(std::cerr << "unexpected error: " << e << std::endl);
+ exit(1);
+ }
+};
+
+// ==== Example code using the sender and receiver
+
+// Send n messages
+void send_thread(sender& s, int n) {
+ auto id = std::this_thread::get_id();
+ for (int i = 0; i < n; ++i) {
+ std::ostringstream ss;
+ ss << std::this_thread::get_id() << "-" << i;
+ s.send(proton::message(ss.str()));
+ OUT(std::cout << id << " sent \"" << ss.str() << '"' << std::endl);
+ }
+ OUT(std::cout << id << " sent " << n << std::endl);
+}
+
+// Receive messages till atomic remaining count is 0.
+// remaining is shared among all receiving threads
+void receive_thread(receiver& r, std::atomic_int& remaining) {
+ auto id = std::this_thread::get_id();
+ int n = 0;
+ // atomically check and decrement remaining *before* receiving.
+ // If it is 0 or less then return, as there are no more
+ // messages to receive so calling r.receive() would block forever.
+ while (remaining-- > 0) {
+ auto m = r.receive();
+ ++n;
+ OUT(std::cout << id << " received \"" << m.body() << '"' << std::endl);
+ }
+ OUT(std::cout << id << " received " << n << " messages" << std::endl);
+}
+
+int main(int argc, const char **argv) {
+ try {
+ if (argc != 5) {
+ std::cerr <<
+ "Usage: " << argv[0] << " MESSAGE-COUNT THREAD-COUNT URL\n"
+ "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n"
+ "AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"
+ "MESSAGE-COUNT: number of messages to send\n"
+ "THREAD-COUNT: number of sender/receiver thread pairs\n";
+ return 1;
+ }
+
+ const char *url = argv[1];
+ const char *address = argv[2];
+ int n_messages = atoi(argv[3]);
+ int n_threads = atoi(argv[4]);
+ int count = n_messages * n_threads;
+
+ // Total messages to be received, multiple receiver threads will decrement this.
+ std::atomic_int remaining;
+ remaining.store(count);
+
+ // Run the proton container
+ proton::container container;
+ auto container_thread = std::thread([&]() { container.run(); });
+
+ // A single sender and receiver to be shared by all the threads
+ sender send(container, url, address);
+ receiver recv(container, url, address);
+
+ // Start receiver threads, then sender threads.
+ // Starting receivers first gives all receivers a chance to compete for messages.
+ std::vector<std::thread> threads;
+ threads.reserve(n_threads*2); // Avoid re-allocation once threads are started
+ for (int i = 0; i < n_threads; ++i)
+ threads.push_back(std::thread([&]() { receive_thread(recv, remaining); }));
+ for (int i = 0; i < n_threads; ++i)
+ threads.push_back(std::thread([&]() { send_thread(send, n_messages); }));
+
+ // Wait for threads to finish
+ for (auto& t : threads) t.join();
+ send.close();
+ recv.close();
+ container_thread.join();
+ if (remaining > 0)
+ throw std::runtime_error("not all messages were received");
+ std::cout << count << " messages sent and received" << std::endl;
+
+ return 0;
+ } catch (const std::exception& e) {
+ std::cerr << e.what() << std::endl;
+ }
+ return 1;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/options.hpp
----------------------------------------------------------------------
diff --git a/cpp/examples/options.hpp b/cpp/examples/options.hpp
new file mode 100644
index 0000000..dab1bc2
--- /dev/null
+++ b/cpp/examples/options.hpp
@@ -0,0 +1,175 @@
+#ifndef OPTIONS_HPP
+#define OPTIONS_HPP
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <string>
+#include <sstream>
+#include <ostream>
+#include <vector>
+#include <stdexcept>
+
+namespace example {
+/** bad_option is thrown for option parsing errors */
+struct bad_option : public std::runtime_error {
+ bad_option(const std::string& s) : std::runtime_error(s) {}
+};
+
+/** Simple command-line option parser for example programs */
+class options {
+ public:
+
+ options(int argc, char const * const * argv) : argc_(argc), argv_(argv), prog_(argv[0]), help_() {
+ size_t slash = prog_.find_last_of("/\\");
+ if (slash != std::string::npos)
+ prog_ = prog_.substr(slash+1); // Extract prog name from path
+ add_flag(help_, 'h', "help", "Print the help message");
+ }
+
+ ~options() {
+ for (opts::iterator i = opts_.begin(); i != opts_.end(); ++i)
+ delete *i;
+ }
+
+ /** Updates value when parse() is called if option is present with a value. */
+ template<class T>
+ void add_value(T& value, char short_name, const std::string& long_name, const std::string& description, const std::string var) {
+ opts_.push_back(new option_value<T>(value, short_name, long_name, description, var));
+ }
+
+ /** Sets flag when parse() is called if option is present. */
+ void add_flag(bool& flag, char short_name, const std::string& long_name, const std::string& description) {
+ opts_.push_back(new option_flag(flag, short_name, long_name, description));
+ }
+
+ /** Parse the command line, return the index of the first non-option argument.
+ *@throws bad_option if there is a parsing error or unknown option.
+ */
+ int parse() {
+ int arg = 1;
+ for (; arg < argc_ && argv_[arg][0] == '-'; ++arg) {
+ opts::iterator i = opts_.begin();
+ while (i != opts_.end() && !(*i)->parse(argc_, argv_, arg))
+ ++i;
+ if (i == opts_.end())
+ throw bad_option(std::string("unknown option ") + argv_[arg]);
+ }
+ if (help_) throw bad_option("");
+ return arg;
+ }
+
+ /** Print a usage message */
+ friend std::ostream& operator<<(std::ostream& os, const options& op) {
+ os << std::endl << "usage: " << op.prog_ << " [options]" << std::endl;
+ os << std::endl << "options:" << std::endl;
+ for (opts::const_iterator i = op.opts_.begin(); i < op.opts_.end(); ++i)
+ os << **i << std::endl;
+ return os;
+ }
+
+ private:
+ class option {
+ public:
+ option(char s, const std::string& l, const std::string& d, const std::string v) :
+ short_(std::string("-") + s), long_("--" + l), desc_(d), var_(v) {}
+ virtual ~option() {}
+
+ virtual bool parse(int argc, char const * const * argv, int &i) = 0;
+ virtual void print_default(std::ostream&) const {}
+
+ friend std::ostream& operator<<(std::ostream& os, const option& op) {
+ os << " " << op.short_;
+ if (!op.var_.empty()) os << " " << op.var_;
+ os << ", " << op.long_;
+ if (!op.var_.empty()) os << "=" << op.var_;
+ os << std::endl << " " << op.desc_;
+ op.print_default(os);
+ return os;
+ }
+
+ protected:
+ std::string short_, long_, desc_, var_;
+ };
+
+ template <class T>
+ class option_value : public option {
+ public:
+ option_value(T& value, char s, const std::string& l, const std::string& d, const std::string& v) :
+ option(s, l, d, v), value_(value) {}
+
+ bool parse(int argc, char const * const * argv, int &i) {
+ std::string arg(argv[i]);
+ if (arg == short_ || arg == long_) {
+ if (i < argc-1) {
+ set_value(arg, argv[++i]);
+ return true;
+ } else {
+ throw bad_option("missing value for " + arg);
+ }
+ }
+ if (arg.compare(0, long_.size(), long_) == 0 && arg[long_.size()] == '=' ) {
+ set_value(long_, arg.substr(long_.size()+1));
+ return true;
+ }
+ return false;
+ }
+
+ virtual void print_default(std::ostream& os) const { os << " (default " << value_ << ")"; }
+
+ void set_value(const std::string& opt, const std::string& s) {
+ std::istringstream is(s);
+ is >> value_;
+ if (is.fail() || is.bad())
+ throw bad_option("bad value for " + opt + ": " + s);
+ }
+
+ private:
+ T& value_;
+ };
+
+ class option_flag: public option {
+ public:
+ option_flag(bool& flag, const char s, const std::string& l, const std::string& d) :
+ option(s, l, d, ""), flag_(flag)
+ { flag_ = false; }
+
+ bool parse(int /*argc*/, char const * const * argv, int &i) {
+ if (argv[i] == short_ || argv[i] == long_) {
+ flag_ = true;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private:
+ bool &flag_;
+ };
+
+ typedef std::vector<option*> opts;
+
+ int argc_;
+ char const * const * argv_;
+ std::string prog_;
+ opts opts_;
+ bool help_;
+};
+}
+
+#endif // OPTIONS_HPP
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/queue_browser.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/queue_browser.cpp b/cpp/examples/queue_browser.cpp
new file mode 100644
index 0000000..b306e76
--- /dev/null
+++ b/cpp/examples/queue_browser.cpp
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/delivery.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/receiver_options.hpp>
+#include <proton/source_options.hpp>
+
+#include <iostream>
+
+#include "fake_cpp11.hpp"
+
+class queue_browser : public proton::messaging_handler {
+ std::string conn_url_;
+ std::string addr_;
+
+ public:
+ queue_browser(const std::string& u, const std::string& a) :
+ conn_url_(u), addr_(a) {}
+
+ void on_container_start(proton::container& c) OVERRIDE {
+ proton::receiver_options ropts;
+ proton::source_options sopts;
+ ropts.source(sopts.distribution_mode(proton::source::COPY));
+
+ proton::connection conn = c.connect(conn_url_);
+ conn.open_receiver(addr_, ropts);
+ }
+
+ void on_message(proton::delivery&, proton::message& m) OVERRIDE {
+ std::cout << m.body() << std::endl;
+ }
+};
+
+int main(int argc, char** argv) {
+ try {
+ std::string conn_url = argc > 1 ? argv[1] : "//127.0.0.1:5672";
+ std::string addr = argc > 2 ? argv[2] : "examples";
+
+ queue_browser qb(conn_url, addr);
+ proton::container(qb).run();
+
+ return 0;
+ } catch (const std::exception& e) {
+ std::cerr << e.what() << std::endl;
+ }
+
+ return 1;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/reconnect_client.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/reconnect_client.cpp b/cpp/examples/reconnect_client.cpp
new file mode 100644
index 0000000..ed93214
--- /dev/null
+++ b/cpp/examples/reconnect_client.cpp
@@ -0,0 +1,143 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/container.hpp>
+#include <proton/delivery.hpp>
+#include <proton/link.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/reconnect_options.hpp>
+#include <proton/value.hpp>
+#include <proton/types.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+#include "fake_cpp11.hpp"
+
+class reconnect_client : public proton::messaging_handler {
+ std::string url;
+ std::string address;
+ std::vector<std::string> failovers;
+ proton::sender sender;
+ int sent;
+ int expected;
+ int received;
+
+ public:
+ reconnect_client(const std::string &u, const std::string& a, int c, const std::vector<std::string>& f) :
+ url(u), address(a), failovers(f), sent(0), expected(c), received(0) {}
+
+ private:
+ void on_container_start(proton::container &c) OVERRIDE {
+ proton::connection_options co;
+ proton::reconnect_options ro;
+
+ ro.failover_urls(failovers);
+ co.reconnect(ro);
+ c.connect(url, co);
+ }
+
+ void on_connection_open(proton::connection & c) OVERRIDE {
+ c.open_receiver(address);
+ c.open_sender(address);
+ // reconnect we probably lost the last message sent
+ sent = received;
+ std::cout << "simple_recv listening on " << url << std::endl;
+ }
+
+ void on_message(proton::delivery &d, proton::message &msg) OVERRIDE {
+ if (proton::coerce<int>(msg.id()) < received) {
+ return; // Ignore duplicate
+ }
+
+ if (expected == 0 || received < expected) {
+ std::cout << msg.body() << std::endl;
+ received++;
+
+ if (received == expected) {
+ d.receiver().close();
+ sender.close();
+ d.connection().close();
+ } else {
+ // See if we can send any messages now
+ send(sender);
+ }
+ }
+ }
+
+ void send(proton::sender& s) {
+ // Only send with credit and only allow one outstanding message
+ while (s.credit() && sent < received+1) {
+ std::map<std::string, int> m;
+ m["sequence"] = sent + 1;
+
+ proton::message msg;
+ msg.id(sent + 1);
+ msg.body(m);
+
+ std::cout << "Sending: " << sent+1 << std::endl;
+ s.send(msg);
+ sent++;
+ }
+ }
+
+ void on_sender_open(proton::sender & s) OVERRIDE {
+ sender = s;
+ }
+
+ void on_sendable(proton::sender &s) OVERRIDE {
+ send(s);
+ }
+};
+
+int main(int argc, const char** argv) {
+ try {
+ if (argc < 4) {
+ std ::cerr <<
+ "Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT FAILOVER-URL...\n"
+ "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n"
+ "AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n"
+ "MESSAGE-COUNT: number of messages to receive\n"
+ "FAILOVER_URL...: zero or more failover urls\n";
+ return 1;
+ }
+ const char *url = argv[1];
+ const char *address = argv[2];
+ int message_count = atoi(argv[3]);
+ std::vector<std::string> failovers(&argv[4], &argv[argc]);
+
+ reconnect_client client(url, address, message_count, failovers);
+ proton::container(client).run();
+
+ return 0;
+ } catch (const std::exception& e) {
+ std::cerr << e.what() << std::endl;
+ }
+
+ return 1;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/scheduled_send.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/scheduled_send.cpp b/cpp/examples/scheduled_send.cpp
new file mode 100644
index 0000000..3244540
--- /dev/null
+++ b/cpp/examples/scheduled_send.cpp
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/container.hpp>
+#include <proton/connection.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/sender.hpp>
+#include <proton/tracker.hpp>
+#include <proton/work_queue.hpp>
+
+#include <iostream>
+
+#include "fake_cpp11.hpp"
+
+// Send messages at a constant rate one per interval. cancel after a timeout.
+class scheduled_sender : public proton::messaging_handler {
+ private:
+ std::string url;
+ proton::sender sender;
+ proton::duration interval, timeout;
+ proton::work_queue* work_queue;
+ bool ready, canceled;
+
+ public:
+
+ scheduled_sender(const std::string &s, double d, double t) :
+ url(s),
+ interval(int(d*proton::duration::SECOND.milliseconds())), // Send interval.
+ timeout(int(t*proton::duration::SECOND.milliseconds())), // Cancel after timeout.
+ work_queue(0),
+ ready(true), // Ready to send.
+ canceled(false) // Canceled.
+ {}
+
+ // The awkward looking double lambda is necessary because the scheduled lambdas run in the container context
+ // and must arrange lambdas for send and close to happen in the connection context.
+ void on_container_start(proton::container &c) OVERRIDE {
+ c.open_sender(url);
+ }
+
+ void on_sender_open(proton::sender &s) OVERRIDE {
+ sender = s;
+ work_queue = &s.work_queue();
+ // Call this->cancel after timeout.
+ s.container().schedule(timeout, [this]() { this->work_queue->add( [this]() { this->cancel(); }); });
+ // Start regular ticks every interval.
+ s.container().schedule(interval, [this]() { this->work_queue->add( [this]() { this->tick(); }); });
+ }
+
+ void cancel() {
+ canceled = true;
+ sender.connection().close();
+ }
+
+ void tick() {
+ // Schedule the next tick unless we have been cancelled.
+ if (!canceled)
+ sender.container().schedule(interval, [this]() { this->work_queue->add( [this]() { this->tick(); }); });
+ if (sender.credit() > 0) // Only send if we have credit
+ send();
+ else
+ ready = true; // Set the ready flag, send as soon as we get credit.
+ }
+
+ void on_sendable(proton::sender &) OVERRIDE {
+ if (ready) // We have been ticked since the last send.
+ send();
+ }
+
+ void send() {
+ std::cout << "send" << std::endl;
+ sender.send(proton::message("ping"));
+ ready = false;
+ }
+};
+
+
+int main(int argc, char **argv) {
+ std::string address("127.0.0.1:5672/examples");
+ double interval = 1.0;
+ double timeout = 5.0;
+
+ example::options opts(argc, argv);
+
+ opts.add_value(address, 'a', "address", "connect and send to URL", "URL");
+ opts.add_value(interval, 'i', "interval", "send a message every INTERVAL seconds", "INTERVAL");
+ opts.add_value(timeout, 't', "timeout", "stop after T seconds", "T");
+
+ try {
+ opts.parse();
+ scheduled_sender h(address, interval, timeout);
+ proton::container(h).run();
+ return 0;
+ } catch (const example::bad_option& e) {
+ std::cout << opts << std::endl << e.what() << std::endl;
+ } catch (const std::exception& e) {
+ std::cerr << e.what() << std::endl;
+ }
+
+ return 1;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/scheduled_send_03.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/scheduled_send_03.cpp b/cpp/examples/scheduled_send_03.cpp
new file mode 100644
index 0000000..9050429
--- /dev/null
+++ b/cpp/examples/scheduled_send_03.cpp
@@ -0,0 +1,118 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/container.hpp>
+#include <proton/connection.hpp>
+#include <proton/duration.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/sender.hpp>
+#include <proton/tracker.hpp>
+#include <proton/work_queue.hpp>
+
+#include <iostream>
+
+#include "fake_cpp11.hpp"
+
+// Send messages at a constant rate one per interval. cancel after a timeout.
+// This example uses only C++03 features.
+class scheduled_sender : public proton::messaging_handler {
+ private:
+ std::string url;
+ proton::duration interval, timeout;
+ proton::work_queue *work_queue;
+ bool ready, canceled;
+
+ public:
+ scheduled_sender(const std::string &s, double d, double t) :
+ url(s),
+ interval(int(d*proton::duration::SECOND.milliseconds())), // Send interval.
+ timeout(int(t*proton::duration::SECOND.milliseconds())), // Cancel after timeout.
+ work_queue(0),
+ ready(true), // Ready to send.
+ canceled(false) // Canceled.
+ {}
+
+ void on_container_start(proton::container &c) OVERRIDE {
+ c.open_sender(url);
+ }
+
+ void on_sender_open(proton::sender & s) OVERRIDE {
+ work_queue = &s.work_queue();
+
+ work_queue->schedule(timeout, make_work(&scheduled_sender::cancel, this, s));
+ work_queue->schedule(interval, make_work(&scheduled_sender::tick, this, s));
+ }
+
+ void cancel(proton::sender sender) {
+ canceled = true;
+ sender.connection().close();
+ }
+
+ void tick(proton::sender sender) {
+ if (!canceled) {
+ work_queue->schedule(interval, make_work(&scheduled_sender::tick, this, sender)); // Next tick
+ if (sender.credit() > 0) // Only send if we have credit
+ send(sender);
+ else
+ ready = true; // Set the ready flag, send as soon as we get credit.
+ }
+ }
+
+ void on_sendable(proton::sender &sender) OVERRIDE {
+ if (ready) // We have been ticked since the last send.
+ send(sender);
+ }
+
+ void send(proton::sender& sender) {
+ std::cout << "send" << std::endl;
+ sender.send(proton::message("ping"));
+ ready = false;
+ }
+};
+
+
+int main(int argc, char **argv) {
+ std::string address("127.0.0.1:5672/examples");
+ double interval = 1.0;
+ double timeout = 5.0;
+
+ example::options opts(argc, argv);
+
+ opts.add_value(address, 'a', "address", "connect and send to URL", "URL");
+ opts.add_value(interval, 'i', "interval", "send a message every INTERVAL seconds", "INTERVAL");
+ opts.add_value(timeout, 't', "timeout", "stop after T seconds", "T");
+
+ try {
+ opts.parse();
+ scheduled_sender h(address, interval, timeout);
+ proton::container(h).run();
+ return 0;
+ } catch (const example::bad_option& e) {
+ std::cout << opts << std::endl << e.what() << std::endl;
+ } catch (const std::exception& e) {
+ std::cerr << e.what() << std::endl;
+ }
+
+ return 1;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/selected_recv.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/selected_recv.cpp b/cpp/examples/selected_recv.cpp
new file mode 100644
index 0000000..a7f9cea
--- /dev/null
+++ b/cpp/examples/selected_recv.cpp
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/receiver_options.hpp>
+#include <proton/source_options.hpp>
+
+#include <iostream>
+
+#include "fake_cpp11.hpp"
+
+namespace {
+
+ // Example custom function to configure an AMQP filter,
+ // specifically an APACHE.ORG:SELECTOR
+ // (http://www.amqp.org/specification/1.0/filters)
+
+ void set_filter(proton::source_options &opts, const std::string& selector_str) {
+ proton::source::filter_map map;
+ proton::symbol filter_key("selector");
+ proton::value filter_value;
+ // The value is a specific AMQP "described type": binary string with symbolic descriptor
+ proton::codec::encoder enc(filter_value);
+ enc << proton::codec::start::described()
+ << proton::symbol("apache.org:selector-filter:string")
+ << selector_str
+ << proton::codec::finish();
+ // In our case, the map has this one element
+ map.put(filter_key, filter_value);
+ opts.filters(map);
+ }
+}
+
+
+class selected_recv : public proton::messaging_handler {
+ std::string conn_url_;
+ std::string addr_;
+
+ public:
+ selected_recv(const std::string& u, const std::string& a) :
+ conn_url_(u), addr_(a) {}
+
+ void on_container_start(proton::container &c) OVERRIDE {
+ proton::source_options opts;
+ set_filter(opts, "colour = 'green'");
+ proton::connection conn = c.connect(conn_url_);
+ conn.open_receiver(addr_, proton::receiver_options().source(opts));
+ }
+
+ void on_message(proton::delivery &, proton::message &m) OVERRIDE {
+ std::cout << m.body() << std::endl;
+ }
+};
+
+int main(int argc, char **argv) {
+ try {
+ std::string conn_url = argc > 1 ? argv[1] : "//127.0.0.1:5672";
+ std::string addr = argc > 2 ? argv[2] : "examples";
+
+ selected_recv recv(conn_url, addr);
+ proton::container(recv).run();
+
+ return 0;
+ } catch (const std::exception& e) {
+ std::cerr << e.what() << std::endl;
+ }
+
+ return 1;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/server.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/server.cpp b/cpp/examples/server.cpp
new file mode 100644
index 0000000..8e177df
--- /dev/null
+++ b/cpp/examples/server.cpp
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+#include <cctype>
+
+#include "fake_cpp11.hpp"
+
+class server : public proton::messaging_handler {
+ std::string conn_url_;
+ std::string addr_;
+ proton::connection conn_;
+ std::map<std::string, proton::sender> senders_;
+
+ public:
+ server(const std::string& u, const std::string& a) :
+ conn_url_(u), addr_(a) {}
+
+ void on_container_start(proton::container& c) OVERRIDE {
+ conn_ = c.connect(conn_url_);
+ conn_.open_receiver(addr_);
+
+ std::cout << "Server connected to " << conn_url_ << std::endl;
+ }
+
+ std::string to_upper(const std::string& s) {
+ std::string uc(s);
+ size_t l = uc.size();
+
+ for (size_t i=0; i<l; i++) {
+ uc[i] = static_cast<char>(std::toupper(uc[i]));
+ }
+
+ return uc;
+ }
+
+ void on_message(proton::delivery&, proton::message& m) OVERRIDE {
+ std::cout << "Received " << m.body() << std::endl;
+
+ std::string reply_to = m.reply_to();
+ proton::message reply;
+
+ reply.to(reply_to);
+ reply.body(to_upper(proton::get<std::string>(m.body())));
+ reply.correlation_id(m.correlation_id());
+
+ if (!senders_[reply_to]) {
+ senders_[reply_to] = conn_.open_sender(reply_to);
+ }
+
+ senders_[reply_to].send(reply);
+ }
+};
+
+int main(int argc, char** argv) {
+ try {
+ std::string conn_url = argc > 1 ? argv[1] : "//127.0.0.1:5672";
+ std::string addr = argc > 2 ? argv[2] : "examples";
+
+ server srv(conn_url, addr);
+ proton::container(srv).run();
+
+ return 0;
+ } catch (const std::exception& e) {
+ std::cerr << e.what() << std::endl;
+ }
+
+ return 1;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/server_direct.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/server_direct.cpp b/cpp/examples/server_direct.cpp
new file mode 100644
index 0000000..d46fc29
--- /dev/null
+++ b/cpp/examples/server_direct.cpp
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/container.hpp>
+#include <proton/listen_handler.hpp>
+#include <proton/listener.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/sender.hpp>
+#include <proton/sender_options.hpp>
+#include <proton/source_options.hpp>
+#include <proton/tracker.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+#include <sstream>
+#include <cctype>
+
+#include "fake_cpp11.hpp"
+
+class server : public proton::messaging_handler {
+ private:
+ class listener_ready_handler : public proton::listen_handler {
+ void on_open(proton::listener& l) OVERRIDE {
+ std::cout << "listening on " << l.port() << std::endl;
+ }
+ };
+
+ typedef std::map<std::string, proton::sender> sender_map;
+ listener_ready_handler listen_handler;
+ std::string url;
+ sender_map senders;
+ int address_counter;
+
+ public:
+ server(const std::string &u) : url(u), address_counter(0) {}
+
+ void on_container_start(proton::container &c) OVERRIDE {
+ c.listen(url, listen_handler);
+ }
+
+ std::string to_upper(const std::string &s) {
+ std::string uc(s);
+ size_t l = uc.size();
+
+ for (size_t i=0; i<l; i++)
+ uc[i] = static_cast<char>(std::toupper(uc[i]));
+
+ return uc;
+ }
+
+ std::string generate_address() {
+ std::ostringstream addr;
+ addr << "server" << address_counter++;
+
+ return addr.str();
+ }
+
+ void on_sender_open(proton::sender &sender) OVERRIDE {
+ if (sender.source().dynamic()) {
+ std::string addr = generate_address();
+ sender.open(proton::sender_options().source(proton::source_options().address(addr)));
+ senders[addr] = sender;
+ }
+ }
+
+ void on_message(proton::delivery &, proton::message &m) OVERRIDE {
+ std::cout << "Received " << m.body() << std::endl;
+
+ std::string reply_to = m.reply_to();
+ sender_map::iterator it = senders.find(reply_to);
+
+ if (it == senders.end()) {
+ std::cout << "No link for reply_to: " << reply_to << std::endl;
+ } else {
+ proton::sender sender = it->second;
+ proton::message reply;
+
+ reply.to(reply_to);
+ reply.body(to_upper(proton::get<std::string>(m.body())));
+ reply.correlation_id(m.correlation_id());
+
+ sender.send(reply);
+ }
+ }
+};
+
+int main(int argc, char **argv) {
+ std::string address("amqp://127.0.0.1:5672/examples");
+ example::options opts(argc, argv);
+
+ opts.add_value(address, 'a', "address", "listen on URL", "URL");
+
+ try {
+ opts.parse();
+
+ server srv(address);
+ proton::container(srv).run();
+
+ return 0;
+ } catch (const example::bad_option& e) {
+ std::cout << opts << std::endl << e.what() << std::endl;
+ } catch (const std::exception& e) {
+ std::cerr << e.what() << std::endl;
+ }
+
+ return 1;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/service_bus.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/service_bus.cpp b/cpp/examples/service_bus.cpp
new file mode 100644
index 0000000..c99bca6
--- /dev/null
+++ b/cpp/examples/service_bus.cpp
@@ -0,0 +1,322 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * Service Bus example.
+ *
+ * This is an example of using "Service Bus sessions" (not the same thing as an
+ * AMQP session) to selectively retrieve messages from a queue. The queue must
+ * be configured within Service Bus to support sessions. Service Bus uses the
+ * AMQP group_id message property to associate messages with a particular
+ * Service Bus session. It uses AMQP filters to specify which session is
+ * associated with a receiver.
+ *
+ * The mechanics for sending and receiving to other types of service bus queue
+ * are broadly the same, as long as the step using the
+ * receiver.source().filters() is omitted.
+ *
+ * Other Service Bus notes: There is no drain support, hence the need to to use
+ * timeouts in this example to detect the end of the message stream. There is
+ * no browse support when setting the AMQP link distribution mode to COPY.
+ * Service Bus claims to support browsing, but it is unclear how to manage that
+ * with an AMQP client. Maximum message sizes (for body and headers) vary
+ * between queue types and fee tier ranging from 64KB to 1MB. Due to the
+ * distributed nature of Service Bus, queues do not automatically preserve FIFO
+ * order of messages unless the user takes steps to force the message stream to
+ * a single partition of the queue or creates the queue with partitioning disabled.
+ *
+ * This example shows use of the simpler SAS (Shared Access Signature)
+ * authentication scheme where the credentials are supplied on the connection.
+ * Service Bus does not actually check these credentials when setting up the
+ * connection, it merely caches the SAS key and policy (AKA key name) for later
+ * access authorization when creating senders and receivers. There is a second
+ * authentication scheme that allows for multiple tokens and even updating them
+ * within a long-lived connection which uses special management request-response
+ * queues in Service Bus. The format of this exchange may be documented
+ * somewhere but is also available by working through the CbsAsyncExample.cs
+ * program in the Amqp.Net Lite project.
+ *
+ * The sample output for this program is:
+
+ sent message: message 0 in service bus session "red"
+ sent message: message 1 in service bus session "green"
+ sent message: message 2 in service bus session "blue"
+ sent message: message 3 in service bus session "red"
+ sent message: message 4 in service bus session "black"
+ sent message: message 5 in service bus session "blue"
+ sent message: message 6 in service bus session "yellow"
+receiving messages with session identifier "green" from queue ses_q1
+ received message: message 1 in service bus session "green"
+receiving messages with session identifier "red" from queue ses_q1
+ received message: message 0 in service bus session "red"
+ received message: message 3 in service bus session "red"
+receiving messages with session identifier "blue" from queue ses_q1
+ received message: message 2 in service bus session "blue"
+ received message: message 5 in service bus session "blue"
+receiving messages with session identifier "black" from queue ses_q1
+ received message: message 4 in service bus session "black"
+receiving messages with session identifier "yellow" from queue ses_q1
+ received message: message 6 in service bus session "yellow"
+Done. No more messages.
+
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/container.hpp>
+#include <proton/delivery.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/receiver_options.hpp>
+#include <proton/sender.hpp>
+#include <proton/sender_options.hpp>
+#include <proton/source_options.hpp>
+#include <proton/tracker.hpp>
+#include <proton/work_queue.hpp>
+
+#include <iostream>
+#include <sstream>
+
+#include "fake_cpp11.hpp"
+
+using proton::source_options;
+using proton::connection_options;
+using proton::sender_options;
+using proton::receiver_options;
+
+void do_next_sequence();
+
+namespace {
+void check_arg(const std::string &value, const std::string &name) {
+ if (value.empty())
+ throw std::runtime_error("missing argument for \"" + name + "\"");
+}
+}
+
+/// Connect to Service Bus queue and retrieve messages in a particular session.
+class session_receiver : public proton::messaging_handler {
+ private:
+ const std::string &connection_url;
+ const std::string &entity;
+ proton::value session_identifier; // AMQP null type by default, matches any Service Bus sequence identifier
+ int message_count;
+ bool closed;
+ proton::duration read_timeout;
+ proton::timestamp last_read;
+ proton::container *container;
+ proton::receiver receiver;
+
+ public:
+ session_receiver(const std::string &c, const std::string &e,
+ const char *sid) : connection_url(c), entity(e), message_count(0), closed(false), read_timeout(5000), last_read(0), container(0) {
+ if (sid)
+ session_identifier = std::string(sid);
+ // session_identifier is now either empty/null or an AMQP string type.
+ // If null, Service Bus will pick the first available message and create
+ // a filter at its end with that message's session identifier.
+ // Technically, an AMQP string is not a valid filter-set value unless it
+ // is annotated as an AMQP described type, so this may change.
+
+ }
+
+ void run (proton::container &c) {
+ message_count = 0;
+ closed = false;
+ c.connect(connection_url, connection_options().handler(*this));
+ container = &c;
+ }
+
+ void on_connection_open(proton::connection &connection) OVERRIDE {
+ proton::source::filter_map sb_filter_map;
+ proton::symbol key("com.microsoft:session-filter");
+ sb_filter_map.put(key, session_identifier);
+ receiver = connection.open_receiver(entity, receiver_options().source(source_options().filters(sb_filter_map)));
+
+ // Start timeout processing here. If Service Bus has no pending
+ // messages, it may defer completing the receiver open until a message
+ // becomes available (e.g. to be able to set the actual session
+ // identifier if none was specified).
+ last_read = proton::timestamp::now();
+ // Call this->process_timeout after read_timeout.
+ container->schedule(read_timeout, [this]() { this->process_timeout(); });
+ }
+
+ void on_receiver_open(proton::receiver &r) OVERRIDE {
+ if (closed) return; // PROTON-1264
+ proton::value actual_session_id = r.source().filters().get("com.microsoft:session-filter");
+ std::cout << "receiving messages with session identifier \"" << actual_session_id
+ << "\" from queue " << entity << std::endl;
+ last_read = proton::timestamp::now();
+ }
+
+ void on_message(proton::delivery &, proton::message &m) OVERRIDE {
+ message_count++;
+ std::cout << " received message: " << m.body() << std::endl;
+ last_read = proton::timestamp::now();
+ }
+
+ void process_timeout() {
+ proton::timestamp deadline = last_read + read_timeout;
+ proton::timestamp now = proton::timestamp::now();
+ if (now >= deadline) {
+ receiver.close();
+ closed = true;
+ receiver.connection().close();
+ if (message_count)
+ do_next_sequence();
+ else
+ std::cout << "Done. No more messages." << std::endl;
+ } else {
+ proton::duration next = deadline - now;
+ container->schedule(next, [this]() { this->process_timeout(); });
+ }
+ }
+};
+
+
+/// Connect to Service Bus queue and send messages divided into different sessions.
+class session_sender : public proton::messaging_handler {
+ private:
+ const std::string &connection_url;
+ const std::string &entity;
+ int msg_count;
+ int total;
+ int accepts;
+
+ public:
+ session_sender(const std::string &c, const std::string &e) : connection_url(c), entity(e),
+ msg_count(0), total(7), accepts(0) {}
+
+ void run(proton::container &c) {
+ c.open_sender(connection_url + "/" + entity, sender_options(), connection_options().handler(*this));
+ }
+
+ void send_remaining_messages(proton::sender &s) {
+ std::string gid;
+ for (; msg_count < total && s.credit() > 0; msg_count++) {
+ switch (msg_count) {
+ case 0: gid = "red"; break;
+ case 1: gid = "green"; break;
+ case 2: gid = "blue"; break;
+ case 3: gid = "red"; break;
+ case 4: gid = "black"; break;
+ case 5: gid = "blue"; break;
+ case 6: gid = "yellow"; break;
+ }
+
+ std::ostringstream mbody;
+ mbody << "message " << msg_count << " in service bus session \"" << gid << "\"";
+ proton::message m(mbody.str());
+ m.group_id(gid); // Service Bus uses the group_id property to as the session identifier.
+ s.send(m);
+ std::cout << " sent message: " << m.body() << std::endl;
+ }
+ }
+
+ void on_sendable(proton::sender &s) OVERRIDE {
+ send_remaining_messages(s);
+ }
+
+ void on_tracker_accept(proton::tracker &t) OVERRIDE {
+ accepts++;
+ if (accepts == total) {
+ // upload complete
+ t.sender().close();
+ t.sender().connection().close();
+ do_next_sequence();
+ }
+ }
+};
+
+
+/// Orchestrate the sequential actions of sending and receiving session-based messages.
+class sequence : public proton::messaging_handler {
+ private:
+ proton::container *container;
+ int sequence_no;
+ session_sender snd;
+ session_receiver rcv_red, rcv_green, rcv_null;
+
+ public:
+ static sequence *the_sequence;
+
+ sequence (const std::string &c, const std::string &e) :
+ container(0), sequence_no(0),
+ snd(c, e), rcv_red(c, e, "red"), rcv_green(c, e, "green"), rcv_null(c, e, NULL) {
+ the_sequence = this;
+ }
+
+ void on_container_start(proton::container &c) OVERRIDE {
+ container = &c;
+ next_sequence();
+ }
+
+ void next_sequence() {
+ switch (sequence_no++) {
+ // run these in order exactly once
+ case 0: snd.run(*container); break;
+ case 1: rcv_green.run(*container); break;
+ case 2: rcv_red.run(*container); break;
+ // Run this until the receiver decides there is no messages left to sequence through
+ default: rcv_null.run(*container); break;
+ }
+ }
+};
+
+sequence *sequence::the_sequence = NULL;
+
+void do_next_sequence() { sequence::the_sequence->next_sequence(); }
+
+
+int main(int argc, char **argv) {
+ std::string sb_namespace; // i.e. "foo.servicebus.windows.net"
+ // Make sure the next two are urlencoded for Proton
+ std::string sb_key_name; // shared access key name for entity (AKA "Policy Name")
+ std::string sb_key; // shared access key
+ std::string sb_entity; // AKA the service bus queue. Must enable
+ // sessions on it for this example.
+
+ example::options opts(argc, argv);
+ opts.add_value(sb_namespace, 'n', "namespace", "Service Bus full namespace", "NAMESPACE");
+ opts.add_value(sb_key_name, 'p', "policy", "policy name that specifies access rights (key name)", "POLICY");
+ opts.add_value(sb_key, 'k', "key", "secret key for the policy", "key");
+ opts.add_value(sb_entity, 'e', "entity", "entity path (queue name)", "ENTITY");
+
+ try {
+ opts.parse();
+ check_arg(sb_namespace, "namespace");
+ check_arg(sb_key_name, "policy");
+ check_arg(sb_key, "key");
+ check_arg(sb_entity, "entity");
+ std::string connection_string("amqps://" + sb_key_name + ":" + sb_key + "@" + sb_namespace);
+
+ sequence seq(connection_string, sb_entity);
+ proton::container(seq).run();
+ return 0;
+ } catch (const std::exception& e) {
+ std::cerr << e.what() << std::endl;
+ }
+
+ return 1;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/simple_connect.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/simple_connect.cpp b/cpp/examples/simple_connect.cpp
new file mode 100644
index 0000000..74a8c87
--- /dev/null
+++ b/cpp/examples/simple_connect.cpp
@@ -0,0 +1,111 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/container.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/reconnect_options.hpp>
+
+#include <iostream>
+
+#include "fake_cpp11.hpp"
+
+class simple_connect : public proton::messaging_handler {
+ private:
+ std::string url;
+ std::string user;
+ std::string password;
+ bool reconnect;
+ bool sasl;
+ std::string mechs;
+ bool insecure;
+ proton::connection connection;
+
+ public:
+ simple_connect(const std::string &a, const std::string &u, const std::string &p,
+ bool r, bool s, const std::string& ms, bool in) :
+ url(a), user(u), password(p),
+ reconnect(r), sasl(s), mechs(ms), insecure(in) {}
+
+ void on_container_start(proton::container &c) OVERRIDE {
+ proton::connection_options co;
+ if (!user.empty()) co.user(user);
+ if (!password.empty()) co.password(password);
+ if (reconnect) co.reconnect(proton::reconnect_options());
+ if (sasl) co.sasl_enabled(true);
+ //
+ // NB: We only set sasl options if they are not default to avoid
+ // forcing SASL negotiation on when it's not needed.
+ //
+ // This is because the SASL negotiation is turned off unless
+ // it is needed. Setting a username/password or any SASL option will
+ // force the SASL negotiation to be turned on.
+ //
+ if (!mechs.empty()) co.sasl_allowed_mechs(mechs);
+ if (insecure) co.sasl_allow_insecure_mechs(true);
+ connection = c.connect(url, co);
+ }
+
+ void on_connection_open(proton::connection &c) OVERRIDE {
+ c.close();
+ }
+
+ void on_error(const proton::error_condition& e) OVERRIDE {
+ throw std::runtime_error(e.what());
+ }
+};
+
+int main(int argc, char **argv) {
+ std::string address("127.0.0.1:5672/examples");
+ std::string user;
+ std::string password;
+ bool reconnect = false;
+ bool sasl = false;
+ std::string mechs;
+ bool insecure = false;
+ example::options opts(argc, argv);
+
+ opts.add_value(address, 'a', "address", "connect and send to URL", "URL");
+ opts.add_value(user, 'u', "user", "authenticate as USER", "USER");
+ opts.add_value(password, 'p', "password", "authenticate with PASSWORD", "PASSWORD");
+ opts.add_flag(reconnect, 'r', "reconnect", "reconnect on connection failure");
+ opts.add_flag(sasl,'s', "sasl", "force SASL authentication with no user specified (Use for Kerberos/GSSAPI)");
+ opts.add_value(mechs, 'm', "mechs", "allowed SASL mechanisms", "MECHS");
+ opts.add_flag(insecure, 'i', "insecure", "allow clear-text passwords");
+
+ try {
+ opts.parse();
+
+ simple_connect connect(address, user, password, reconnect, sasl, mechs, insecure);
+ proton::container(connect).run();
+
+ return 0;
+ } catch (const example::bad_option& e) {
+ std::cout << opts << std::endl << e.what() << std::endl;
+ } catch (const std::exception& e) {
+ std::cerr << e.what() << std::endl;
+ }
+
+ return 1;
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/cpp/examples/simple_recv.cpp
----------------------------------------------------------------------
diff --git a/cpp/examples/simple_recv.cpp b/cpp/examples/simple_recv.cpp
new file mode 100644
index 0000000..5a7cde4
--- /dev/null
+++ b/cpp/examples/simple_recv.cpp
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/container.hpp>
+#include <proton/delivery.hpp>
+#include <proton/link.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/value.hpp>
+
+#include <iostream>
+#include <map>
+
+#include "fake_cpp11.hpp"
+
+class simple_recv : public proton::messaging_handler {
+ private:
+ std::string url;
+ std::string user;
+ std::string password;
+ proton::receiver receiver;
+ int expected;
+ int received;
+
+ public:
+ simple_recv(const std::string &s, const std::string &u, const std::string &p, int c) :
+ url(s), user(u), password(p), expected(c), received(0) {}
+
+ void on_container_start(proton::container &c) OVERRIDE {
+ proton::connection_options co;
+ if (!user.empty()) co.user(user);
+ if (!password.empty()) co.password(password);
+ receiver = c.open_receiver(url, co);
+ }
+
+ void on_message(proton::delivery &d, proton::message &msg) OVERRIDE {
+ if (proton::coerce<int>(msg.id()) < received) {
+ return; // Ignore duplicate
+ }
+
+ if (expected == 0 || received < expected) {
+ std::cout << msg.body() << std::endl;
+ received++;
+
+ if (received == expected) {
+ d.receiver().close();
+ d.connection().close();
+ }
+ }
+ }
+};
+
+int main(int argc, char **argv) {
+ std::string address("127.0.0.1:5672/examples");
+ std::string user;
+ std::string password;
+ int message_count = 100;
+ example::options opts(argc, argv);
+
+ opts.add_value(address, 'a', "address", "connect to and receive from URL", "URL");
+ opts.add_value(message_count, 'm', "messages", "receive COUNT messages", "COUNT");
+ opts.add_value(user, 'u', "user", "authenticate as USER", "USER");
+ opts.add_value(password, 'p', "password", "authenticate with PASSWORD", "PASSWORD");
+
+
+ try {
+ opts.parse();
+
+ simple_recv recv(address, user, password, message_count);
+ proton::container(recv).run();
+
+ return 0;
+ } catch (const example::bad_option& e) {
+ std::cout << opts << std::endl << e.what() << std::endl;
+ } catch (const std::exception& e) {
+ std::cerr << e.what() << std::endl;
+ }
+
+ return 1;
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org