You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/11/23 18:57:47 UTC

[19/50] [abbrv] qpid-proton git commit: NO-JIRA: c++: broker example clean-up.

NO-JIRA: c++: broker example clean-up.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/c0917a0f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/c0917a0f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/c0917a0f

Branch: refs/heads/go1
Commit: c0917a0fbc74eafa31a07f695c3ced00961cf184
Parents: 4d148e3
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Nov 4 12:02:51 2015 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Nov 4 12:02:51 2015 -0500

----------------------------------------------------------------------
 examples/cpp/broker.hpp        |  12 -----
 examples/cpp/select_broker.cpp | 102 ++++++++++++++++++------------------
 2 files changed, 51 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c0917a0f/examples/cpp/broker.hpp
----------------------------------------------------------------------
diff --git a/examples/cpp/broker.hpp b/examples/cpp/broker.hpp
index 88d2e33..51dcaa3 100644
--- a/examples/cpp/broker.hpp
+++ b/examples/cpp/broker.hpp
@@ -36,14 +36,6 @@
 #include <list>
 #include <sstream>
 
-bool debug_enabled() {
-    const char *s = ::getenv("BROKER_DEBUG");
-    return s && *s;             // Set to non-empty string
-}
-
-/// Debug log messages to stderr.
-#define LOG_DEBUG(MSG) do { if (debug_enabled()) { std::cerr << MSG << std::endl; } } while(0)
-
 /** A simple implementation of a queue. */
 class queue {
   public:
@@ -52,19 +44,16 @@ class queue {
     std::string name() const { return name_; }
 
     void subscribe(proton::sender &s) {
-        LOG_DEBUG("queue " << name_ << ": subscribe " << s.name());
         consumers_.push_back(s.ptr());
     }
 
     // Return true if queue can be deleted.
     bool unsubscribe(proton::sender &s) {
-        LOG_DEBUG("queue " << name_ << ": unsubscribe " << s.name());
         consumers_.remove(s.ptr());
         return (consumers_.size() == 0 && (dynamic_ || messages_.size() == 0));
     }
 
     void publish(const proton::message &m, proton::receiver *r) {
-        LOG_DEBUG("queue " << name_ << ": receive from " << r->name() << " : " << m.body());
         messages_.push_back(m);
         dispatch(0);
     }
@@ -85,7 +74,6 @@ class queue {
         while (messages_.size()) {
             if (s->credit()) {
                 const proton::message& m = messages_.front();
-                LOG_DEBUG("queue " << name_ << ": send to " << s->name() << " : " << m.body());
                 s->send(m);
                 messages_.pop_front();
                 result = true;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c0917a0f/examples/cpp/select_broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/select_broker.cpp b/examples/cpp/select_broker.cpp
index a8a8f8f..ade47eb 100644
--- a/examples/cpp/select_broker.cpp
+++ b/examples/cpp/select_broker.cpp
@@ -22,20 +22,12 @@
 
 #include "proton/engine.hpp"
 
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
+#include <sstream>
 #include <arpa/inet.h>
-#include <netdb.h>
-
 #include <sys/select.h>
-#include <fcntl.h>
+#include <sys/socket.h>
+#include <sys/types.h>
 #include <unistd.h>
-#include <err.h>
 #include <errno.h>
 
 template <class T> T check(T result, const std::string& msg=std::string()) {
@@ -44,12 +36,9 @@ template <class T> T check(T result, const std::string& msg=std::string()) {
     return result;
 }
 
-static void fd_set_if(bool on, int fd, fd_set *fds) {
-    if (on)
-        FD_SET(fd, fds);
-    else
-        FD_CLR(fd, fds);
-}
+void fd_set_if(bool on, int fd, fd_set *fds);
+int do_listen(uint16_t port);
+int do_accept(int listen_fd);
 
 class broker {
 
@@ -59,7 +48,6 @@ class broker {
     broker_handler handler_;
     engine_map engines_;
     fd_set reading_, writing_;
-    int listen_;
 
   public:
     broker() : handler_(queues_) {
@@ -74,17 +62,24 @@ class broker {
 
     void run(uint16_t port) {
 
-        listen(port);
+        int listen_fd = do_listen(port);
+        FD_SET(listen_fd, &reading_);
 
         while(true) {
             fd_set readable_set = reading_;
             fd_set writable_set = writing_;
 
-            check(::select(FD_SETSIZE, &readable_set, &writable_set, NULL, NULL), "select");
+            check(select(FD_SETSIZE, &readable_set, &writable_set, NULL, NULL), "select");
             for (int fd = 0; fd < FD_SETSIZE; ++fd) {
-                if (fd == listen_ && FD_ISSET(fd, &readable_set)) 
-                    accept();
-
+                if (fd == listen_fd) {
+                    if (FD_ISSET(listen_fd, &readable_set)) {
+                        int new_fd = do_accept(listen_fd);
+                        engines_[new_fd] = new proton::engine(handler_);
+                        FD_SET(new_fd, &reading_);
+                        FD_SET(new_fd, &writing_);
+                    }
+                    continue;
+                }
                 if (engines_.find(fd) != engines_.end()) {
                     proton::engine& eng = *engines_[fd];
                     try {
@@ -114,36 +109,10 @@ class broker {
 
   private:
 
-    void listen(uint16_t port) {
-        listen_ = check(::socket(PF_INET, SOCK_STREAM, 0), "create listener");
-        int yes = 1;
-        check(::setsockopt(listen_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)), "setsockopt");
-        struct sockaddr_in name;
-        name.sin_family = AF_INET;
-        name.sin_port = htons (port);
-        name.sin_addr.s_addr = htonl (INADDR_ANY);
-        check(::bind(listen_, (struct sockaddr *)&name, sizeof(name)), "bind listener");
-        check(::listen(listen_, 32), "listen");
-        std::cout << "listening on port " << port << " fd=" << listen_ << std::endl;
-        FD_SET(listen_, &reading_);
-    }
-
-    void accept() {
-        struct sockaddr_in client_addr;
-        socklen_t size = sizeof(client_addr);
-        int fd = check(::accept(listen_, (struct sockaddr *)&client_addr, &size), "accept");
-        engines_[fd] = new proton::engine(handler_);
-        FD_SET(fd, &reading_);
-        FD_SET(fd, &writing_);
-        std::cout << "accept " << ::inet_ntoa(client_addr.sin_addr)
-                  << ":" << ntohs(client_addr.sin_port)
-                  << " fd=" << fd << std::endl;
-    }
-
     void readable(int fd, proton::engine& eng) {
         proton::buffer<char> input = eng.input();
         if (input.size()) {
-            ssize_t n = check(::read(fd, input.begin(), input.size()));
+            ssize_t n = check(read(fd, input.begin(), input.size()));
             if (n > 0) {
                 eng.received(n);
             } else {
@@ -155,7 +124,7 @@ class broker {
     void writable(int fd, proton::engine& eng) {
         proton::buffer<const char> output = eng.output();
         if (output.size()) {
-            ssize_t n = check(::write(fd, output.begin(), output.size()));
+            ssize_t n = check(write(fd, output.begin(), output.size()));
             if (n > 0)
                 eng.sent(n);
             else {
@@ -166,6 +135,37 @@ class broker {
 
 };
 
+void fd_set_if(bool on, int fd, fd_set *fds) {
+    if (on)
+        FD_SET(fd, fds);
+    else
+        FD_CLR(fd, fds);
+}
+
+int do_listen(uint16_t port) {
+    int listen_fd = check(socket(PF_INET, SOCK_STREAM, 0), "create listener");
+    int yes = 1;
+    check(setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)), "setsockopt");
+    struct sockaddr_in name;
+    name.sin_family = AF_INET;
+    name.sin_port = htons (port);
+    name.sin_addr.s_addr = htonl (INADDR_ANY);
+    check(bind(listen_fd, (struct sockaddr *)&name, sizeof(name)), "bind listener");
+    check(listen(listen_fd, 32), "listen");
+    std::cout << "listening on port " << port << " fd=" << listen_fd << std::endl;
+    return listen_fd;
+}
+
+int do_accept(int listen_fd) {
+    struct sockaddr_in client_addr;
+    socklen_t size = sizeof(client_addr);
+    int fd = check(accept(listen_fd, (struct sockaddr *)&client_addr, &size), "accept");
+    std::cout << "accept " << ::inet_ntoa(client_addr.sin_addr)
+              << ":" << ntohs(client_addr.sin_port)
+              << " fd=" << fd << std::endl;
+    return fd;
+}
+
 int main(int argc, char **argv) {
     // Command line options
     proton::url url("0.0.0.0");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org