You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2016/03/24 15:17:49 UTC

[2/5] qpid-proton git commit: PROTON-1164: [C++ binding] Update handlers to split link events into receiver and sender events

PROTON-1164: [C++ binding] Update handlers to split link events into receiver and sender events


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/9bdea3b6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/9bdea3b6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/9bdea3b6

Branch: refs/heads/master
Commit: 9bdea3b652b05e6cc01e60c0681411d2ab10a9b4
Parents: 3d52220
Author: Andrew Stitcher <as...@apache.org>
Authored: Tue Mar 22 14:42:59 2016 -0400
Committer: Andrew Stitcher <as...@apache.org>
Committed: Thu Mar 24 10:12:21 2016 -0400

----------------------------------------------------------------------
 examples/cpp/broker.hpp                         | 38 +++++++++-----------
 examples/cpp/client.cpp                         |  6 ++--
 examples/cpp/engine/client.cpp                  |  5 ++-
 examples/cpp/server_direct.cpp                  |  8 ++---
 .../bindings/cpp/include/proton/handler.hpp     | 13 +++++--
 proton-c/bindings/cpp/src/engine_test.cpp       |  6 +++-
 proton-c/bindings/cpp/src/handler.cpp           |  9 +++--
 proton-c/bindings/cpp/src/messaging_adapter.cpp | 36 +++++++++++++------
 proton-c/bindings/cpp/src/messaging_adapter.hpp |  3 +-
 9 files changed, 72 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bdea3b6/examples/cpp/broker.hpp
