You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/11/23 18:57:47 UTC
[19/50] [abbrv] qpid-proton git commit: NO-JIRA: c++: broker example clean-up.
NO-JIRA: c++: broker example clean-up.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/c0917a0f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/c0917a0f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/c0917a0f
Branch: refs/heads/go1
Commit: c0917a0fbc74eafa31a07f695c3ced00961cf184
Parents: 4d148e3
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Nov 4 12:02:51 2015 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Nov 4 12:02:51 2015 -0500
----------------------------------------------------------------------
examples/cpp/broker.hpp | 12 -----
examples/cpp/select_broker.cpp | 102 ++++++++++++++++++------------------
2 files changed, 51 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c0917a0f/examples/cpp/broker.hpp
----------------------------------------------------------------------
diff --git a/examples/cpp/broker.hpp b/examples/cpp/broker.hpp
index 88d2e33..51dcaa3 100644
--- a/examples/cpp/broker.hpp
+++ b/examples/cpp/broker.hpp
@@ -36,14 +36,6 @@
#include <list>
#include <sstream>
-bool debug_enabled() {
- const char *s = ::getenv("BROKER_DEBUG");
- return s && *s; // Set to non-empty string
-}
-
-/// Debug log messages to stderr.
-#define LOG_DEBUG(MSG) do { if (debug_enabled()) { std::cerr << MSG << std::endl; } } while(0)
-
/** A simple implementation of a queue. */
class queue {
public:
@@ -52,19 +44,16 @@ class queue {
std::string name() const { return name_; }
void subscribe(proton::sender &s) {
- LOG_DEBUG("queue " << name_ << ": subscribe " << s.name());
consumers_.push_back(s.ptr());
}
// Return true if queue can be deleted.
bool unsubscribe(proton::sender &s) {
- LOG_DEBUG("queue " << name_ << ": unsubscribe " << s.name());
consumers_.remove(s.ptr());
return (consumers_.size() == 0 && (dynamic_ || messages_.size() == 0));
}
void publish(const proton::message &m, proton::receiver *r) {
- LOG_DEBUG("queue " << name_ << ": receive from " << r->name() << " : " << m.body());
messages_.push_back(m);
dispatch(0);
}
@@ -85,7 +74,6 @@ class queue {
while (messages_.size()) {
if (s->credit()) {
const proton::message& m = messages_.front();
- LOG_DEBUG("queue " << name_ << ": send to " << s->name() << " : " << m.body());
s->send(m);
messages_.pop_front();
result = true;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c0917a0f/examples/cpp/select_broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/select_broker.cpp b/examples/cpp/select_broker.cpp
index a8a8f8f..ade47eb 100644
--- a/examples/cpp/select_broker.cpp
+++ b/examples/cpp/select_broker.cpp
@@ -22,20 +22,12 @@
#include "proton/engine.hpp"
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
+#include <sstream>
#include <arpa/inet.h>
-#include <netdb.h>
-
#include <sys/select.h>
-#include <fcntl.h>
+#include <sys/socket.h>
+#include <sys/types.h>
#include <unistd.h>
-#include <err.h>
#include <errno.h>
template <class T> T check(T result, const std::string& msg=std::string()) {
@@ -44,12 +36,9 @@ template <class T> T check(T result, const std::string& msg=std::string()) {
return result;
}
-static void fd_set_if(bool on, int fd, fd_set *fds) {
- if (on)
- FD_SET(fd, fds);
- else
- FD_CLR(fd, fds);
-}
+void fd_set_if(bool on, int fd, fd_set *fds);
+int do_listen(uint16_t port);
+int do_accept(int listen_fd);
class broker {
@@ -59,7 +48,6 @@ class broker {
broker_handler handler_;
engine_map engines_;
fd_set reading_, writing_;
- int listen_;
public:
broker() : handler_(queues_) {
@@ -74,17 +62,24 @@ class broker {
void run(uint16_t port) {
- listen(port);
+ int listen_fd = do_listen(port);
+ FD_SET(listen_fd, &reading_);
while(true) {
fd_set readable_set = reading_;
fd_set writable_set = writing_;
- check(::select(FD_SETSIZE, &readable_set, &writable_set, NULL, NULL), "select");
+ check(select(FD_SETSIZE, &readable_set, &writable_set, NULL, NULL), "select");
for (int fd = 0; fd < FD_SETSIZE; ++fd) {
- if (fd == listen_ && FD_ISSET(fd, &readable_set))
- accept();
-
+ if (fd == listen_fd) {
+ if (FD_ISSET(listen_fd, &readable_set)) {
+ int new_fd = do_accept(listen_fd);
+ engines_[new_fd] = new proton::engine(handler_);
+ FD_SET(new_fd, &reading_);
+ FD_SET(new_fd, &writing_);
+ }
+ continue;
+ }
if (engines_.find(fd) != engines_.end()) {
proton::engine& eng = *engines_[fd];
try {
@@ -114,36 +109,10 @@ class broker {
private:
- void listen(uint16_t port) {
- listen_ = check(::socket(PF_INET, SOCK_STREAM, 0), "create listener");
- int yes = 1;
- check(::setsockopt(listen_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)), "setsockopt");
- struct sockaddr_in name;
- name.sin_family = AF_INET;
- name.sin_port = htons (port);
- name.sin_addr.s_addr = htonl (INADDR_ANY);
- check(::bind(listen_, (struct sockaddr *)&name, sizeof(name)), "bind listener");
- check(::listen(listen_, 32), "listen");
- std::cout << "listening on port " << port << " fd=" << listen_ << std::endl;
- FD_SET(listen_, &reading_);
- }
-
- void accept() {
- struct sockaddr_in client_addr;
- socklen_t size = sizeof(client_addr);
- int fd = check(::accept(listen_, (struct sockaddr *)&client_addr, &size), "accept");
- engines_[fd] = new proton::engine(handler_);
- FD_SET(fd, &reading_);
- FD_SET(fd, &writing_);
- std::cout << "accept " << ::inet_ntoa(client_addr.sin_addr)
- << ":" << ntohs(client_addr.sin_port)
- << " fd=" << fd << std::endl;
- }
-
void readable(int fd, proton::engine& eng) {
proton::buffer<char> input = eng.input();
if (input.size()) {
- ssize_t n = check(::read(fd, input.begin(), input.size()));
+ ssize_t n = check(read(fd, input.begin(), input.size()));
if (n > 0) {
eng.received(n);
} else {
@@ -155,7 +124,7 @@ class broker {
void writable(int fd, proton::engine& eng) {
proton::buffer<const char> output = eng.output();
if (output.size()) {
- ssize_t n = check(::write(fd, output.begin(), output.size()));
+ ssize_t n = check(write(fd, output.begin(), output.size()));
if (n > 0)
eng.sent(n);
else {
@@ -166,6 +135,37 @@ class broker {
};
+void fd_set_if(bool on, int fd, fd_set *fds) {
+ if (on)
+ FD_SET(fd, fds);
+ else
+ FD_CLR(fd, fds);
+}
+
+int do_listen(uint16_t port) {
+ int listen_fd = check(socket(PF_INET, SOCK_STREAM, 0), "create listener");
+ int yes = 1;
+ check(setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)), "setsockopt");
+ struct sockaddr_in name;
+ name.sin_family = AF_INET;
+ name.sin_port = htons (port);
+ name.sin_addr.s_addr = htonl (INADDR_ANY);
+ check(bind(listen_fd, (struct sockaddr *)&name, sizeof(name)), "bind listener");
+ check(listen(listen_fd, 32), "listen");
+ std::cout << "listening on port " << port << " fd=" << listen_fd << std::endl;
+ return listen_fd;
+}
+
+int do_accept(int listen_fd) {
+ struct sockaddr_in client_addr;
+ socklen_t size = sizeof(client_addr);
+ int fd = check(accept(listen_fd, (struct sockaddr *)&client_addr, &size), "accept");
+ std::cout << "accept " << ::inet_ntoa(client_addr.sin_addr)
+ << ":" << ntohs(client_addr.sin_port)
+ << " fd=" << fd << std::endl;
+ return fd;
+}
+
int main(int argc, char **argv) {
// Command line options
proton::url url("0.0.0.0");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org