You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/06/18 23:58:26 UTC
[48/50] [abbrv] qpid-proton git commit: PROTON-865: Blocking sender
functionality and handler per connection
PROTON-865: Blocking sender functionality and handler per connection
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a4565b19
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a4565b19
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a4565b19
Branch: refs/heads/cjansen-cpp-client
Commit: a4565b19b5ec5d08ecea8a94648c95b570fc9e88
Parents: ab4b7b3
Author: Clifford Jansen <cl...@apache.org>
Authored: Wed May 20 07:54:03 2015 -0700
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Jun 18 17:28:44 2015 -0400
----------------------------------------------------------------------
proton-c/bindings/cpp/CMakeLists.txt | 7 +
.../cpp/examples/HelloWorldBlocking.cpp | 65 +++++++
.../cpp/include/proton/cpp/BlockingConnection.h | 67 +++++++
.../cpp/include/proton/cpp/BlockingLink.h | 59 +++++++
.../cpp/include/proton/cpp/BlockingSender.h | 54 ++++++
.../cpp/include/proton/cpp/Connection.h | 2 +-
.../bindings/cpp/include/proton/cpp/Container.h | 15 +-
.../bindings/cpp/include/proton/cpp/Delivery.h | 4 +-
.../bindings/cpp/include/proton/cpp/Duration.h | 56 ++++++
proton-c/bindings/cpp/include/proton/cpp/Link.h | 1 +
.../cpp/include/proton/cpp/MessagingAdapter.h | 3 -
.../cpp/include/proton/cpp/MessagingHandler.h | 6 +
.../bindings/cpp/include/proton/cpp/Sender.h | 3 +-
.../cpp/include/proton/cpp/WaitCondition.h | 45 +++++
proton-c/bindings/cpp/src/Connection.cpp | 4 +-
proton-c/bindings/cpp/src/ConnectionImpl.cpp | 28 ++-
proton-c/bindings/cpp/src/ConnectionImpl.h | 3 +-
proton-c/bindings/cpp/src/Container.cpp | 25 ++-
proton-c/bindings/cpp/src/ContainerImpl.cpp | 175 +++++++++++--------
proton-c/bindings/cpp/src/ContainerImpl.h | 24 ++-
proton-c/bindings/cpp/src/Duration.cpp | 55 ++++++
proton-c/bindings/cpp/src/Link.cpp | 4 +
proton-c/bindings/cpp/src/MessagingAdapter.cpp | 6 +-
proton-c/bindings/cpp/src/MessagingHandler.cpp | 57 +++++-
proton-c/bindings/cpp/src/Sender.cpp | 3 +-
.../cpp/src/blocking/BlockingConnection.cpp | 62 +++++++
.../cpp/src/blocking/BlockingConnectionImpl.cpp | 124 +++++++++++++
.../cpp/src/blocking/BlockingConnectionImpl.h | 63 +++++++
.../bindings/cpp/src/blocking/BlockingLink.cpp | 86 +++++++++
.../cpp/src/blocking/BlockingSender.cpp | 66 +++++++
30 files changed, 1060 insertions(+), 112 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt
index 54d8ddd..34123ed 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -40,6 +40,7 @@ set (qpid-proton-cpp-core
src/Terminus.cpp
src/Acceptor.cpp
src/Url.cpp
+ src/Duration.cpp
src/Message.cpp
src/MessagingAdapter.cpp
src/MessagingEvent.cpp
@@ -55,6 +56,10 @@ set (qpid-proton-cpp-core
src/Logger.cpp
src/contexts.cpp
src/exceptions.cpp
+ src/blocking/BlockingConnection.cpp
+ src/blocking/BlockingConnectionImpl.cpp
+ src/blocking/BlockingLink.cpp
+ src/blocking/BlockingSender.cpp
)
#set_source_files_properties (
@@ -102,6 +107,8 @@ add_executable (SimpleSend examples/SimpleSend.cpp)
target_link_libraries (SimpleSend qpid-proton-cpp)
add_executable (Broker examples/Broker.cpp)
target_link_libraries (Broker qpid-proton-cpp)
+add_executable (HelloWorldBlocking examples/HelloWorldBlocking.cpp)
+target_link_libraries (HelloWorldBlocking qpid-proton-cpp)
install (TARGETS qpid-proton-cpp
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/examples/HelloWorldBlocking.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/examples/HelloWorldBlocking.cpp b/proton-c/bindings/cpp/examples/HelloWorldBlocking.cpp
new file mode 100644
index 0000000..a3f729c
--- /dev/null
+++ b/proton-c/bindings/cpp/examples/HelloWorldBlocking.cpp
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "proton/cpp/Container.h"
+#include "proton/cpp/MessagingHandler.h"
+#include "proton/cpp/BlockingSender.h"
+
+#include <iostream>
+
+
+using namespace proton::reactor;
+
+class HelloWorldBlocking : public MessagingHandler {
+ private:
+ std::string server;
+ std::string address;
+ public:
+
+ HelloWorldBlocking(const std::string &s, const std::string &addr) : server(s), address(addr) {}
+
+ void onStart(Event &e) {
+ Connection conn = e.getContainer().connect(server);
+ e.getContainer().createReceiver(conn, address);
+ }
+
+ void onMessage(Event &e) {
+ std::string body = e.getMessage().getBody();
+ std::cout << body << std::endl;
+ e.getConnection().close();
+ }
+
+};
+
+int main(int argc, char **argv) {
+ std::string url("localhost:5672");
+ std::string addr("examples");
+ BlockingConnection conn = BlockingConnection(url);
+ BlockingSender sender = conn.createSender(addr);
+ Message m;
+ m.setBody("Hello World!");
+ sender.send(m);
+ conn.close();
+
+ // Temporary hack until blocking receiver available
+ HelloWorldBlocking hw("localhost:5672", "examples");
+ Container(hw).run();
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/include/proton/cpp/BlockingConnection.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/BlockingConnection.h b/proton-c/bindings/cpp/include/proton/cpp/BlockingConnection.h
new file mode 100644
index 0000000..aa268db
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/cpp/BlockingConnection.h
@@ -0,0 +1,67 @@
+#ifndef PROTON_CPP_BLOCKINGCONNECTION_H
+#define PROTON_CPP_BLOCKINGCONNECTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/cpp/ImportExport.h"
+#include "proton/cpp/Handle.h"
+#include "proton/cpp/Endpoint.h"
+#include "proton/cpp/Container.h"
+#include "proton/cpp/Duration.h"
+#include "proton/cpp/MessagingHandler.h"
+#include "proton/types.h"
+#include <string>
+
+struct pn_connection_t;
+
+namespace proton {
+namespace reactor {
+
+class Container;
+class BlockingConnectionImpl;
+class SslDomain;
+class BlockingSender;
+class WaitCondition;
+
+class BlockingConnection : public Handle<BlockingConnectionImpl>
+{
+ public:
+ PROTON_CPP_EXTERN BlockingConnection();
+ PROTON_CPP_EXTERN BlockingConnection(const BlockingConnection& c);
+ PROTON_CPP_EXTERN BlockingConnection& operator=(const BlockingConnection& c);
+ PROTON_CPP_EXTERN ~BlockingConnection();
+
+ PROTON_CPP_EXTERN BlockingConnection(std::string &url, Duration = Duration::FOREVER,
+ SslDomain *ssld=0, Container *c=0);
+ PROTON_CPP_EXTERN void close();
+
+ PROTON_CPP_EXTERN BlockingSender createSender(std::string &address, Handler *h=0);
+ PROTON_CPP_EXTERN void wait(WaitCondition &condition);
+ PROTON_CPP_EXTERN void wait(WaitCondition &condition, std::string &msg, Duration timeout=Duration::FOREVER);
+ PROTON_CPP_EXTERN Duration getTimeout();
+ private:
+ friend class PrivateImplRef<BlockingConnection>;
+};
+
+
+}} // namespace proton::reactor
+
+#endif /*!PROTON_CPP_BLOCKINGCONNECTION_H*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/include/proton/cpp/BlockingLink.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/BlockingLink.h b/proton-c/bindings/cpp/include/proton/cpp/BlockingLink.h
new file mode 100644
index 0000000..7f84ce8
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/cpp/BlockingLink.h
@@ -0,0 +1,59 @@
+#ifndef PROTON_CPP_BLOCKINGLINK_H
+#define PROTON_CPP_BLOCKINGLINK_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/cpp/ImportExport.h"
+#include "proton/cpp/Handle.h"
+#include "proton/cpp/Endpoint.h"
+#include "proton/cpp/Container.h"
+#include "proton/cpp/Duration.h"
+#include "proton/cpp/MessagingHandler.h"
+#include "proton/cpp/BlockingConnection.h"
+#include "proton/types.h"
+#include <string>
+
+namespace proton {
+namespace reactor {
+
+class BlockingConnection;
+
+class BlockingLink
+{
+ public:
+ PROTON_CPP_EXTERN void close();
+ ~BlockingLink();
+ protected:
+ PROTON_CPP_EXTERN BlockingLink(BlockingConnection *c, pn_link_t *l);
+ PROTON_CPP_EXTERN void waitForClosed(Duration timeout=Duration::SECOND);
+ private:
+ BlockingConnection connection;
+ Link link;
+ void checkClosed();
+ friend class BlockingConnection;
+ friend class BlockingSender;
+ friend class BlockingReceiver;
+};
+
+
+}} // namespace proton::reactor
+
+#endif /*!PROTON_CPP_BLOCKINGLINK_H*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/include/proton/cpp/BlockingSender.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/BlockingSender.h b/proton-c/bindings/cpp/include/proton/cpp/BlockingSender.h
new file mode 100644
index 0000000..d4ddeae
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/cpp/BlockingSender.h
@@ -0,0 +1,54 @@
+#ifndef PROTON_CPP_BLOCKINGSENDER_H
+#define PROTON_CPP_BLOCKINGSENDER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/cpp/ImportExport.h"
+#include "proton/cpp/Handle.h"
+#include "proton/cpp/Endpoint.h"
+#include "proton/cpp/Container.h"
+#include "proton/cpp/Duration.h"
+#include "proton/cpp/MessagingHandler.h"
+#include "proton/cpp/BlockingLink.h"
+#include "proton/types.h"
+#include "proton/delivery.h"
+#include <string>
+
+namespace proton {
+namespace reactor {
+
+class BlockingConnection;
+class BlockingLink;
+
+class BlockingSender : public BlockingLink
+{
+ public:
+ PROTON_CPP_EXTERN Delivery send(Message &msg);
+ PROTON_CPP_EXTERN Delivery send(Message &msg, Duration timeout);
+ private:
+ PROTON_CPP_EXTERN BlockingSender(BlockingConnection &c, Sender &l);
+ friend class BlockingConnection;
+};
+
+
+}} // namespace proton::reactor
+
+#endif /*!PROTON_CPP_BLOCKINGSENDER_H*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/include/proton/cpp/Connection.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/Connection.h b/proton-c/bindings/cpp/include/proton/cpp/Connection.h
index 86abbe6..f3397ce 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/Connection.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/Connection.h
@@ -47,7 +47,7 @@ class Connection : public Endpoint, public Handle<ConnectionImpl>
PROTON_CPP_EXTERN Connection& operator=(const Connection& c);
PROTON_CPP_EXTERN ~Connection();
- PROTON_CPP_EXTERN Connection(Container &c);
+ PROTON_CPP_EXTERN Connection(Container &c, Handler *h = 0);
PROTON_CPP_EXTERN Transport &getTransport();
PROTON_CPP_EXTERN Handler *getOverride();
PROTON_CPP_EXTERN void setOverride(Handler *h);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/include/proton/cpp/Container.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/Container.h b/proton-c/bindings/cpp/include/proton/cpp/Container.h
index d596ab1..1d7284d 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/Container.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/Container.h
@@ -24,6 +24,7 @@
#include "proton/cpp/ImportExport.h"
#include "proton/cpp/Handle.h"
#include "proton/cpp/Acceptor.h"
+#include "proton/cpp/Duration.h"
#include <proton/reactor.h>
#include <string>
@@ -39,6 +40,7 @@ class MessagingHandler;
class Sender;
class Receiver;
class Link;
+ class Handler;
class Container : public Handle<ContainerImpl>
{
@@ -48,17 +50,24 @@ class Container : public Handle<ContainerImpl>
PROTON_CPP_EXTERN Container& operator=(const Container& c);
PROTON_CPP_EXTERN ~Container();
+ PROTON_CPP_EXTERN Container();
PROTON_CPP_EXTERN Container(MessagingHandler &mhandler);
- PROTON_CPP_EXTERN Connection connect(std::string &host);
+ PROTON_CPP_EXTERN Connection connect(std::string &host, Handler *h=0);
PROTON_CPP_EXTERN void run();
+ PROTON_CPP_EXTERN void start();
+ PROTON_CPP_EXTERN bool process();
+ PROTON_CPP_EXTERN void stop();
+ PROTON_CPP_EXTERN void wakeup();
+ PROTON_CPP_EXTERN bool isQuiesced();
PROTON_CPP_EXTERN pn_reactor_t *getReactor();
- PROTON_CPP_EXTERN pn_handler_t *getGlobalHandler();
- PROTON_CPP_EXTERN Sender createSender(Connection &connection, std::string &addr);
+ PROTON_CPP_EXTERN Sender createSender(Connection &connection, std::string &addr, Handler *h=0);
PROTON_CPP_EXTERN Sender createSender(std::string &url);
PROTON_CPP_EXTERN Receiver createReceiver(Connection &connection, std::string &addr);
PROTON_CPP_EXTERN Receiver createReceiver(const std::string &url);
PROTON_CPP_EXTERN Acceptor listen(const std::string &url);
PROTON_CPP_EXTERN std::string getContainerId();
+ PROTON_CPP_EXTERN Duration getTimeout();
+ PROTON_CPP_EXTERN void setTimeout(Duration timeout);
private:
friend class PrivateImplRef<Container>;
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/include/proton/cpp/Delivery.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/Delivery.h b/proton-c/bindings/cpp/include/proton/cpp/Delivery.h
index a1965f6..8171dd5 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/Delivery.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/Delivery.h
@@ -22,9 +22,11 @@
*
*/
#include "proton/cpp/ImportExport.h"
-#include "proton/cpp/Link.h"
+#include "proton/cpp/ProtonHandle.h"
#include "ProtonImplRef.h"
+
+#include "proton/delivery.h"
#include "proton/disposition.h"
namespace proton {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/include/proton/cpp/Duration.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/Duration.h b/proton-c/bindings/cpp/include/proton/cpp/Duration.h
new file mode 100644
index 0000000..d5aca03
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/cpp/Duration.h
@@ -0,0 +1,56 @@
+#ifndef PROTON_CPP_DURATION_H
+#define PROTON_CPP_DURATION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "proton/cpp/ImportExport.h"
+#include "proton/types.h"
+
+namespace proton {
+namespace reactor {
+
+/** \ingroup C++
+ * A duration is a time in milliseconds.
+ */
+class Duration
+{
+ public:
+ PROTON_CPP_EXTERN explicit Duration(uint64_t milliseconds);
+ PROTON_CPP_EXTERN uint64_t getMilliseconds() const;
+ PROTON_CPP_EXTERN static const Duration FOREVER;
+ PROTON_CPP_EXTERN static const Duration IMMEDIATE;
+ PROTON_CPP_EXTERN static const Duration SECOND;
+ PROTON_CPP_EXTERN static const Duration MINUTE;
+ private:
+ uint64_t milliseconds;
+};
+
+PROTON_CPP_EXTERN Duration operator*(const Duration& duration,
+ uint64_t multiplier);
+PROTON_CPP_EXTERN Duration operator*(uint64_t multiplier,
+ const Duration& duration);
+PROTON_CPP_EXTERN bool operator==(const Duration& a, const Duration& b);
+PROTON_CPP_EXTERN bool operator!=(const Duration& a, const Duration& b);
+
+}} // namespace proton::reactor
+
+#endif /*!PROTON_CPP_DURATION_H*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/include/proton/cpp/Link.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/Link.h b/proton-c/bindings/cpp/include/proton/cpp/Link.h
index 265d80d..391e5fc 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/Link.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/Link.h
@@ -50,6 +50,7 @@ class Link : public Endpoint, public ProtonHandle<pn_link_t>
PROTON_CPP_EXTERN Terminus getTarget();
PROTON_CPP_EXTERN Terminus getRemoteSource();
PROTON_CPP_EXTERN Terminus getRemoteTarget();
+ PROTON_CPP_EXTERN std::string getName();
PROTON_CPP_EXTERN pn_link_t *getPnLink() const;
virtual PROTON_CPP_EXTERN Connection &getConnection();
PROTON_CPP_EXTERN Link getNext(Endpoint::State mask);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h b/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h
index 36a92e4..280df5b 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h
@@ -71,9 +71,6 @@ class MessagingAdapter : public MessagingHandler
PROTON_CPP_EXTERN virtual void onTransportTailClosed(Event &e);
private:
MessagingHandler &delegate; // The handler for generated MessagingEvent's
- bool autoSettle;
- bool autoAccept;
- bool peerCloseIsError;
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h b/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h
index 4f00681..c6d8f72 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h
@@ -30,6 +30,7 @@ namespace proton {
namespace reactor {
class Event;
+class MessagingAdapter;
class PROTON_CPP_EXTERN MessagingHandler : public ProtonHandler , public Acking
{
@@ -80,9 +81,14 @@ class PROTON_CPP_EXTERN MessagingHandler : public ProtonHandler , public Acking
bool autoSettle;
bool autoAccept;
bool peerCloseIsError;
+ MessagingAdapter *messagingAdapter;
+ Handler *flowController;
+ PROTON_CPP_EXTERN MessagingHandler(bool rawHandler, int prefetch=10, bool autoAccept=true, bool autoSettle=true,
+ bool peerCloseIsError=false);
private:
friend class ContainerImpl;
friend class MessagingAdapter;
+ void createHelpers();
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/include/proton/cpp/Sender.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/Sender.h b/proton-c/bindings/cpp/include/proton/cpp/Sender.h
index 9b8683d..c63161c 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/Sender.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/Sender.h
@@ -22,6 +22,7 @@
*
*/
#include "proton/cpp/ImportExport.h"
+#include "proton/cpp/Delivery.h"
#include "proton/cpp/Link.h"
#include "proton/cpp/Message.h"
@@ -40,7 +41,7 @@ class Sender : public Link
PROTON_CPP_EXTERN Sender(pn_link_t *lnk);
PROTON_CPP_EXTERN Sender();
PROTON_CPP_EXTERN Sender(const Link& c);
- PROTON_CPP_EXTERN void send(Message &m);
+ PROTON_CPP_EXTERN Delivery send(Message &m);
protected:
virtual void verifyType(pn_link_t *l);
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/include/proton/cpp/WaitCondition.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/WaitCondition.h b/proton-c/bindings/cpp/include/proton/cpp/WaitCondition.h
new file mode 100644
index 0000000..f4c7cb5
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/cpp/WaitCondition.h
@@ -0,0 +1,45 @@
+#ifndef PROTON_CPP_WAITCONDITION_H
+#define PROTON_CPP_WAITCONDITION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/cpp/ImportExport.h"
+
+namespace proton {
+namespace reactor {
+
+// Interface class to indicate that an expected condition has been
+// achieved, i.e. for BlockingConnection.wait()
+
+class WaitCondition
+{
+ public:
+ PROTON_CPP_EXTERN virtual ~WaitCondition();
+
+ // Override this member function to indicate whether an expected
+ // condition is achieved and requires no further waiting.
+ virtual bool achieved() = 0;
+};
+
+
+}} // namespace proton::reactor
+
+#endif /*!PROTON_CPP_WAITCONDITION_H*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/src/Connection.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/Connection.cpp b/proton-c/bindings/cpp/src/Connection.cpp
index 1db8fbc..67e7d0c 100644
--- a/proton-c/bindings/cpp/src/Connection.cpp
+++ b/proton-c/bindings/cpp/src/Connection.cpp
@@ -42,8 +42,8 @@ Connection::Connection(const Connection& c) : Handle<ConnectionImpl>() { PI::cop
Connection& Connection::operator=(const Connection& c) { return PI::assign(*this, c); }
Connection::~Connection() { PI::dtor(*this); }
-Connection::Connection(Container &c) {
- ConnectionImpl *cimpl = new ConnectionImpl(c);
+Connection::Connection(Container &c, Handler *h) {
+ ConnectionImpl *cimpl = new ConnectionImpl(c, h);
PI::ctor(*this, cimpl);
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/src/ConnectionImpl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/ConnectionImpl.cpp b/proton-c/bindings/cpp/src/ConnectionImpl.cpp
index 9cadffe..f7cc5f9 100644
--- a/proton-c/bindings/cpp/src/ConnectionImpl.cpp
+++ b/proton-c/bindings/cpp/src/ConnectionImpl.cpp
@@ -25,6 +25,8 @@
#include "proton/cpp/Transport.h"
#include "Msg.h"
#include "contexts.h"
+#include "PrivateImplRef.h"
+#include "ContainerImpl.h"
#include "proton/connection.h"
@@ -41,12 +43,25 @@ void ConnectionImpl::decref(ConnectionImpl *impl) {
delete impl;
}
-ConnectionImpl::ConnectionImpl(Container &c, pn_connection_t *pnConn) : container(c), refCount(0), override(0), transport(0), defaultSession(0),
- pnConnection(pnConn),
- reactorReference(this)
+ConnectionImpl::ConnectionImpl(Container &c, pn_connection_t &pnConn)
+ : container(c), refCount(0), override(0), transport(0), defaultSession(0),
+ pnConnection(&pnConn), reactorReference(this)
{
- if (!pnConnection)
- pnConnection = pn_reactor_connection(container.getReactor(), NULL);
+ setConnectionContext(pnConnection, this);
+}
+
+ConnectionImpl::ConnectionImpl(Container &c, Handler *handler)
+ : container(c), refCount(0), override(0), transport(0), defaultSession(0),
+ reactorReference(this)
+{
+ pn_handler_t *chandler = 0;
+ if (handler) {
+ ContainerImpl *containerImpl = PrivateImplRef<Container>::get(c);
+ chandler = containerImpl->wrapHandler(handler);
+ }
+ pnConnection = pn_reactor_connection(container.getReactor(), chandler);
+ if (chandler)
+ pn_decref(chandler);
setConnectionContext(pnConnection, this);
}
@@ -112,7 +127,7 @@ Connection &ConnectionImpl::getReactorReference(pn_connection_t *conn) {
Container container(getContainerContext(reactor));
if (!container) // can't be one created by our container
throw ProtonException(MSG("Unknown Proton connection specifier"));
- impl = new ConnectionImpl(container, conn);
+ impl = new ConnectionImpl(container, *conn);
}
return impl->reactorReference;
}
@@ -121,5 +136,4 @@ Link ConnectionImpl::getLinkHead(Endpoint::State mask) {
return Link(pn_link_head(pnConnection, mask));
}
-
}} // namespace proton::reactor
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/src/ConnectionImpl.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/ConnectionImpl.h b/proton-c/bindings/cpp/src/ConnectionImpl.h
index 11b5765..48210a3 100644
--- a/proton-c/bindings/cpp/src/ConnectionImpl.h
+++ b/proton-c/bindings/cpp/src/ConnectionImpl.h
@@ -39,7 +39,8 @@ class Container;
class ConnectionImpl : public Endpoint
{
public:
- PROTON_CPP_EXTERN ConnectionImpl(Container &c, pn_connection_t *pnConn = 0);
+ PROTON_CPP_EXTERN ConnectionImpl(Container &c, pn_connection_t &pnConn);
+ PROTON_CPP_EXTERN ConnectionImpl(Container &c, Handler *h = 0);
PROTON_CPP_EXTERN ~ConnectionImpl();
PROTON_CPP_EXTERN Transport &getTransport();
PROTON_CPP_EXTERN Handler *getOverride();
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/src/Container.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/Container.cpp b/proton-c/bindings/cpp/src/Container.cpp
index 4eccb15..e72f484 100644
--- a/proton-c/bindings/cpp/src/Container.cpp
+++ b/proton-c/bindings/cpp/src/Container.cpp
@@ -53,17 +53,23 @@ Container::Container(MessagingHandler &mhandler) {
PI::ctor(*this, cimpl);
}
-Connection Container::connect(std::string &host) { return impl->connect(host); }
+Container::Container() {
+ ContainerImpl *cimpl = new ContainerImpl();
+ PI::ctor(*this, cimpl);
+}
-pn_reactor_t *Container::getReactor() { return impl->getReactor(); }
+Connection Container::connect(std::string &host, Handler *h) { return impl->connect(host, h); }
-pn_handler_t *Container::getGlobalHandler() { return impl->getGlobalHandler(); }
+pn_reactor_t *Container::getReactor() { return impl->getReactor(); }
std::string Container::getContainerId() { return impl->getContainerId(); }
+Duration Container::getTimeout() { return impl->getTimeout(); }
+void Container::setTimeout(Duration timeout) { impl->setTimeout(timeout); }
+
-Sender Container::createSender(Connection &connection, std::string &addr) {
- return impl->createSender(connection, addr);
+Sender Container::createSender(Connection &connection, std::string &addr, Handler *h) {
+ return impl->createSender(connection, addr, h);
}
Sender Container::createSender(std::string &urlString) {
@@ -83,8 +89,11 @@ Acceptor Container::listen(const std::string &urlString) {
}
-void Container::run() {
- impl->run();
-}
+void Container::run() { impl->run(); }
+void Container::start() { impl->start(); }
+bool Container::process() { return impl->process(); }
+void Container::stop() { impl->stop(); }
+void Container::wakeup() { impl->wakeup(); }
+bool Container::isQuiesced() { return impl->isQuiesced(); }
}} // namespace proton::reactor
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/src/ContainerImpl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/ContainerImpl.cpp b/proton-c/bindings/cpp/src/ContainerImpl.cpp
index 1cabf6c..1424dbb 100644
--- a/proton-c/bindings/cpp/src/ContainerImpl.cpp
+++ b/proton-c/bindings/cpp/src/ContainerImpl.cpp
@@ -116,41 +116,19 @@ class OverrideHandler : public Handler
pn_handler_dispatch(baseHandler, cevent, (pn_event_type_t) type);
if (conn && type == PN_CONNECTION_FINAL) {
- // TODO: this must be the last acation of the last handler looking at
+ // TODO: this must be the last action of the last handler looking at
// connection events. Better: generate a custom FINAL event (or task). Or move to
// separate event streams per connection as part of multi threading support.
ConnectionImpl *cimpl = getConnectionContext(conn);
if (cimpl)
cimpl->reactorDetach();
- // TODO: remember all connections and do reactorDetach of zombies connections
- // not pn_connection_release'd at PN_REACTOR_FINAL.
+ // TODO: remember all connections and do reactorDetach of zombie connections
+ // not yet pn_connection_release'd at PN_REACTOR_FINAL.
}
}
};
-class CFlowController : public ProtonHandler
-{
- public:
- pn_handler_t *flowcontroller;
-
- CFlowController(int window) : flowcontroller(pn_flowcontroller(window)) {}
- ~CFlowController() {
- pn_decref(flowcontroller);
- }
-
- void redirect(Event &e) {
- ProtonEvent *pne = dynamic_cast<ProtonEvent *>(&e);
- pn_handler_dispatch(flowcontroller, pne->getPnEvent(), (pn_event_type_t) pne->getType());
- }
-
- virtual void onLinkLocalOpen(Event &e) { redirect(e); }
- virtual void onLinkRemoteOpen(Event &e) { redirect(e); }
- virtual void onLinkFlow(Event &e) { redirect(e); }
- virtual void onDelivery(Event &e) { redirect(e); }
-};
-
-
namespace {
// TODO: configurable policy. SessionPerConnection for now.
@@ -165,7 +143,6 @@ Session getDefaultSession(pn_connection_t *conn, pn_session_t **ses) {
struct InboundContext {
ContainerImpl *containerImpl;
- Container containerRef; // create only once for all inbound events
Handler *cppHandler;
};
@@ -174,11 +151,6 @@ ContainerImpl *getContainerImpl(pn_handler_t *c_handler) {
return ctxt->containerImpl;
}
-Container &getContainerRef(pn_handler_t *c_handler) {
- struct InboundContext *ctxt = (struct InboundContext *) pn_handler_mem(c_handler);
- return ctxt->containerRef;
-}
-
Handler &getCppHandler(pn_handler_t *c_handler) {
struct InboundContext *ctxt = (struct InboundContext *) pn_handler_mem(c_handler);
return *ctxt->cppHandler;
@@ -186,21 +158,19 @@ Handler &getCppHandler(pn_handler_t *c_handler) {
void cpp_handler_dispatch(pn_handler_t *c_handler, pn_event_t *cevent, pn_event_type_t type)
{
- MessagingEvent mevent(cevent, type, getContainerRef(c_handler));
+ Container c(getContainerImpl(c_handler)); // Ref counted per event, but when is the last event if stop() never called?
+ MessagingEvent mevent(cevent, type, c);
mevent.dispatch(getCppHandler(c_handler));
}
void cpp_handler_cleanup(pn_handler_t *c_handler)
{
- struct InboundContext *ctxt = (struct InboundContext *) pn_handler_mem(c_handler);
- ctxt->containerRef.~Container();
}
pn_handler_t *cpp_handler(ContainerImpl *c, Handler *h)
{
pn_handler_t *handler = pn_handler_new(cpp_handler_dispatch, sizeof(struct InboundContext), cpp_handler_cleanup);
struct InboundContext *ctxt = (struct InboundContext *) pn_handler_mem(handler);
- new (&ctxt->containerRef) Container(c);
ctxt->containerImpl = c;
ctxt->cppHandler = h;
return handler;
@@ -220,18 +190,29 @@ void ContainerImpl::decref(ContainerImpl *impl) {
delete impl;
}
-ContainerImpl::ContainerImpl(MessagingHandler &mhandler) :
- reactor(0), globalHandler(0), messagingHandler(mhandler), containerId(generateUuid()),
+ContainerImpl::ContainerImpl(Handler &h) :
+ reactor(0), handler(&h), messagingAdapter(0),
+ overrideHandler(0), flowController(0), containerId(generateUuid()),
refCount(0)
-{
-}
+{}
+
+ContainerImpl::ContainerImpl() :
+ reactor(0), handler(0), messagingAdapter(0),
+ overrideHandler(0), flowController(0), containerId(generateUuid()),
+ refCount(0)
+{}
-ContainerImpl::~ContainerImpl() {}
+ContainerImpl::~ContainerImpl() { // Releases the helper handlers owned by the container, then the reactor.
+ delete overrideHandler; // allocated in initializeReactor(); null if the reactor was never initialized
+ delete flowController; // NOTE(review): never assigned in the visible code, so presumably always null here
+ delete messagingAdapter; // NOTE(review): never assigned in the visible code, so presumably always null here
+ pn_reactor_free(reactor); // NOTE(review): reactor is 0 if never started - confirm pn_reactor_free accepts NULL
+}
-Connection ContainerImpl::connect(std::string &host) {
- if (!reactor) throw ProtonException(MSG("Container not initialized"));
+Connection ContainerImpl::connect(std::string &host, Handler *h) {
+ if (!reactor) throw ProtonException(MSG("Container not started"));
Container cntnr(this);
- Connection connection(cntnr);
+ Connection connection(cntnr, h); // attach the caller's per-connection handler (may be null), not the container-wide one
Connector *connector = new Connector(connection);
// Connector self-deletes depending on reconnect logic
connector->setAddress(host); // TODO: url vector
@@ -242,15 +223,36 @@ Connection ContainerImpl::connect(std::string &host) {
pn_reactor_t *ContainerImpl::getReactor() { return reactor; }
-pn_handler_t *ContainerImpl::getGlobalHandler() { return globalHandler; }
std::string ContainerImpl::getContainerId() { return containerId; }
+Duration ContainerImpl::getTimeout() { // Reactor timeout as a Duration; throws ProtonException if not started.
+ if (!reactor) throw ProtonException(MSG("Container not started"));
+ pn_millis_t tmo = pn_reactor_get_timeout(reactor);
+ if (tmo == PN_MILLIS_MAX) return Duration::FOREVER; // the value setTimeout() stores for Duration::FOREVER
+ return Duration(tmo);
+}
-Sender ContainerImpl::createSender(Connection &connection, std::string &addr) {
+void ContainerImpl::setTimeout(Duration timeout) { // Set the reactor timeout; throws ProtonException if not started.
+ if (!reactor) throw ProtonException(MSG("Container not started"));
+ // Duration::FOREVER, or any value too large for pn_millis_t, is stored as PN_MILLIS_MAX.
+ pn_millis_t tmo = PN_MILLIS_MAX;
+ if (!(timeout == Duration::FOREVER) && timeout.getMilliseconds() <= PN_MILLIS_MAX)
+ tmo = static_cast<pn_millis_t>(timeout.getMilliseconds()); // range checked above, so no truncation
+ pn_reactor_set_timeout(reactor, tmo);
+}
+
+
+Sender ContainerImpl::createSender(Connection &connection, std::string &addr, Handler *h) {
+ if (!reactor) throw ProtonException(MSG("Container not started"));
Session session = getDefaultSession(connection.getPnConnection(), &getImpl(connection)->defaultSession);
Sender snd = session.createSender(containerId + '-' + addr);
- pn_terminus_set_address(pn_link_target(snd.getPnLink()), addr.c_str());
+ pn_link_t *lnk = snd.getPnLink();
+ pn_terminus_set_address(pn_link_target(lnk), addr.c_str());
+ pn_handler_t *wrapped = h ? wrapHandler(h) : 0; // optional per-link handler supplied by the caller
+ if (wrapped) pn_record_set_handler(pn_link_attachments(lnk), wrapped);
+ // The link record holds its own reference to the handler, so release the one wrapHandler() returned.
+ if (wrapped) pn_decref(wrapped);
snd.open();
ConnectionImpl *connImpl = getImpl(connection);
@@ -258,7 +260,8 @@ Sender ContainerImpl::createSender(Connection &connection, std::string &addr) {
}
Sender ContainerImpl::createSender(std::string &urlString) {
- Connection conn = connect(urlString);
+ if (!reactor) throw ProtonException(MSG("Container not started"));
+ Connection conn = connect(urlString, 0);
Session session = getDefaultSession(conn.getPnConnection(), &getImpl(conn)->defaultSession);
std::string path = Url(urlString).getPath();
Sender snd = session.createSender(containerId + '-' + path);
@@ -270,6 +273,7 @@ Sender ContainerImpl::createSender(std::string &urlString) {
}
Receiver ContainerImpl::createReceiver(Connection &connection, std::string &addr) {
+ if (!reactor) throw ProtonException(MSG("Container not started"));
ConnectionImpl *connImpl = getImpl(connection);
Session session = getDefaultSession(connImpl->pnConnection, &connImpl->defaultSession);
Receiver rcv = session.createReceiver(containerId + '-' + addr);
@@ -279,8 +283,9 @@ Receiver ContainerImpl::createReceiver(Connection &connection, std::string &addr
}
Receiver ContainerImpl::createReceiver(const std::string &urlString) {
+ if (!reactor) throw ProtonException(MSG("Container not started"));
// TODO: const cleanup of API
- Connection conn = connect(const_cast<std::string &>(urlString));
+ Connection conn = connect(const_cast<std::string &>(urlString), 0);
Session session = getDefaultSession(conn.getPnConnection(), &getImpl(conn)->defaultSession);
std::string path = Url(urlString).getPath();
Receiver rcv = session.createReceiver(containerId + '-' + path);
@@ -298,50 +303,76 @@ Acceptor ContainerImpl::acceptor(const std::string &host, const std::string &por
}
Acceptor ContainerImpl::listen(const std::string &urlString) {
+ if (!reactor) throw ProtonException(MSG("Container not started"));
Url url(urlString);
// TODO: SSL
return acceptor(url.getHost(), url.getPort());
}
-void ContainerImpl::run() {
+pn_handler_t *ContainerImpl::wrapHandler(Handler *h) { // Wrap a C++ Handler in a new C pn_handler_t bound to this container.
+ return cpp_handler(this, h); // created with pn_handler_new(); NOTE(review): caller presumably owns the returned reference
+}
+
+
+void ContainerImpl::initializeReactor() {
+ if (reactor) throw ProtonException(MSG("Container already running"));
reactor = pn_reactor();
// Set our context on the reactor
setContainerContext(reactor, this);
- int prefetch = messagingHandler.prefetch;
- Handler *flowController = 0;
-
- // Set the reactor's main/default handler (see note below)
- if (prefetch) {
- flowController = new CFlowController(prefetch);
- messagingHandler.addChildHandler(*flowController);
+ if (handler) {
+ pn_handler_t *cppHandler = cpp_handler(this, handler);
+ pn_reactor_set_handler(reactor, cppHandler);
+ pn_decref(cppHandler);
}
- MessagingAdapter messagingAdapter(messagingHandler);
- messagingHandler.addChildHandler(messagingAdapter);
- pn_handler_t *cppHandler = cpp_handler(this, &messagingHandler);
- pn_reactor_set_handler(reactor, cppHandler);
// Set our own global handler that "subclasses" the existing one
- pn_handler_t *cGlobalHandler = pn_reactor_get_global_handler(reactor);
- pn_incref(cGlobalHandler);
- OverrideHandler overrideHandler(cGlobalHandler);
- pn_handler_t *cppGlobalHandler = cpp_handler(this, &overrideHandler);
+ pn_handler_t *globalHandler = pn_reactor_get_global_handler(reactor);
+ overrideHandler = new OverrideHandler(globalHandler);
+ pn_handler_t *cppGlobalHandler = cpp_handler(this, overrideHandler);
pn_reactor_set_global_handler(reactor, cppGlobalHandler);
+ pn_decref(cppGlobalHandler);
// Note: we have just set up the following 4/5 handlers that see events in this order:
// messagingHandler (Proton C events), pn_flowcontroller (optional), messagingAdapter,
- // messagingHandler (Messaging events from the messagingAdapter), connector override,
- // the reactor's default globalhandler (pn_iohandler)
+ // messagingHandler (Messaging events from the messagingAdapter, i.e. the delegate),
+ // connector override, the reactor's default globalhandler (pn_iohandler)
+}
+
+void ContainerImpl::run() {
+ initializeReactor();
pn_reactor_run(reactor);
+}
- pn_decref(cppHandler);
- pn_decref(cppGlobalHandler);
- pn_decref(cGlobalHandler);
- pn_reactor_free(reactor);
- reactor = 0;
- delete(flowController);
+void ContainerImpl::start() { // Create the reactor and start it without entering a run loop; drive it with process().
+ initializeReactor(); // throws ProtonException("Container already running") if a reactor already exists
+ pn_reactor_start(reactor);
+}
+
+bool ContainerImpl::process() { // One pn_reactor_process() pass; throws ProtonException if the container is not started.
+ if (!reactor) throw ProtonException(MSG("Container not started"));
+ bool result = pn_reactor_process(reactor);
+ // TODO: check errors
+ return result; // passed through unchanged from pn_reactor_process()
+}
+
+void ContainerImpl::stop() { // Calls pn_reactor_stop(); throws ProtonException if the container is not started.
+ if (!reactor) throw ProtonException(MSG("Container not started"));
+ pn_reactor_stop(reactor); // the reactor itself is freed later, in the destructor
+ // TODO: check errors
+}
+
+void ContainerImpl::wakeup() { // Calls pn_reactor_wakeup(); throws ProtonException if the container is not started.
+ if (!reactor) throw ProtonException(MSG("Container not started"));
+ pn_reactor_wakeup(reactor); // return value is currently ignored
+ // TODO: check errors
+}
+
+bool ContainerImpl::isQuiesced() {
+ if (!reactor) throw ProtonException(MSG("Container not started"));
+ return pn_reactor_quiesced(reactor);
}
}} // namespace proton::reactor
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/src/ContainerImpl.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/ContainerImpl.h b/proton-c/bindings/cpp/src/ContainerImpl.h
index f7b5b9e..65a6651 100644
--- a/proton-c/bindings/cpp/src/ContainerImpl.h
+++ b/proton-c/bindings/cpp/src/ContainerImpl.h
@@ -25,6 +25,7 @@
#include "proton/cpp/MessagingHandler.h"
#include "proton/cpp/Connection.h"
#include "proton/cpp/Link.h"
+#include "proton/cpp/Duration.h"
#include "proton/reactor.h"
@@ -40,26 +41,37 @@ class Acceptor;
class ContainerImpl
{
public:
- PROTON_CPP_EXTERN ContainerImpl(MessagingHandler &mhandler);
+ PROTON_CPP_EXTERN ContainerImpl(Handler &h);
+ PROTON_CPP_EXTERN ContainerImpl();
PROTON_CPP_EXTERN ~ContainerImpl();
- PROTON_CPP_EXTERN Connection connect(std::string &host);
+ PROTON_CPP_EXTERN Connection connect(std::string &host, Handler *h);
PROTON_CPP_EXTERN void run();
PROTON_CPP_EXTERN pn_reactor_t *getReactor();
- PROTON_CPP_EXTERN pn_handler_t *getGlobalHandler();
- PROTON_CPP_EXTERN Sender createSender(Connection &connection, std::string &addr);
+ PROTON_CPP_EXTERN Sender createSender(Connection &connection, std::string &addr, Handler *h);
PROTON_CPP_EXTERN Sender createSender(std::string &url);
PROTON_CPP_EXTERN Receiver createReceiver(Connection &connection, std::string &addr);
PROTON_CPP_EXTERN Receiver createReceiver(const std::string &url);
PROTON_CPP_EXTERN Acceptor listen(const std::string &url);
PROTON_CPP_EXTERN std::string getContainerId();
+ PROTON_CPP_EXTERN Duration getTimeout();
+ PROTON_CPP_EXTERN void setTimeout(Duration timeout);
+ void start();
+ bool process();
+ void stop();
+ void wakeup();
+ bool isQuiesced();
+ pn_handler_t *wrapHandler(Handler *h);
static void incref(ContainerImpl *);
static void decref(ContainerImpl *);
private:
void dispatch(pn_event_t *event, pn_event_type_t type);
Acceptor acceptor(const std::string &host, const std::string &port);
+ void initializeReactor();
pn_reactor_t *reactor;
- pn_handler_t *globalHandler;
- MessagingHandler &messagingHandler;
+ Handler *handler;
+ MessagingAdapter *messagingAdapter;
+ Handler *overrideHandler;
+ Handler *flowController;
std::string containerId;
int refCount;
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/src/Duration.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/Duration.cpp b/proton-c/bindings/cpp/src/Duration.cpp
new file mode 100644
index 0000000..f4155d9
--- /dev/null
+++ b/proton-c/bindings/cpp/src/Duration.cpp
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/cpp/Duration.h"
+#include <limits>
+
+namespace proton {
+namespace reactor {
+
+Duration::Duration(uint64_t ms) : milliseconds(ms) {}
+uint64_t Duration::getMilliseconds() const { return milliseconds; }
+
+Duration operator*(const Duration& duration, uint64_t multiplier)
+{ // Scales the millisecond count; the uint64_t product wraps on overflow (no saturation to FOREVER).
+ return Duration(duration.getMilliseconds() * multiplier);
+}
+
+Duration operator*(uint64_t multiplier, const Duration& duration)
+{ // Mirror of the (Duration, multiplier) overload so the scalar may appear on either side.
+ return duration * multiplier;
+}
+
+bool operator==(const Duration& a, const Duration& b)
+{
+ return a.getMilliseconds() == b.getMilliseconds();
+}
+
+bool operator!=(const Duration& a, const Duration& b)
+{
+ return a.getMilliseconds() != b.getMilliseconds();
+}
+
+const Duration Duration::FOREVER(std::numeric_limits<uint64_t>::max());
+const Duration Duration::IMMEDIATE(0);
+const Duration Duration::SECOND(1000);
+const Duration Duration::MINUTE(SECOND * 60);
+
+}} // namespace proton::reactor
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/src/Link.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/Link.cpp b/proton-c/bindings/cpp/src/Link.cpp
index aab01d9..59cf039 100644
--- a/proton-c/bindings/cpp/src/Link.cpp
+++ b/proton-c/bindings/cpp/src/Link.cpp
@@ -96,6 +96,10 @@ Terminus Link::getRemoteTarget() {
return Terminus(pn_link_remote_target(impl), this);
}
+std::string Link::getName() { // Returns a copy of the name reported by pn_link_name() for the wrapped link.
+ return std::string(pn_link_name(impl)); // NOTE(review): assumes pn_link_name() never returns NULL - confirm
+}
+
Connection &Link::getConnection() {
pn_session_t *s = pn_link_session(impl);
pn_connection_t *c = pn_session_connection(s);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/src/MessagingAdapter.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/MessagingAdapter.cpp b/proton-c/bindings/cpp/src/MessagingAdapter.cpp
index f2916db..625485e 100644
--- a/proton-c/bindings/cpp/src/MessagingAdapter.cpp
+++ b/proton-c/bindings/cpp/src/MessagingAdapter.cpp
@@ -32,14 +32,12 @@
namespace proton {
namespace reactor {
-
MessagingAdapter::MessagingAdapter(MessagingHandler &delegate_) :
- autoSettle(delegate_.autoSettle),
- autoAccept(delegate_.autoAccept),
- peerCloseIsError(delegate_.peerCloseIsError),
+ MessagingHandler(true, delegate_.prefetch, delegate_.autoAccept, delegate_.autoSettle, delegate_.peerCloseIsError), // raw handler; order is (rawHandler, prefetch, autoAccept, autoSettle, peerCloseIsError)
delegate(delegate_)
{};
+
MessagingAdapter::~MessagingAdapter(){};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/src/MessagingHandler.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/MessagingHandler.cpp b/proton-c/bindings/cpp/src/MessagingHandler.cpp
index 7a4a5cb..925186a 100644
--- a/proton-c/bindings/cpp/src/MessagingHandler.cpp
+++ b/proton-c/bindings/cpp/src/MessagingHandler.cpp
@@ -26,11 +26,64 @@
namespace proton {
namespace reactor {
+namespace {
+class CFlowController : public ProtonHandler // Routes selected link/delivery events to the C pn_flowcontroller handler.
+{
+ public:
+ pn_handler_t *flowcontroller; // C handler created in the constructor and decref'd in the destructor
+
+ CFlowController(int window) : flowcontroller(pn_flowcontroller(window)) {}
+ ~CFlowController() {
+ pn_decref(flowcontroller);
+ }
+ // Forward a Proton event to the C flow controller; events that are not ProtonEvents are ignored.
+ void redirect(Event &e) {
+ ProtonEvent *pne = dynamic_cast<ProtonEvent *>(&e);
+ if (pne) pn_handler_dispatch(flowcontroller, pne->getPnEvent(), static_cast<pn_event_type_t>(pne->getType()));
+ }
+
+ virtual void onLinkLocalOpen(Event &e) { redirect(e); }
+ virtual void onLinkRemoteOpen(Event &e) { redirect(e); }
+ virtual void onLinkFlow(Event &e) { redirect(e); }
+ virtual void onDelivery(Event &e) { redirect(e); }
+};
+
+} // namespace
+
+
+
+
MessagingHandler::MessagingHandler(int prefetch0, bool autoAccept0, bool autoSettle0, bool peerCloseIsError0) :
prefetch(prefetch0), autoAccept(autoAccept0), autoSettle(autoSettle0), peerCloseIsError(peerCloseIsError0)
-{}
+{
+ createHelpers();
+}
+
+MessagingHandler::MessagingHandler(bool rawHandler, int prefetch0, bool autoAccept0, bool autoSettle0,
+ bool peerCloseIsError0) : // rawHandler == true skips creation of the helper child handlers
+ prefetch(prefetch0), autoAccept(autoAccept0), autoSettle(autoSettle0), peerCloseIsError(peerCloseIsError0)
+{
+ if (rawHandler) {
+ flowController = 0; // null so the destructor's delete is a no-op
+ messagingAdapter = 0; // null so the destructor's delete is a no-op
+ } else {
+ createHelpers(); // same setup as the non-raw constructor
+ }
+}
+
+void MessagingHandler::createHelpers() {
+ // Always assign flowController: the destructor deletes it even when prefetch is disabled.
+ flowController = (prefetch > 0) ? new CFlowController(prefetch) : 0;
+ if (flowController) addChildHandler(*flowController);
+ // The adapter is constructed with this handler as its delegate and registered as a child handler.
+ messagingAdapter = new MessagingAdapter(*this);
+ addChildHandler(*messagingAdapter);
+}
-MessagingHandler::~MessagingHandler(){};
+MessagingHandler::~MessagingHandler(){ // Destroys the helper handlers created by createHelpers(), if any.
+ delete flowController; // must be null or a valid pointer at this point
+ delete messagingAdapter; // null for raw handlers
+};
void MessagingHandler::onAbort(Event &e) { onUnhandled(e); }
void MessagingHandler::onAccepted(Event &e) { onUnhandled(e); }
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/src/Sender.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/Sender.cpp b/proton-c/bindings/cpp/src/Sender.cpp
index c521ad1..c8f962e 100644
--- a/proton-c/bindings/cpp/src/Sender.cpp
+++ b/proton-c/bindings/cpp/src/Sender.cpp
@@ -54,7 +54,7 @@ namespace{
uint64_t tagCounter = 0;
}
-void Sender::send(Message &message) {
+Delivery Sender::send(Message &message) {
char tag[8];
void *ptr = &tag;
uint64_t id = ++tagCounter;
@@ -67,6 +67,7 @@ void Sender::send(Message &message) {
pn_link_advance(link);
if (pn_link_snd_settle_mode(link) == PN_SND_SETTLED)
pn_delivery_settle(dlv);
+ return Delivery(dlv);
}
}} // namespace proton::reactor
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/src/blocking/BlockingConnection.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking/BlockingConnection.cpp b/proton-c/bindings/cpp/src/blocking/BlockingConnection.cpp
new file mode 100644
index 0000000..3fb6010
--- /dev/null
+++ b/proton-c/bindings/cpp/src/blocking/BlockingConnection.cpp
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/cpp/Container.h"
+#include "proton/cpp/BlockingConnection.h"
+#include "proton/cpp/BlockingSender.h"
+#include "proton/cpp/MessagingHandler.h"
+#include "proton/cpp/exceptions.h"
+#include "Msg.h"
+#include "BlockingConnectionImpl.h"
+#include "PrivateImplRef.h"
+
+namespace proton {
+namespace reactor {
+
+template class Handle<BlockingConnectionImpl>;
+typedef PrivateImplRef<BlockingConnection> PI;
+
+BlockingConnection::BlockingConnection() {PI::ctor(*this, 0); }
+
+BlockingConnection::BlockingConnection(const BlockingConnection& c) : Handle<BlockingConnectionImpl>() { PI::copy(*this, c); }
+
+BlockingConnection& BlockingConnection::operator=(const BlockingConnection& c) { return PI::assign(*this, c); }
+BlockingConnection::~BlockingConnection() { PI::dtor(*this); }
+
+BlockingConnection::BlockingConnection(std::string &url, Duration d, SslDomain *ssld, Container *c) {
+ BlockingConnectionImpl *cimpl = new BlockingConnectionImpl(url, d,ssld, c); // impl constructor connects and waits
+ PI::ctor(*this, cimpl); // hand the new impl to this handle via PrivateImplRef
+}
+
+void BlockingConnection::close() { impl->close(); }
+
+void BlockingConnection::wait(WaitCondition &cond) { return impl->wait(cond); }
+void BlockingConnection::wait(WaitCondition &cond, std::string &msg, Duration timeout) {
+ return impl->wait(cond, msg, timeout);
+}
+
+BlockingSender BlockingConnection::createSender(std::string &address, Handler *h) { // h: optional handler passed to Container::createSender
+ Sender sender = impl->container.createSender(impl->connection, address, h);
+ return BlockingSender(*this, sender); // the BlockingSender constructor waits for the link to open
+}
+
+Duration BlockingConnection::getTimeout() { return impl->getTimeout(); }
+
+}} // namespace proton::reactor
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.cpp b/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.cpp
new file mode 100644
index 0000000..bdda697
--- /dev/null
+++ b/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.cpp
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/cpp/Container.h"
+#include "proton/cpp/MessagingHandler.h"
+#include "proton/cpp/Duration.h"
+#include "proton/cpp/exceptions.h"
+#include "proton/cpp/WaitCondition.h"
+#include "BlockingConnectionImpl.h"
+#include "Msg.h"
+#include "contexts.h"
+
+#include "proton/connection.h"
+
+namespace proton {
+namespace reactor {
+
+WaitCondition::~WaitCondition() {}
+
+
+void BlockingConnectionImpl::incref(BlockingConnectionImpl *impl) {
+ ++impl->refCount; // take one more reference on the implementation object
+}
+
+void BlockingConnectionImpl::decref(BlockingConnectionImpl *impl) {
+ // Drop one reference; the implementation object is destroyed when the count reaches zero.
+ if (--impl->refCount == 0)
+ delete impl;
+}
+
+namespace {
+struct ConnectionOpening : public WaitCondition { // Wait condition used while the connection is being opened.
+ ConnectionOpening(pn_connection_t *c) : pnConnection(c) {}
+ bool achieved() { return !(pn_connection_state(pnConnection) & PN_REMOTE_UNINIT); } // true once the remote end is no longer uninitialized
+ pn_connection_t *pnConnection;
+};
+
+struct ConnectionClosed : public WaitCondition { // Wait condition used by close().
+ ConnectionClosed(pn_connection_t *c) : pnConnection(c) {}
+ bool achieved() { return !(pn_connection_state(pnConnection) & PN_REMOTE_ACTIVE); } // true when the remote end is not active
+ pn_connection_t *pnConnection;
+};
+
+}
+
+
+BlockingConnectionImpl::BlockingConnectionImpl(std::string &u, Duration timeout0, SslDomain *ssld, Container *c)
+ : url(u), timeout(timeout0), refCount(0) // ssld is not used anywhere in this constructor
+{
+ if (c)
+ container = *c; // use the caller's container instead of the default-constructed member
+ container.start(); // NOTE(review): start() throws if the supplied container already has a reactor - confirm intended
+ container.setTimeout(timeout);
+ // Create connection and send the connection events here
+ connection = container.connect(url, static_cast<Handler *>(this)); // passes this object as the handler argument
+ ConnectionOpening cond(connection.getPnConnection());
+ wait(cond); // blocks, processing reactor events, until the opening condition reports achieved
+}
+
+BlockingConnectionImpl::~BlockingConnectionImpl() {
+ container = Container(); // replaces the held container with a fresh one; presumably to drop the reference early - TODO confirm
+}
+
+void BlockingConnectionImpl::close() { // Close the connection and block until the remote end is no longer active.
+ connection.close();
+ ConnectionClosed cond(connection.getPnConnection());
+ wait(cond); // uses the connection's default timeout; throws ProtonException on timeout
+}
+
+void BlockingConnectionImpl::wait(WaitCondition &condition) { // Wait with no message and the connection's own timeout.
+ std::string empty; // the three-argument overload takes the message by non-const reference
+ wait(condition, empty, timeout);
+}
+
+void BlockingConnectionImpl::wait(WaitCondition &condition, std::string &msg, Duration waitTimeout) {
+ // Runs the reactor until `condition` is achieved; throws ProtonException if a finite `waitTimeout` elapses first.
+ if (waitTimeout == Duration::FOREVER) {
+ while (!condition.achieved())
+ container.process();
+ return; // unbounded wait finished; the deadline logic below only applies to finite timeouts
+ }
+ pn_reactor_t *reactor = container.getReactor();
+ Duration origTimeout = container.getTimeout();
+ container.setTimeout(waitTimeout); // setTimeout() clamps values that do not fit in pn_millis_t
+ try {
+ pn_timestamp_t deadline = pn_reactor_mark(reactor) + waitTimeout.getMilliseconds();
+ while (!condition.achieved()) {
+ // Check the deadline before processing so a condition met by the final pass is not reported as a timeout.
+ if (deadline < pn_reactor_mark(reactor)) {
+ std::string txt = "Connection timed out";
+ if (!msg.empty())
+ txt += ": " + msg;
+ // TODO: proper Timeout exception
+ throw ProtonException(MSG(txt));
+ }
+ container.process();
+ }
+ } catch (...) {
+ container.setTimeout(origTimeout); // restore the previous reactor timeout before propagating
+ throw;
+ }
+ container.setTimeout(origTimeout);
+}
+
+
+
+}} // namespace proton::reactor
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.h b/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.h
new file mode 100644
index 0000000..5f263ab
--- /dev/null
+++ b/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.h
@@ -0,0 +1,63 @@
+#ifndef PROTON_CPP_BLOCKINGCONNECTIONIMPL_H
+#define PROTON_CPP_BLOCKINGCONNECTIONIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/cpp/ImportExport.h"
+#include "proton/cpp/Endpoint.h"
+#include "proton/cpp/Container.h"
+#include "proton/types.h"
+#include <string>
+
+struct pn_connection_t;
+
+namespace proton {
+namespace reactor {
+
+class Handler;
+class Container;
+class SslDomain;
+
+ class BlockingConnectionImpl : public MessagingHandler // NOTE(review): relies on the includer for MessagingHandler, Duration, WaitCondition
+{
+ public:
+ PROTON_CPP_EXTERN BlockingConnectionImpl(std::string &url, Duration d, SslDomain *ssld, Container *c);
+ PROTON_CPP_EXTERN ~BlockingConnectionImpl();
+ PROTON_CPP_EXTERN void close(); // close the connection and wait for the remote end
+ PROTON_CPP_EXTERN void wait(WaitCondition &condition); // wait using the connection's own timeout
+ PROTON_CPP_EXTERN void wait(WaitCondition &condition, std::string &msg, Duration timeout);
+ PROTON_CPP_EXTERN pn_connection_t *getPnBlockingConnection(); // NOTE(review): no definition visible in BlockingConnectionImpl.cpp - confirm
+ Duration getTimeout() { return timeout; }
+ static void incref(BlockingConnectionImpl *); // reference counting used by the BlockingConnection handle
+ static void decref(BlockingConnectionImpl *); // deletes the object when the count reaches zero
+ private:
+ friend class BlockingConnection;
+ Container container; // reactor container driven by wait()
+ Connection connection; // connection created in the constructor
+ std::string url;
+ Duration timeout; // default timeout used by wait(condition)
+ int refCount;
+};
+
+
+}} // namespace proton::reactor
+
+#endif /*!PROTON_CPP_BLOCKINGCONNECTIONIMPL_H*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/src/blocking/BlockingLink.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking/BlockingLink.cpp b/proton-c/bindings/cpp/src/blocking/BlockingLink.cpp
new file mode 100644
index 0000000..5a572ae
--- /dev/null
+++ b/proton-c/bindings/cpp/src/blocking/BlockingLink.cpp
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/cpp/BlockingLink.h"
+#include "proton/cpp/BlockingConnection.h"
+#include "proton/cpp/MessagingHandler.h"
+#include "proton/cpp/WaitCondition.h"
+#include "proton/cpp/exceptions.h"
+#include "Msg.h"
+
+
+namespace proton {
+namespace reactor {
+
+namespace {
+struct LinkOpened : public WaitCondition { // Wait condition used by the BlockingLink constructor.
+ LinkOpened(pn_link_t *l) : pnLink(l) {}
+ bool achieved() { return !(pn_link_state(pnLink) & PN_REMOTE_UNINIT); } // true once the remote end is no longer uninitialized
+ pn_link_t *pnLink;
+};
+
+struct LinkClosed : public WaitCondition { // Wait condition used by waitForClosed().
+ LinkClosed(pn_link_t *l) : pnLink(l) {}
+ bool achieved() { return (pn_link_state(pnLink) & PN_REMOTE_CLOSED); } // true once the remote end has closed the link
+ pn_link_t *pnLink;
+};
+
+struct LinkNotOpen : public WaitCondition { // Wait condition used by close().
+ LinkNotOpen(pn_link_t *l) : pnLink(l) {}
+ bool achieved() { return !(pn_link_state(pnLink) & PN_REMOTE_ACTIVE); } // true when the remote end is not active
+ pn_link_t *pnLink;
+};
+
+
+} // namespace
+
+
+BlockingLink::BlockingLink(BlockingConnection *c, pn_link_t *pnl) : connection(*c), link(pnl) { // c is dereferenced; must not be null
+ std::string msg = "Opening link " + link.getName(); // text appended to the timeout error by wait()
+ LinkOpened linkOpened(link.getPnLink());
+ connection.wait(linkOpened, msg); // blocks until the remote end of the link leaves the uninitialized state
+}
+
+BlockingLink::~BlockingLink() {}
+
+void BlockingLink::waitForClosed(Duration timeout) { // Wait up to `timeout` for the peer to close the link, then checkClosed().
+ std::string msg = "Closing link " + link.getName();
+ LinkClosed linkClosed(link.getPnLink());
+ connection.wait(linkClosed, msg, timeout); // honour the caller's timeout instead of wait()'s default
+ checkClosed(); // throws ProtonException("Link detached") when the remote end is closed
+}
+
+void BlockingLink::checkClosed() { // If the remote end has closed the link, close locally and throw.
+ pn_link_t * pnLink = link.getPnLink();
+ if (pn_link_state(pnLink) & PN_REMOTE_CLOSED) {
+ link.close(); // close our end before reporting the detach
+ // TODO: LinkDetached exception
+ throw ProtonException(MSG("Link detached"));
+ }
+}
+
+void BlockingLink::close() { // Close the link locally and block until the remote end is no longer active.
+ link.close();
+ std::string msg = "Closing link " + link.getName(); // text appended to the timeout error by wait()
+ LinkNotOpen linkNotOpen(link.getPnLink());
+ connection.wait(linkNotOpen, msg);
+}
+
+}} // namespace proton::reactor
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4565b19/proton-c/bindings/cpp/src/blocking/BlockingSender.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking/BlockingSender.cpp b/proton-c/bindings/cpp/src/blocking/BlockingSender.cpp
new file mode 100644
index 0000000..dc6b9bd
--- /dev/null
+++ b/proton-c/bindings/cpp/src/blocking/BlockingSender.cpp
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/cpp/BlockingSender.h"
+#include "proton/cpp/BlockingConnection.h"
+#include "proton/cpp/WaitCondition.h"
+#include "proton/cpp/exceptions.h"
+#include "Msg.h"
+
+
+namespace proton {
+namespace reactor {
+
+namespace {
+// Wait condition that is satisfied once pn_delivery_settled() reports true
+// for the tracked delivery.
+struct DeliverySettled : public WaitCondition {
+    pn_delivery_t *pnDelivery;  // delivery being polled; stored as given
+    DeliverySettled(pn_delivery_t *delivery) : pnDelivery(delivery) {}
+    // Forwards the result of pn_delivery_settled() for the tracked delivery.
+    bool achieved() {
+        return pn_delivery_settled(pnDelivery);
+    }
+};
+
+} // namespace
+
+
+// Wraps Sender l on connection c. The BlockingLink base constructor runs
+// first, then the local target address is compared with the remote target
+// address.
+//
+// Throws ProtonException when the local target address is empty or differs
+// from the remote one. In that case waitForClosed() runs first (it may itself
+// throw "Link detached" through checkClosed()); otherwise the local end is
+// closed and the mismatch is reported with the sender's name.
+BlockingSender::BlockingSender(BlockingConnection &c, Sender &l) : BlockingLink(&c, l.getPnLink()) {
+    std::string ta = link.getTarget().getAddress();
+    std::string rta = link.getRemoteTarget().getAddress();
+    if (ta.empty() || ta.compare(rta) != 0) {
+        waitForClosed();
+        link.close();
+        std::string txt = "Failed to open sender " + link.getName() + ", target does not match";
+        // Report the diagnostic that was actually built for this failure
+        // rather than an unrelated "Container not started" text.
+        throw ProtonException(MSG(txt));
+    }
+}
+
+// Sends msg on the underlying link and blocks in BlockingConnection::wait()
+// until the DeliverySettled condition is achieved for the resulting delivery.
+//
+// msg:     message to send.
+// timeout: passed through to BlockingConnection::wait().
+// Returns the Delivery produced by the send.
+Delivery BlockingSender::send(Message &msg, Duration timeout) {
+    Sender sender = link;
+    Delivery delivery = sender.send(msg);
+    // Description handed to wait() alongside the condition.
+    std::string description = "Sending on sender " + link.getName();
+    DeliverySettled settled(delivery.getPnDelivery());
+    connection.wait(settled, description, timeout);
+    return delivery;
+}
+
+// Convenience overload: sends msg using the timeout returned by
+// connection.getTimeout().
+Delivery BlockingSender::send(Message &msg) {
+    Duration defaultTimeout = connection.getTimeout();
+    return send(msg, defaultTimeout);
+}
+
+}} // namespace proton::reactor
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org