----------------------------------------------------------------------
diff --git a/examples/cpp/broker.hpp b/examples/cpp/broker.hpp
index 3caffbe..ba9040c 100644
--- a/examples/cpp/broker.hpp
+++ b/examples/cpp/broker.hpp
@@ -154,23 +154,21 @@ class broker_handler : public proton::handler {
   public:
     broker_handler(queues& qs) : queues_(qs) {}
 
-    void on_link_open(proton::event &e, proton::link &lnk) override {
-
-        if (!!lnk.sender()) {
-            proton::terminus remote_source(lnk.remote_source());
-            queue &q = remote_source.dynamic() ?
-                queues_.dynamic() : queues_.get(remote_source.address());
-            lnk.local_source().address(q.name());
-
-            q.subscribe(lnk.sender());
-            std::cout << "broker outgoing link from " << q.name() << std::endl;
-        } else {
-            // Receiver
-            std::string address = lnk.remote_target().address();
-            if (!address.empty()) {
-                lnk.local_target().address(address);
-                std::cout << "broker incoming link to " << address << std::endl;
-            }
+    void on_sender_open(proton::event &e, proton::sender &sender) override {
+        proton::terminus remote_source(sender.remote_source());
+        queue &q = remote_source.dynamic() ?
+            queues_.dynamic() : queues_.get(remote_source.address());
+        sender.local_source().address(q.name());
+
+        q.subscribe(sender);
+        std::cout << "broker outgoing link from " << q.name() << std::endl;
+    }
+
+    void on_receiver_open(proton::event &e, proton::receiver &receiver) override {
+        std::string address = receiver.remote_target().address();
+        if (!address.empty()) {
+            receiver.local_target().address(address);
+            std::cout << "broker incoming link to " << address << std::endl;
         }
     }
 
@@ -182,10 +180,8 @@ class broker_handler : public proton::handler {
         }
     }
 
-    void on_link_close(proton::event &e, proton::link &lnk) override {
-        if (!!lnk.sender()) {
-            unsubscribe(lnk.sender());
-        }
+    void on_sender_close(proton::event &e, proton::sender &sender) override {
+        unsubscribe(sender);
     }
 
     void on_connection_close(proton::event &e, proton::connection &c) override {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bdea3b6/examples/cpp/client.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/client.cpp b/examples/cpp/client.cpp
index 3a55efe..d026c51 100644
--- a/examples/cpp/client.cpp
+++ b/examples/cpp/client.cpp
@@ -54,10 +54,8 @@ class client : public proton::handler {
         sender.send(req);
     }
 
-    void on_link_open(proton::event &e, proton::link &l) override {
-        if (l == receiver) {
-            send_request();
-        }
+    void on_receiver_open(proton::event &e, proton::receiver &) override {
+        send_request();
     }
 
     void on_message(proton::event &e, proton::message &response) override {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bdea3b6/examples/cpp/engine/client.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/client.cpp b/examples/cpp/engine/client.cpp
index f1066fa..2b22288 100644
--- a/examples/cpp/engine/client.cpp
+++ b/examples/cpp/engine/client.cpp
@@ -53,9 +53,8 @@ class client : public proton::handler {
         sender.send(req);
     }
 
-    void on_link_open(proton::event &e, proton::link &l) override {
-        if (l == receiver)
-            send_request();
+    void on_receiver_open(proton::event &e, proton::receiver &) override {
+        send_request();
     }
 
     void on_message(proton::event &e, proton::message &response) override {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bdea3b6/examples/cpp/server_direct.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/server_direct.cpp b/examples/cpp/server_direct.cpp
index 5ce2d21..a99adc0 100644
--- a/examples/cpp/server_direct.cpp
+++ b/examples/cpp/server_direct.cpp
@@ -66,10 +66,10 @@ class server : public proton::handler {
         return addr.str();
     }
 
-    void on_link_open(proton::event& e, proton::link &link) override {
-        if (!!link.sender() && link.remote_source().dynamic()) {
-            link.local_source().address(generate_address());
-            senders[link.local_source().address()] = link.sender();
+    void on_sender_open(proton::event& e, proton::sender &sender) override {
+        if (sender.remote_source().dynamic()) {
+            sender.local_source().address(generate_address());
+            senders[sender.local_source().address()] = sender;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bdea3b6/proton-c/bindings/cpp/include/proton/handler.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/handler.hpp b/proton-c/bindings/cpp/include/proton/handler.hpp
index 4d00f93..aaed559 100644
--- a/proton-c/bindings/cpp/include/proton/handler.hpp
+++ b/proton-c/bindings/cpp/include/proton/handler.hpp
@@ -104,11 +104,18 @@ PN_CPP_CLASS_EXTERN handler
     PN_CPP_EXTERN virtual void on_session_error(event &e, session &s);
 
     /// The remote peer opened the link.
-    PN_CPP_EXTERN virtual void on_link_open(event &e, link& l);
+    PN_CPP_EXTERN virtual void on_receiver_open(event &e, receiver& l);
     /// The remote peer closed the link.
-    PN_CPP_EXTERN virtual void on_link_close(event &e, link& l);
+    PN_CPP_EXTERN virtual void on_receiver_close(event &e, receiver& l);
     /// The remote peer closed the link with an error condition.
-    PN_CPP_EXTERN virtual void on_link_error(event &e, link& l);
+    PN_CPP_EXTERN virtual void on_receiver_error(event &e, receiver& l);
+
+    /// The remote peer opened the link.
+    PN_CPP_EXTERN virtual void on_sender_open(event &e, sender& l);
+    /// The remote peer closed the link.
+    PN_CPP_EXTERN virtual void on_sender_close(event &e, sender& l);
+    /// The remote peer closed the link with an error condition.
+    PN_CPP_EXTERN virtual void on_sender_error(event &e, sender& l);
 
     /// The remote peer accepted an outgoing message.
     PN_CPP_EXTERN virtual void on_delivery_accept(event &e, delivery &d);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bdea3b6/proton-c/bindings/cpp/src/engine_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/engine_test.cpp b/proton-c/bindings/cpp/src/engine_test.cpp
index 1b32fb0..ea1be2f 100644
--- a/proton-c/bindings/cpp/src/engine_test.cpp
+++ b/proton-c/bindings/cpp/src/engine_test.cpp
@@ -100,7 +100,11 @@ struct record_handler : public handler {
     std::deque<proton::session> sessions;
     std::deque<std::string> errors;
 
-    void on_link_open(event& e, link &l) override {
+    void on_receiver_open(event& e, receiver &l) override {
+        links.push_back(l);
+    }
+
+    void on_sender_open(event& e, sender &l) override {
         links.push_back(l);
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bdea3b6/proton-c/bindings/cpp/src/handler.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/handler.cpp b/proton-c/bindings/cpp/src/handler.cpp
index a6ee246..87a9075 100644
--- a/proton-c/bindings/cpp/src/handler.cpp
+++ b/proton-c/bindings/cpp/src/handler.cpp
@@ -48,9 +48,12 @@ void handler::on_connection_open(event &e, connection &) { on_unhandled(e); }
 void handler::on_session_close(event &e, session &) { on_unhandled(e); }
 void handler::on_session_error(event &e, session &s) { on_unhandled_error(e, s.remote_condition()); }
 void handler::on_session_open(event &e, session &) { on_unhandled(e); }
-void handler::on_link_close(event &e, link &) { on_unhandled(e); }
-void handler::on_link_error(event &e, link &l) { on_unhandled_error(e, l.remote_condition()); }
-void handler::on_link_open(event &e, link &) { on_unhandled(e); }
+void handler::on_receiver_close(event &e, receiver &) { on_unhandled(e); }
+void handler::on_receiver_error(event &e, receiver &l) { on_unhandled_error(e, l.remote_condition()); }
+void handler::on_receiver_open(event &e, receiver &) { on_unhandled(e); }
+void handler::on_sender_close(event &e, sender &) { on_unhandled(e); }
+void handler::on_sender_error(event &e, sender &l) { on_unhandled_error(e, l.remote_condition()); }
+void handler::on_sender_open(event &e, sender &) { on_unhandled(e); }
 void handler::on_delivery_accept(event &e, delivery &) { on_unhandled(e); }
 void handler::on_delivery_reject(event &e, delivery &) { on_unhandled(e); }
 void handler::on_delivery_release(event &e, delivery &) { on_unhandled(e); }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bdea3b6/proton-c/bindings/cpp/src/messaging_adapter.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/messaging_adapter.cpp b/proton-c/bindings/cpp/src/messaging_adapter.cpp
index a0cf9ab..f280330 100644
--- a/proton-c/bindings/cpp/src/messaging_adapter.cpp
+++ b/proton-c/bindings/cpp/src/messaging_adapter.cpp
@@ -147,13 +147,21 @@ bool is_local_unititialised(pn_state_t state) {
 void messaging_adapter::on_link_remote_close(proton_event &pe) {
     pn_event_t *cevent = pe.pn_event();
     pn_link_t *lnk = pn_event_link(cevent);
-    link l(lnk);
-    if (pn_condition_is_set(pn_link_remote_condition(lnk))) {
-        messaging_event mevent(messaging_event::LINK_ERROR, pe);
-        delegate_.on_link_error(mevent, l);
+    messaging_event clevent(messaging_event::LINK_CLOSE, pe);
+    messaging_event eevent(messaging_event::LINK_ERROR, pe);
+    if (pn_link_is_receiver(lnk)) {
+        receiver r = link(lnk).receiver();
+        if (pn_condition_is_set(pn_link_remote_condition(lnk))) {
+            delegate_.on_receiver_error(eevent, r);
+        }
+        delegate_.on_receiver_close(clevent, r);
+    } else {
+        sender s = link(lnk).sender();
+        if (pn_condition_is_set(pn_link_remote_condition(lnk))) {
+            delegate_.on_sender_error(eevent, s);
+        }
+        delegate_.on_sender_close(clevent, s);
     }
-    messaging_event mevent(messaging_event::LINK_CLOSE, pe);
-    delegate_.on_link_close(mevent, l);
     pn_link_close(lnk);
 }
 
@@ -209,16 +217,22 @@ void messaging_adapter::on_link_local_open(proton_event &pe) {
 
 void messaging_adapter::on_link_remote_open(proton_event &pe) {
     messaging_event mevent(messaging_event::LINK_OPEN, pe);
-    pn_link_t *link = pn_event_link(pe.pn_event());
-    class link l(link);
-    delegate_.on_link_open(mevent, l);
-    if (!is_local_open(pn_link_state(link)) && is_local_unititialised(pn_link_state(link))) {
+    pn_link_t *lnk = pn_event_link(pe.pn_event());
+    if (pn_link_is_receiver(lnk)) {
+      receiver r = link(lnk).receiver();
+      delegate_.on_receiver_open(mevent, r);
+    } else {
+      sender s = link(lnk).sender();
+      delegate_.on_sender_open(mevent, s);
+    }
+    if (!is_local_open(pn_link_state(lnk)) && is_local_unititialised(pn_link_state(lnk))) {
+        link l(lnk);
         if (pe.container_)
             l.open(pe.container_->impl_->link_options_);
         else
             l.open();    // No default for engine
     }
-    credit_topup(link);
+    credit_topup(lnk);
 }
 
 void messaging_adapter::on_transport_tail_closed(proton_event &pe) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bdea3b6/proton-c/bindings/cpp/src/messaging_adapter.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/messaging_adapter.hpp b/proton-c/bindings/cpp/src/messaging_adapter.hpp
index 1fa4172..ce51c4a 100644
--- a/proton-c/bindings/cpp/src/messaging_adapter.hpp
+++ b/proton-c/bindings/cpp/src/messaging_adapter.hpp
@@ -33,8 +33,7 @@
 
 namespace proton {
 
-// Combine's Python's: endpoint_state_handler, incoming_message_handler, outgoing_message_handler
-
+/// Convert the low level proton-c events to the higher level proton::handler calls
 class messaging_adapter : public proton_handler
 {
   public:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org