You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/06/03 19:26:10 UTC
[47/50] [abbrv] qpid-proton git commit: PROTON-865: most of
MessagingAdapter in place, SimpleSend/Recv
PROTON-865: most of MessagingAdapter in place, SimpleSend/Recv
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/648f7b36
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/648f7b36
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/648f7b36
Branch: refs/heads/cjansen-cpp-client
Commit: 648f7b36caa26e8078b92b50391d2e9f23e16080
Parents: ad7c977
Author: Clifford Jansen <cl...@apache.org>
Authored: Tue May 5 06:48:48 2015 -0700
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Jun 2 14:46:14 2015 -0400
----------------------------------------------------------------------
proton-c/bindings/cpp/CMakeLists.txt | 4 +
proton-c/bindings/cpp/examples/SimpleRecv.cpp | 109 +++++++
proton-c/bindings/cpp/examples/SimpleSend.cpp | 117 +++++++
.../bindings/cpp/include/proton/cpp/Container.h | 1 +
.../bindings/cpp/include/proton/cpp/Message.h | 23 +-
.../cpp/include/proton/cpp/MessagingAdapter.h | 30 +-
.../cpp/include/proton/cpp/MessagingEvent.h | 16 +-
.../cpp/include/proton/cpp/MessagingHandler.h | 13 +-
.../bindings/cpp/include/proton/cpp/Session.h | 8 +-
proton-c/bindings/cpp/src/Connection.cpp | 2 +-
proton-c/bindings/cpp/src/ConnectionImpl.cpp | 12 +-
proton-c/bindings/cpp/src/Connector.cpp | 5 +-
proton-c/bindings/cpp/src/Container.cpp | 4 +
proton-c/bindings/cpp/src/ContainerImpl.cpp | 87 +++--
proton-c/bindings/cpp/src/ContainerImpl.h | 1 +
proton-c/bindings/cpp/src/Message.cpp | 194 ++++++++++--
proton-c/bindings/cpp/src/MessagingAdapter.cpp | 316 ++++++++++++++++---
proton-c/bindings/cpp/src/MessagingEvent.cpp | 31 +-
proton-c/bindings/cpp/src/MessagingHandler.cpp | 12 +-
proton-c/bindings/cpp/src/Session.cpp | 29 +-
proton-c/bindings/cpp/src/contexts.cpp | 47 +--
proton-c/bindings/cpp/src/contexts.h | 4 +
22 files changed, 897 insertions(+), 168 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/648f7b36/proton-c/bindings/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt
index 7e456a5..68baad7 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -93,6 +93,10 @@ add_executable (HelloWorld examples/HelloWorld.cpp)
target_link_libraries (HelloWorld qpid-proton-cpp)
add_executable (HelloWorldDirect examples/HelloWorldDirect.cpp)
target_link_libraries (HelloWorldDirect qpid-proton-cpp)
+add_executable (SimpleRecv examples/SimpleRecv.cpp)
+target_link_libraries (SimpleRecv qpid-proton-cpp)
+add_executable (SimpleSend examples/SimpleSend.cpp)
+target_link_libraries (SimpleSend qpid-proton-cpp)
install (TARGETS qpid-proton-cpp
EXPORT proton
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/648f7b36/proton-c/bindings/cpp/examples/SimpleRecv.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/examples/SimpleRecv.cpp b/proton-c/bindings/cpp/examples/SimpleRecv.cpp
new file mode 100644
index 0000000..22778f0
--- /dev/null
+++ b/proton-c/bindings/cpp/examples/SimpleRecv.cpp
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "proton/cpp/Container.h"
+#include "proton/cpp/MessagingHandler.h"
+#include "proton/cpp/Link.h"
+
+#include <iostream>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+
+using namespace proton::reactor;
+
+class Recv : public MessagingHandler { // Example handler: prints incoming messages until 'expected' of them have arrived.
+ private:
+ std::string url;
+ int expected; // number of messages to accept; 0 disables the limit (see onMessage)
+ int received; // number of messages printed so far
+ public:
+ // s: address handed to createReceiver(); c: number of messages to wait for.
+ Recv(const std::string &s, int c) : url(s), expected(c), received(0) {}
+ // Ask the container for a receiver on the configured address.
+ void onStart(Event &e) {
+ e.getContainer().createReceiver(url);
+ }
+ // Print each new message; close the receiver and connection once 'expected' have been printed.
+ void onMessage(Event &e) {
+ uint64_t id = 0;
+ Message msg = e.getMessage();
+ if (msg.getIdType() == PN_ULONG) { // only ulong ids are inspected; otherwise id stays 0
+ id = msg.getId();
+ if (id < received)
+ return; // ignore duplicate
+ }
+ if (expected == 0 || received < expected) {
+ std::cout << '[' << id << "]: " << msg.getBody() << std::endl;
+ received++;
+ if (received == expected) { // never true when expected == 0, so that case keeps receiving
+ e.getReceiver().close();
+ e.getConnection().close();
+ }
+ }
+ }
+};
+
+static void parse_options(int argc, char **argv, int &count, std::string &addr);
+
+int main(int argc, char **argv) { // Entry point of the SimpleRecv example.
+ int messageCount = 100; // default, replaced by -m
+ std::string address("localhost:5672/examples"); // default, replaced by -a
+ parse_options(argc, argv, messageCount, address);
+ Recv recv(address, messageCount);
+ Container(recv).run(); // run a container with recv as its handler
+}
+
+
+static void usage() { // Print the command-line synopsis and terminate the process.
+ std::cout << "Usage: SimpleRecv -m message_count -a address:" << std::endl;
+ exit (1); // does not return to the caller
+}
+
+
+static void parse_options(int argc, char **argv, int &count, std::string &addr) {
+    // Accepts "-a <address>" and "-m <message_count>"; anything else prints usage and exits.
+    for (int i = 1; i < argc; i++) {
+        if (strlen(argv[i]) == 2 && argv[i][0] == '-') {
+            const char c = argv[i][1];
+            // The option's value is the following argument; only read it when one exists.
+            const char *nextarg = i + 1 < argc ? argv[i+1] : NULL;
+            switch (c) {
+            case 'a':
+                if (!nextarg) usage();
+                addr = nextarg;
+                i++;  // the value has been consumed
+                break;
+            case 'm':
+                if (!nextarg) usage();
+                int newc;  // must be an int to match the %d conversion
+                if (sscanf( nextarg, "%d", &newc) != 1) usage();
+                count = newc;
+                i++;  // the value has been consumed
+                break;
+            default:
+                usage();
+            }
+        }
+        else usage();
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/648f7b36/proton-c/bindings/cpp/examples/SimpleSend.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/examples/SimpleSend.cpp b/proton-c/bindings/cpp/examples/SimpleSend.cpp
new file mode 100644
index 0000000..89ca39d
--- /dev/null
+++ b/proton-c/bindings/cpp/examples/SimpleSend.cpp
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "proton/cpp/Container.h"
+#include "proton/cpp/MessagingHandler.h"
+#include "proton/cpp/Connection.h"
+
+#include <iostream>
+#include <string.h>
+#include <stdlib.h>
+#include <stdio.h>
+
+
+using namespace proton::reactor;
+
+class Send : public MessagingHandler { // Example handler: sends 'total' messages and waits for each to be accepted.
+ private:
+ std::string url;
+ int sent; // messages handed to the sender so far
+ int confirmed; // messages for which onAccepted has fired
+ int total; // number of messages to send
+ public:
+ // s: address handed to createSender(); c: number of messages to send.
+ Send(const std::string &s, int c) : url(s), sent(0), confirmed(0), total(c) {}
+ // Ask the container for a sender on the configured address.
+ void onStart(Event &e) {
+ e.getContainer().createSender(url);
+ }
+ // Send while the sender reports credit and fewer than 'total' messages have been sent.
+ void onSendable(Event &e) {
+ Sender sender = e.getSender();
+ while (sender.getCredit() && sent < total) {
+ Message msg;
+ msg.setId(sent + 1); // ids therefore start at 1
+ // TODO: fancy map body content as in Python example. Simple binary for now.
+ const char *bin = "some arbitrary binary data";
+ msg.setBody(bin, strlen(bin));
+ sender.send(msg);
+ sent++;
+ }
+ }
+ // Count the acceptance; close the connection once every message is confirmed.
+ void onAccepted(Event &e) {
+ confirmed++;
+ if (confirmed == total) {
+ std::cout << "all messages confirmed" << std::endl;
+ e.getConnection().close();
+ }
+ }
+ // Roll 'sent' back to the confirmed count, presumably so unconfirmed messages are sent again.
+ void onDisconnected(Event &e) {
+ sent = confirmed;
+ }
+};
+
+static void parse_options(int argc, char **argv, int &count, std::string &addr);
+
+int main(int argc, char **argv) { // Entry point of the SimpleSend example.
+ int messageCount = 100; // default, replaced by -m
+ std::string address("localhost:5672/examples"); // default, replaced by -a
+ parse_options(argc, argv, messageCount, address);
+ Send send(address, messageCount);
+ Container(send).run(); // run a container with send as its handler
+}
+
+
+static void usage() { // Print the command-line synopsis and terminate the process.
+ std::cout << "Usage: SimpleSend -m message_count -a address:" << std::endl;
+ exit (1); // does not return to the caller
+}
+
+
+static void parse_options(int argc, char **argv, int &count, std::string &addr) {
+    // Accepts "-a <address>" and "-m <message_count>"; anything else prints usage and exits.
+    for (int i = 1; i < argc; i++) {
+        if (strlen(argv[i]) == 2 && argv[i][0] == '-') {
+            const char c = argv[i][1];
+            // The option's value is the following argument; only read it when one exists.
+            const char *nextarg = i + 1 < argc ? argv[i+1] : NULL;
+            switch (c) {
+            case 'a':
+                if (!nextarg) usage();
+                addr = nextarg;
+                i++;  // the value has been consumed
+                break;
+            case 'm':
+                if (!nextarg) usage();
+                int newc;  // must be an int to match the %d conversion
+                if (sscanf( nextarg, "%d", &newc) != 1) usage();
+                count = newc;
+                i++;  // the value has been consumed
+                break;
+            default:
+                usage();
+            }
+        }
+        else usage();
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/648f7b36/proton-c/bindings/cpp/include/proton/cpp/Container.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/Container.h b/proton-c/bindings/cpp/include/proton/cpp/Container.h
index fbb1a83..d596ab1 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/Container.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/Container.h
@@ -56,6 +56,7 @@ class Container : public Handle<ContainerImpl>
PROTON_CPP_EXTERN Sender createSender(Connection &connection, std::string &addr);
PROTON_CPP_EXTERN Sender createSender(std::string &url);
PROTON_CPP_EXTERN Receiver createReceiver(Connection &connection, std::string &addr);
+ PROTON_CPP_EXTERN Receiver createReceiver(const std::string &url);
PROTON_CPP_EXTERN Acceptor listen(const std::string &url);
PROTON_CPP_EXTERN std::string getContainerId();
private:
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/648f7b36/proton-c/bindings/cpp/include/proton/cpp/Message.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/Message.h b/proton-c/bindings/cpp/include/proton/cpp/Message.h
index 51ca731..ae29ca2 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/Message.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/Message.h
@@ -22,6 +22,7 @@
*
*/
#include "proton/cpp/ImportExport.h"
+#include "proton/cpp/ProtonHandle.h"
#include "proton/message.h"
#include <string>
@@ -29,22 +30,36 @@
namespace proton {
namespace reactor {
-class Message
+class Message : public ProtonHandle<pn_message_t>
{
public:
PROTON_CPP_EXTERN Message();
- PROTON_CPP_EXTERN ~Message();
+ PROTON_CPP_EXTERN Message(pn_message_t *);
PROTON_CPP_EXTERN Message(const Message&);
PROTON_CPP_EXTERN Message& operator=(const Message&);
+ PROTON_CPP_EXTERN ~Message();
+
+ PROTON_CPP_EXTERN pn_message_t *getPnMessage() const;
+
+ PROTON_CPP_EXTERN void setId(uint64_t id);
+ PROTON_CPP_EXTERN uint64_t getId();
+ PROTON_CPP_EXTERN pn_type_t getIdType();
- PROTON_CPP_EXTERN pn_message_t *getPnMessage();
PROTON_CPP_EXTERN void setBody(const std::string &data);
PROTON_CPP_EXTERN std::string getBody();
+
+ PROTON_CPP_EXTERN void getBody(std::string &str);
+
+ PROTON_CPP_EXTERN void setBody(const char *, size_t len);
+ PROTON_CPP_EXTERN size_t getBody(char *, size_t len);
+ PROTON_CPP_EXTERN size_t getBinaryBodySize();
+
+
PROTON_CPP_EXTERN void encode(std::string &data);
PROTON_CPP_EXTERN void decode(const std::string &data);
private:
- pn_message_t *pnMessage;
+ friend class ProtonImplRef<Message>;
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/648f7b36/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h b/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h
index 8551c9c..ac8b483 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h
@@ -32,10 +32,10 @@
namespace proton {
namespace reactor {
-// For now, stands in for Python's: EndpointStateHandler, IncomingMessageHandler, OutgoingMessageHandler
+// Combines Python's EndpointStateHandler, IncomingMessageHandler and OutgoingMessageHandler
-class MessagingAdapter : public ProtonHandler
+class MessagingAdapter : public MessagingHandler
{
public:
PROTON_CPP_EXTERN MessagingAdapter(MessagingHandler &delegate);
@@ -44,11 +44,37 @@ class MessagingAdapter : public ProtonHandler
PROTON_CPP_EXTERN virtual void onLinkFlow(Event &e);
PROTON_CPP_EXTERN virtual void onDelivery(Event &e);
PROTON_CPP_EXTERN virtual void onUnhandled(Event &e);
+ PROTON_CPP_EXTERN virtual void onConnectionClosed(Event &e);
+ PROTON_CPP_EXTERN virtual void onConnectionClosing(Event &e);
+ PROTON_CPP_EXTERN virtual void onConnectionError(Event &e);
+ PROTON_CPP_EXTERN virtual void onConnectionLocalOpen(Event &e);
+ PROTON_CPP_EXTERN virtual void onConnectionRemoteOpen(Event &e);
PROTON_CPP_EXTERN virtual void onConnectionRemoteClose(Event &e);
+ PROTON_CPP_EXTERN virtual void onConnectionOpened(Event &e);
+ PROTON_CPP_EXTERN virtual void onConnectionOpening(Event &e);
+ PROTON_CPP_EXTERN virtual void onSessionClosed(Event &e);
+ PROTON_CPP_EXTERN virtual void onSessionClosing(Event &e);
+ PROTON_CPP_EXTERN virtual void onSessionError(Event &e);
+ PROTON_CPP_EXTERN virtual void onSessionLocalOpen(Event &e);
+ PROTON_CPP_EXTERN virtual void onSessionRemoteOpen(Event &e);
+ PROTON_CPP_EXTERN virtual void onSessionRemoteClose(Event &e);
+ PROTON_CPP_EXTERN virtual void onSessionOpened(Event &e);
+ PROTON_CPP_EXTERN virtual void onSessionOpening(Event &e);
+ PROTON_CPP_EXTERN virtual void onLinkClosed(Event &e);
+ PROTON_CPP_EXTERN virtual void onLinkClosing(Event &e);
+ PROTON_CPP_EXTERN virtual void onLinkError(Event &e);
+ PROTON_CPP_EXTERN virtual void onLinkLocalOpen(Event &e);
PROTON_CPP_EXTERN virtual void onLinkRemoteOpen(Event &e);
+ PROTON_CPP_EXTERN virtual void onLinkRemoteClose(Event &e);
+ PROTON_CPP_EXTERN virtual void onLinkOpened(Event &e);
+ PROTON_CPP_EXTERN virtual void onLinkOpening(Event &e);
+ PROTON_CPP_EXTERN virtual void onTransportTailClosed(Event &e);
private:
MessagingHandler &delegate; // The actual MessagingHandler
pn_handler_t *handshaker;
+ bool autoSettle;
+ bool autoAccept;
+ bool peerCloseIsError;
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/648f7b36/proton-c/bindings/cpp/include/proton/cpp/MessagingEvent.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/MessagingEvent.h b/proton-c/bindings/cpp/include/proton/cpp/MessagingEvent.h
index d8d5c7f..de79618 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/MessagingEvent.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/MessagingEvent.h
@@ -37,17 +37,19 @@ typedef enum {
PN_MESSAGING_ABORT,
PN_MESSAGING_ACCEPTED,
PN_MESSAGING_COMMIT,
- PN_MESSAGING_CONNECTION_CLOSE,
PN_MESSAGING_CONNECTION_CLOSED,
PN_MESSAGING_CONNECTION_CLOSING,
- PN_MESSAGING_CONNECTION_OPEN,
+ PN_MESSAGING_CONNECTION_ERROR,
PN_MESSAGING_CONNECTION_OPENED,
+ PN_MESSAGING_CONNECTION_OPENING,
PN_MESSAGING_DISCONNECTED,
PN_MESSAGING_FETCH,
PN_MESSAGING_ID_LOADED,
+ PN_MESSAGING_LINK_CLOSED,
PN_MESSAGING_LINK_CLOSING,
PN_MESSAGING_LINK_OPENED,
PN_MESSAGING_LINK_OPENING,
+ PN_MESSAGING_LINK_ERROR,
PN_MESSAGING_MESSAGE,
PN_MESSAGING_QUIT,
PN_MESSAGING_RECORD_INSERTED,
@@ -57,19 +59,25 @@ typedef enum {
PN_MESSAGING_REQUEST,
PN_MESSAGING_RESPONSE,
PN_MESSAGING_SENDABLE,
+ PN_MESSAGING_SESSION_CLOSED,
+ PN_MESSAGING_SESSION_CLOSING,
+ PN_MESSAGING_SESSION_OPENED,
+ PN_MESSAGING_SESSION_OPENING,
+ PN_MESSAGING_SESSION_ERROR,
PN_MESSAGING_SETTLED,
PN_MESSAGING_START,
PN_MESSAGING_TIMER,
PN_MESSAGING_TRANSACTION_ABORTED,
PN_MESSAGING_TRANSACTION_COMMITTED,
- PN_MESSAGING_TRANSACTION_DECLARED
+ PN_MESSAGING_TRANSACTION_DECLARED,
+ PN_MESSAGING_TRANSPORT_CLOSED
} MessagingEventType_t;
class MessagingEvent : public ProtonEvent
{
public:
MessagingEvent(pn_event_t *ce, pn_event_type_t t, Container &c);
- MessagingEvent(MessagingEventType_t t, ProtonEvent *parent, Container &c);
+ MessagingEvent(MessagingEventType_t t, ProtonEvent &parent);
~MessagingEvent();
virtual PROTON_CPP_EXTERN void dispatch(Handler &h);
virtual PROTON_CPP_EXTERN Connection &getConnection();
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/648f7b36/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h b/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h
index 875af43..51f679a 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h
@@ -34,20 +34,23 @@ class PROTON_CPP_EXTERN MessagingHandler : public ProtonHandler
{
public:
PROTON_CPP_EXTERN MessagingHandler();
+//ZZZ PROTON_CPP_EXTERN MessagingHandler(int prefetch=10, bool autoAccept=true, autoSettle=true, peerCloseIsError=false);
virtual ~MessagingHandler();
virtual void onAbort(Event &e);
virtual void onAccepted(Event &e);
virtual void onCommit(Event &e);
- virtual void onConnectionClose(Event &e);
virtual void onConnectionClosed(Event &e);
virtual void onConnectionClosing(Event &e);
- virtual void onConnectionOpen(Event &e);
+ virtual void onConnectionError(Event &e);
+ virtual void onConnectionOpening(Event &e);
virtual void onConnectionOpened(Event &e);
virtual void onDisconnected(Event &e);
virtual void onFetch(Event &e);
virtual void onIdLoaded(Event &e);
+ virtual void onLinkClosed(Event &e);
virtual void onLinkClosing(Event &e);
+ virtual void onLinkError(Event &e);
virtual void onLinkOpened(Event &e);
virtual void onLinkOpening(Event &e);
virtual void onMessage(Event &e);
@@ -59,12 +62,18 @@ class PROTON_CPP_EXTERN MessagingHandler : public ProtonHandler
virtual void onRequest(Event &e);
virtual void onResponse(Event &e);
virtual void onSendable(Event &e);
+ virtual void onSessionClosed(Event &e);
+ virtual void onSessionClosing(Event &e);
+ virtual void onSessionError(Event &e);
+ virtual void onSessionOpened(Event &e);
+ virtual void onSessionOpening(Event &e);
virtual void onSettled(Event &e);
virtual void onStart(Event &e);
virtual void onTimer(Event &e);
virtual void onTransactionAborted(Event &e);
virtual void onTransactionCommitted(Event &e);
virtual void onTransactionDeclared(Event &e);
+ virtual void onTransportClosed(Event &e);
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/648f7b36/proton-c/bindings/cpp/include/proton/cpp/Session.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/Session.h b/proton-c/bindings/cpp/include/proton/cpp/Session.h
index e556cde..68f5e40 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/Session.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/Session.h
@@ -27,6 +27,7 @@
#include "proton/types.h"
#include "proton/link.h"
+#include "ProtonImplRef.h"
#include <string>
struct pn_connection_t;
@@ -38,19 +39,22 @@ class Container;
class Handler;
class Transport;
-class Session : public Endpoint
+ class Session : public Endpoint, public ProtonHandle<pn_session_t>
{
public:
PROTON_CPP_EXTERN Session(pn_session_t *s);
+ PROTON_CPP_EXTERN Session();
PROTON_CPP_EXTERN ~Session();
PROTON_CPP_EXTERN void open();
+ PROTON_CPP_EXTERN Session(const Session&);
+ PROTON_CPP_EXTERN Session& operator=(const Session&);
PROTON_CPP_EXTERN void close();
PROTON_CPP_EXTERN pn_session_t *getPnSession();
virtual PROTON_CPP_EXTERN Connection &getConnection();
Receiver createReceiver(std::string name);
Sender createSender(std::string name);
private:
- pn_session_t *pnSession;
+ friend class ProtonImplRef<Session>;
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/648f7b36/proton-c/bindings/cpp/src/Connection.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/Connection.cpp b/proton-c/bindings/cpp/src/Connection.cpp
index e85b323..49d171e 100644
--- a/proton-c/bindings/cpp/src/Connection.cpp
+++ b/proton-c/bindings/cpp/src/Connection.cpp
@@ -35,7 +35,7 @@ namespace reactor {
template class Handle<ConnectionImpl>;
typedef PrivateImplRef<Connection> PI;
-Connection::Connection() {}
+Connection::Connection() {PI::ctor(*this, 0); } // default handle: goes through PI::ctor with a null impl pointer
Connection::Connection(ConnectionImpl* p) { PI::ctor(*this, p); }
Connection::Connection(const Connection& c) : Handle<ConnectionImpl>() { PI::copy(*this, c); }
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/648f7b36/proton-c/bindings/cpp/src/ConnectionImpl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/ConnectionImpl.cpp b/proton-c/bindings/cpp/src/ConnectionImpl.cpp
index 2feecb5..be01f8d 100644
--- a/proton-c/bindings/cpp/src/ConnectionImpl.cpp
+++ b/proton-c/bindings/cpp/src/ConnectionImpl.cpp
@@ -22,6 +22,7 @@
#include "proton/cpp/Handler.h"
#include "proton/cpp/exceptions.h"
#include "ConnectionImpl.h"
+#include "proton/cpp/Transport.h"
#include "Msg.h"
#include "contexts.h"
@@ -47,7 +48,10 @@ ConnectionImpl::ConnectionImpl(Container &c) : container(c), refCount(0), overri
setConnectionContext(pnConnection, this);
}
-ConnectionImpl::~ConnectionImpl() {}
+ConnectionImpl::~ConnectionImpl() { // Releases the objects this connection holds by raw pointer.
+ delete transport;
+ delete override; // the handler installed through setOverride(), if any
+}
Transport &ConnectionImpl::getTransport() {
if (transport)
@@ -56,7 +60,11 @@ Transport &ConnectionImpl::getTransport() {
}
Handler* ConnectionImpl::getOverride() { return override; }
-void ConnectionImpl::setOverride(Handler *h) { override = h; }
+void ConnectionImpl::setOverride(Handler *h) { // Install h as the override handler; the previously held handler is deleted.
+    if (override != h)    // re-installing the current handler must not delete it and leave a dangling pointer
+        delete override;  // deleting a null pointer is a no-op, so no separate null check is needed
+    override = h;
+}
void ConnectionImpl::open() {
pn_connection_open(pnConnection);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/648f7b36/proton-c/bindings/cpp/src/Connector.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/Connector.cpp b/proton-c/bindings/cpp/src/Connector.cpp
index 6885575..8ebdfb6 100644
--- a/proton-c/bindings/cpp/src/Connector.cpp
+++ b/proton-c/bindings/cpp/src/Connector.cpp
@@ -62,15 +62,14 @@ void Connector::onConnectionRemoteOpen(Event &e) {
}
void Connector::onConnectionInit(Event &e) {
-
}
void Connector::onTransportClosed(Event &e) {
// TODO: prepend with reconnect logic
PN_CPP_LOG(info, "Disconnected");
- connection.setOverride(0); // No more call backs
pn_connection_release(connection.impl->pnConnection);
- delete this;
+ // No more interaction, so drop our counted reference.
+ connection = Connection();
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/648f7b36/proton-c/bindings/cpp/src/Container.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/Container.cpp b/proton-c/bindings/cpp/src/Container.cpp
index 8e79b15..4eccb15 100644
--- a/proton-c/bindings/cpp/src/Container.cpp
+++ b/proton-c/bindings/cpp/src/Container.cpp
@@ -74,6 +74,10 @@ Receiver Container::createReceiver(Connection &connection, std::string &addr) {
return impl->createReceiver(connection, addr);
}
+Receiver Container::createReceiver(const std::string &url) { // Create a receiver for the given URL.
+ return impl->createReceiver(url); // forwarded to the implementation object
+}
+
Acceptor Container::listen(const std::string &urlString) {
return impl->listen(urlString);
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/648f7b36/proton-c/bindings/cpp/src/ContainerImpl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/ContainerImpl.cpp b/proton-c/bindings/cpp/src/ContainerImpl.cpp
index 7ff9c1d..df8c716 100644
--- a/proton-c/bindings/cpp/src/ContainerImpl.cpp
+++ b/proton-c/bindings/cpp/src/ContainerImpl.cpp
@@ -37,6 +37,7 @@
#include "proton/connection.h"
#include "proton/session.h"
+#include "proton/handlers.h"
namespace proton {
namespace reactor {
@@ -64,23 +65,20 @@ class CHandler : public Handler
pn_decref(pnHandler);
}
pn_handler_t *getPnHandler() { return pnHandler; }
+
+ virtual void onUnhandled(Event &e) { // Pass the event on to the wrapped C handler.
+ ProtonEvent *pne = dynamic_cast<ProtonEvent *>(&e);
+ if (!pne) return; // not a ProtonEvent, so there is no C event to dispatch
+ int type = pne->getType();
+ if (!type) return; // Not from the reactor
+ pn_handler_dispatch(pnHandler, pne->getPnEvent(), (pn_event_type_t) type);
+ }
+
private:
pn_handler_t *pnHandler;
};
-void dispatch(Handler &h, MessagingEvent &e) {
- // TODO: also dispatch to add()'ed Handlers
- CHandler *chandler;
- int type = e.getType();
- if (type && (chandler = dynamic_cast<CHandler*>(&h))) {
- // event and handler are both native Proton C
- pn_handler_dispatch(chandler->getPnHandler(), e.getPnEvent(), (pn_event_type_t) type);
- }
- else
- e.dispatch(h);
-}
-
// Used to sniff for Connector events before the reactor's global handler sees them.
class OverrideHandler : public Handler
{
@@ -109,8 +107,9 @@ class OverrideHandler : public Handler
ConnectionImpl *connection = getConnectionContext(conn);
if (connection) {
Handler *override = connection->getOverride();
- if (override)
+ if (override) {
e.dispatch(*override);
+ }
}
}
@@ -127,6 +126,29 @@ class OverrideHandler : public Handler
}
};
+
+class CFlowController : public ProtonHandler
+{   // Forwards link and delivery events to a Proton-C flow-controller handler.
+  public:
+    pn_handler_t *flowcontroller;
+    // Creates the C flow controller for the given window; the destructor drops the reference.
+    CFlowController(int window) : flowcontroller(pn_flowcontroller(window)) {}
+    ~CFlowController() { pn_decref(flowcontroller); }
+
+    // Dispatch the underlying C event to the wrapped handler; non-Proton events are ignored.
+    void redirect(Event &e) {
+        ProtonEvent *pne = dynamic_cast<ProtonEvent *>(&e);
+        if (!pne) return;  // the cast failed: there is no C event to forward
+        pn_handler_dispatch(flowcontroller, pne->getPnEvent(), (pn_event_type_t) pne->getType());
+    }
+
+    virtual void onLinkLocalOpen(Event &e) { redirect(e); }
+    virtual void onLinkRemoteOpen(Event &e) { redirect(e); }
+    virtual void onLinkFlow(Event &e) { redirect(e); }
+    virtual void onDelivery(Event &e) { redirect(e); }
+};
+
+
namespace {
// TODO: configurable policy. SessionPerConnection for now.
@@ -162,8 +184,8 @@ Handler &getCppHandler(pn_handler_t *c_handler) {
void cpp_handler_dispatch(pn_handler_t *c_handler, pn_event_t *cevent, pn_event_type_t type)
{
- MessagingEvent ev(cevent, type, getContainerRef(c_handler));
- dispatch(getCppHandler(c_handler), ev);
+ MessagingEvent mevent(cevent, type, getContainerRef(c_handler));
+ mevent.dispatch(getCppHandler(c_handler));
}
void cpp_handler_cleanup(pn_handler_t *c_handler)
@@ -176,7 +198,7 @@ pn_handler_t *cpp_handler(ContainerImpl *c, Handler *h)
{
pn_handler_t *handler = pn_handler_new(cpp_handler_dispatch, sizeof(struct InboundContext), cpp_handler_cleanup);
struct InboundContext *ctxt = (struct InboundContext *) pn_handler_mem(handler);
- ctxt->containerRef = Container(c);
+ new (&ctxt->containerRef) Container(c);
ctxt->containerImpl = c;
ctxt->cppHandler = h;
return handler;
@@ -254,6 +276,17 @@ Receiver ContainerImpl::createReceiver(Connection &connection, std::string &addr
return rcv;
}
+Receiver ContainerImpl::createReceiver(const std::string &urlString) { // Connect to urlString and open a receiver on its path.
+ // TODO: const cleanup of API
+ Connection conn = connect(const_cast<std::string &>(urlString)); // cast needed because a non-const reference is passed (see TODO)
+ Session session = getDefaultSession(conn.getPnConnection(), &getImpl(conn)->defaultSession);
+ std::string path = Url(urlString).getPath();
+ Receiver rcv = session.createReceiver(containerId + '-' + path); // link name: "<containerId>-<path>"
+ pn_terminus_set_address(pn_link_source(rcv.getPnLink()), path.c_str()); // the link's source address is the URL path
+ rcv.open();
+ return rcv;
+}
+
Acceptor ContainerImpl::acceptor(const std::string &host, const std::string &port) {
pn_acceptor_t *acptr = pn_reactor_acceptor(reactor, host.c_str(), port.c_str(), NULL);
if (acptr)
@@ -271,11 +304,20 @@ Acceptor ContainerImpl::listen(const std::string &urlString) {
void ContainerImpl::run() {
reactor = pn_reactor();
+
// Set our context on the reactor
setContainerContext(reactor, this);
+ int prefetch = 10; // TODO: configurable
+ Handler *flowController = 0;
+
+
// Set the reactor's main/default handler (see note below)
MessagingAdapter messagingAdapter(messagingHandler);
+ if (prefetch) {
+ flowController = new CFlowController(prefetch);
+ messagingHandler.addChildHandler(*flowController);
+ }
messagingHandler.addChildHandler(messagingAdapter);
pn_handler_t *cppHandler = cpp_handler(this, &messagingHandler);
pn_reactor_set_handler(reactor, cppHandler);
@@ -287,15 +329,18 @@ void ContainerImpl::run() {
pn_handler_t *cppGlobalHandler = cpp_handler(this, &overrideHandler);
pn_reactor_set_global_handler(reactor, cppGlobalHandler);
- // Note: we have just set up the following 4 handlers that see events in this order:
- // messagingHandler, messagingAdapter, connector override, the reactor's default global
- // handler (pn_iohandler)
- // TODO: remove fifth pn_handshaker once messagingAdapter matures
-
+ // Note: we have just set up the following 4/5 handlers that see events in this order:
+ // messagingHandler (Proton C events), pn_flowcontroller (optional), messagingAdapter,
+ // messagingHandler (Messaging events from the messagingAdapter), connector override,
+ // the reactor's default global handler (pn_iohandler)
pn_reactor_run(reactor);
+
+ pn_decref(cppHandler);
+ pn_decref(cppGlobalHandler);
pn_decref(cGlobalHandler);
pn_reactor_free(reactor);
reactor = 0;
+ delete(flowController);
}
}} // namespace proton::reactor
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/648f7b36/proton-c/bindings/cpp/src/ContainerImpl.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/ContainerImpl.h b/proton-c/bindings/cpp/src/ContainerImpl.h
index a14bf52..8a6faba 100644
--- a/proton-c/bindings/cpp/src/ContainerImpl.h
+++ b/proton-c/bindings/cpp/src/ContainerImpl.h
@@ -49,6 +49,7 @@ class ContainerImpl
PROTON_CPP_EXTERN Sender createSender(Connection &connection, std::string &addr);
PROTON_CPP_EXTERN Sender createSender(std::string &url);
PROTON_CPP_EXTERN Receiver createReceiver(Connection &connection, std::string &addr);
+ PROTON_CPP_EXTERN Receiver createReceiver(const std::string &url); //ZZZ
PROTON_CPP_EXTERN Acceptor listen(const std::string &url);
PROTON_CPP_EXTERN std::string getContainerId();
static void incref(ContainerImpl *);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/648f7b36/proton-c/bindings/cpp/src/Message.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/Message.cpp b/proton-c/bindings/cpp/src/Message.cpp
index 840a10b..ead6eb1 100644
--- a/proton-c/bindings/cpp/src/Message.cpp
+++ b/proton-c/bindings/cpp/src/Message.cpp
@@ -22,33 +22,128 @@
#include "proton/cpp/Message.h"
#include "proton/cpp/exceptions.h"
#include "Msg.h"
+#include "ProtonImplRef.h"
+
+#include <cstring>
namespace proton {
namespace reactor {
-Message::Message() : pnMessage(pn_message()){}
+template class ProtonHandle<pn_message_t>;
+typedef ProtonImplRef<Message> PI;
-Message::~Message() {
- pn_decref(pnMessage);
+Message::Message() {
+ PI::ctor(*this, 0);
+}
+Message::Message(pn_message_t *p) {
+ PI::ctor(*this, p);
}
+Message::Message(const Message& m) : ProtonHandle<pn_message_t>() {
+ PI::copy(*this, m);
+}
+Message& Message::operator=(const Message& m) {
+ return PI::assign(*this, m);
+}
+Message::~Message() { PI::dtor(*this); }
-Message::Message(const Message& m) : pnMessage(m.pnMessage) {
- pn_incref(pnMessage);
+namespace {
+void confirm(pn_message_t *&p) {
+ if (p) return;
+ p = pn_message(); // Correct refcount of 1
+ if (!p)
+ throw ProtonException(MSG("No memory"));
}
-Message& Message::operator=(const Message& m) {
- pnMessage = m.pnMessage;
- pn_incref(pnMessage);
- return *this;
+// Format `data` with pn_data_format() into `str`. The buffer starts at the
+// string's capacity (at least 512 bytes) and doubles whenever Proton reports
+// PN_OVERFLOW; any other error is raised as a ProtonException.
+void getFormatedStringContent(pn_data_t *data, std::string &str) {
+    pn_data_rewind(data);
+    size_t bufSize = str.capacity() < 512 ? 512 : str.capacity();
+    for (;; bufSize *= 2) {
+        str.resize(bufSize);
+        const int rc = pn_data_format(data, (char *) str.data(), &bufSize);
+        if (rc == 0) {
+            // Shrink to the size reported back through bufSize.
+            str.resize(bufSize);
+            return;
+        }
+        if (rc != PN_OVERFLOW)
+            throw ProtonException(MSG("Unexpected message body data error"));
+    }
+}
+
+} // namespace
+
+void Message::setId(uint64_t id) {
+ confirm(impl);
+ pn_data_t *data = pn_message_id(impl);
+ pn_data_clear(data);
+ if (int err = pn_data_put_ulong(data, id))
+ throw ProtonException(MSG("setId error " << err));
+}
+
+uint64_t Message::getId() {
+ confirm(impl);
+ pn_data_t *data = pn_message_id(impl);
+ pn_data_rewind(data);
+ if (pn_data_size(data) == 1 && pn_data_next(data) && pn_data_type(data) == PN_ULONG) {
+ return pn_data_get_ulong(data);
+ }
+ throw ProtonException(MSG("Message ID is not a ULONG"));
+}
+
+pn_type_t Message::getIdType() {
+ confirm(impl);
+ pn_data_t *data = pn_message_id(impl);
+ pn_data_rewind(data);
+ if (pn_data_size(data) == 1 && pn_data_next(data)) {
+ pn_type_t type = pn_data_type(data);
+ switch (type) {
+ case PN_ULONG:
+ case PN_STRING:
+ case PN_BINARY:
+ case PN_UUID:
+ return type;
+ break;
+ default:
+ break;
+ }
+ }
+ return PN_NULL;
}
void Message::setBody(const std::string &buf) {
- pn_data_t *body = pn_message_body(pnMessage);
+ confirm(impl);
+ pn_data_t *body = pn_message_body(impl);
+ pn_data_clear(body);
pn_data_put_string(body, pn_bytes(buf.size(), buf.data()));
}
+// Copy the message body into `str`. A body that is exactly one PN_STRING value is
+// copied verbatim; anything else is rendered by getFormatedStringContent().
+void Message::getBody(std::string &str) {
+    confirm(impl);
+    pn_data_t *body = pn_message_body(impl);
+    pn_data_rewind(body);
+
+    if (pn_data_next(body) && pn_data_type(body) == PN_STRING) {
+        pn_bytes_t bytes = pn_data_get_string(body);
+        if (!pn_data_next(body)) {
+            // assign() mutates through std::string's own interface; writing through
+            // a cast of data() could scribble on a buffer shared by a copy-on-write string.
+            str.assign(bytes.start, bytes.size);
+            return;
+        }
+    }
+    getFormatedStringContent(body, str);
+}
+
std::string Message::getBody() {
- pn_data_t *body = pn_message_body(pnMessage);
+ confirm(impl);
+ pn_data_t *body = pn_message_body(impl);
+ pn_data_rewind(body);
+
if (pn_data_next(body) && pn_data_type(body) == PN_STRING) {
pn_bytes_t bytes= pn_data_get_string(body);
if (!pn_data_next(body)) {
@@ -57,36 +152,73 @@ std::string Message::getBody() {
}
}
- pn_data_rewind(body);
std::string str;
- size_t sz = 1024;
- str.resize(sz);
- int err = pn_data_format(body, (char *) str.data(), &sz);
- if (err == PN_OVERFLOW)
- throw ProtonException(MSG("TODO: sizing loop missing"));
- if (err) throw ProtonException(MSG("Unexpected data error"));
- str.resize(sz);
+ getFormatedStringContent(body, str);
return str;
}
+void Message::setBody(const char *bytes, size_t len) {
+ confirm(impl);
+ pn_data_t *body = pn_message_body(impl);
+ pn_data_clear(body);
+ pn_data_put_binary(body, pn_bytes(len, bytes));
+}
+
+// Copy a body holding exactly one PN_BINARY value into `bytes` and return its size;
+// throws ProtonException for any other body or when `len` is too small.
+size_t Message::getBody(char *bytes, size_t len) {
+    confirm(impl);
+    pn_data_t *body = pn_message_body(impl);
+    pn_data_rewind(body);
+    if (pn_data_size(body) != 1 || !pn_data_next(body) || pn_data_type(body) != PN_BINARY)
+        throw ProtonException(MSG("Not simple binary data"));
+    pn_bytes_t payload = pn_data_get_binary(body);
+    if (len < payload.size)
+        throw ProtonException(MSG("Binary buffer too small"));
+    memmove(bytes, payload.start, payload.size);
+    return payload.size;
+}
+
+
+
+// Size in bytes of a body that is exactly one PN_BINARY value, otherwise 0.
+size_t Message::getBinaryBodySize() {
+    confirm(impl);
+    pn_data_t *body = pn_message_body(impl);
+    pn_data_rewind(body);
+    const bool isBinary =
+        pn_data_size(body) == 1 && pn_data_next(body) && pn_data_type(body) == PN_BINARY;
+    // pn_data_get_binary is only consulted once the type check has passed.
+    return isBinary ? pn_data_get_binary(body).size : 0;
+}
+
+
void Message::encode(std::string &s) {
- size_t sz = 1024;
- if (s.capacity() > sz)
- sz = s.capacity();
- else
- s.reserve(sz);
- s.resize(sz);
- int err = pn_message_encode(pnMessage, (char *) s.data(), &sz);
- if (err == PN_OVERFLOW)
- throw ProtonException(MSG("TODO: fix overflow with dynamic buffer resizing"));
- if (err) throw ProtonException(MSG("unexpected error"));
- s.resize(sz);
+ confirm(impl);
+ size_t sz = s.capacity();
+ if (sz < 512) sz = 512;
+ while (true) {
+ s.resize(sz);
+ int err = pn_message_encode(impl, (char *) s.data(), &sz);
+ if (err) {
+ if (err != PN_OVERFLOW)
+ throw ProtonException(MSG("unexpected error"));
+ } else {
+ s.resize(sz);
+ return;
+ }
+ sz *= 2;
+ }
}
void Message::decode(const std::string &s) {
- int err = pn_message_decode(pnMessage, s.data(), s.size());
+ confirm(impl);
+ int err = pn_message_decode(impl, s.data(), s.size());
if (err) throw ProtonException(MSG("unexpected error"));
}
+pn_message_t *Message::getPnMessage() const {
+ return impl;
+}
}} // namespace proton::reactor
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/648f7b36/proton-c/bindings/cpp/src/MessagingAdapter.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/MessagingAdapter.cpp b/proton-c/bindings/cpp/src/MessagingAdapter.cpp
index 9cab2b3..1097305 100644
--- a/proton-c/bindings/cpp/src/MessagingAdapter.cpp
+++ b/proton-c/bindings/cpp/src/MessagingAdapter.cpp
@@ -28,23 +28,26 @@
#include "proton/handlers.h"
#include "proton/delivery.h"
#include "proton/connection.h"
+#include "proton/session.h"
namespace proton {
namespace reactor {
-MessagingAdapter::MessagingAdapter(MessagingHandler &d) : delegate(d), handshaker(pn_handshaker()) {
- pn_handler_t *flowcontroller = pn_flowcontroller(10);
- pn_handler_add(handshaker, flowcontroller);
- pn_decref(flowcontroller);
+MessagingAdapter::MessagingAdapter(MessagingHandler &d) : delegate(d), handshaker(pn_handshaker()),
+ autoSettle(true), autoAccept(true),
+ peerCloseIsError(false) {
};
MessagingAdapter::~MessagingAdapter(){
pn_decref(handshaker);
};
+
void MessagingAdapter::onReactorInit(Event &e) {
- // create onStart extended event
- MessagingEvent mevent(PN_MESSAGING_START, NULL, e.getContainer());
- mevent.dispatch(delegate);
+ ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
+ if (pe) {
+ MessagingEvent mevent(PN_MESSAGING_START, *pe);
+ delegate.onStart(mevent);
+ }
}
void MessagingAdapter::onLinkFlow(Event &e) {
@@ -54,8 +57,8 @@ void MessagingAdapter::onLinkFlow(Event &e) {
pn_link_t *lnk = pn_event_link(pne);
if (lnk && pn_link_is_sender(lnk) && pn_link_credit(lnk) > 0) {
// create onMessage extended event
- MessagingEvent mevent(PN_MESSAGING_SENDABLE, pe, e.getContainer());
- mevent.dispatch(delegate);
+ MessagingEvent mevent(PN_MESSAGING_SENDABLE, *pe);
+ delegate.onSendable(mevent);;
}
}
}
@@ -85,32 +88,60 @@ void MessagingAdapter::onDelivery(Event &e) {
if (pn_link_is_receiver(lnk)) {
if (!pn_delivery_partial(dlv) && pn_delivery_readable(dlv)) {
// generate onMessage
- MessagingEvent mevent(PN_MESSAGING_MESSAGE, pe, pe->getContainer());
+ MessagingEvent mevent(PN_MESSAGING_MESSAGE, *pe);
Message m(receiveMessage(lnk, dlv));
mevent.setMessage(m);
- // TODO: check if endpoint closed...
- mevent.dispatch(delegate);
- // only do auto accept for now
- pn_delivery_update(dlv, PN_ACCEPTED);
- pn_delivery_settle(dlv);
- // TODO: generate onSettled
+ if (pn_link_state(lnk) & PN_LOCAL_CLOSED) {
+ if (autoAccept) {
+ pn_delivery_update(dlv, PN_RELEASED);
+ pn_delivery_settle(dlv);
+ }
+ }
+ else {
+ try {
+ delegate.onMessage(mevent);
+ if (autoAccept) {
+ pn_delivery_update(dlv, PN_ACCEPTED);
+ pn_delivery_settle(dlv);
+ }
+ }
+ catch (MessageReject &) {
+ pn_delivery_update(dlv, PN_REJECTED);
+ pn_delivery_settle(dlv);
+ }
+ catch (MessageRelease &) {
+ pn_delivery_update(dlv, PN_REJECTED);
+ pn_delivery_settle(dlv);
+ }
+ }
+ }
+ else if (pn_delivery_updated(dlv) && pn_delivery_settled(dlv)) {
+ MessagingEvent mevent(PN_MESSAGING_SETTLED, *pe);
+ delegate.onSettled(mevent);
}
} else {
// Sender
if (pn_delivery_updated(dlv)) {
uint64_t rstate = pn_delivery_remote_state(dlv);
- if (rstate == PN_ACCEPTED)
- // generate onAccepted
- MessagingEvent(PN_MESSAGING_ACCEPTED, pe, pe->getContainer()).dispatch(delegate);
- else if (rstate = PN_REJECTED)
- MessagingEvent(PN_MESSAGING_REJECTED, pe, pe->getContainer()).dispatch(delegate);
- else if (rstate == PN_RELEASED || rstate == PN_MODIFIED)
- MessagingEvent(PN_MESSAGING_RELEASED, pe, pe->getContainer()).dispatch(delegate);
-
- if (pn_delivery_settled(dlv))
- MessagingEvent(PN_MESSAGING_SETTLED, pe, pe->getContainer()).dispatch(delegate);
-
- pn_delivery_settle(dlv); // TODO: only if auto settled
+ // Map the peer's disposition (remote state) onto the matching handler callback;
+ // released and modified deliveries are both reported through onReleased.
+ if (rstate == PN_ACCEPTED) {
+     MessagingEvent mevent(PN_MESSAGING_ACCEPTED, *pe);
+     delegate.onAccepted(mevent);
+ } else if (rstate == PN_REJECTED) {  // comparison, so the branch below stays reachable
+     MessagingEvent mevent(PN_MESSAGING_REJECTED, *pe);
+     delegate.onRejected(mevent);
+ } else if (rstate == PN_RELEASED || rstate == PN_MODIFIED) {
+     MessagingEvent mevent(PN_MESSAGING_RELEASED, *pe);
+     delegate.onReleased(mevent);
+ }
+
+ if (pn_delivery_settled(dlv)) {
+ MessagingEvent mevent(PN_MESSAGING_SETTLED, *pe);
+ delegate.onSettled(mevent);
+ }
+ if (autoSettle)
+ pn_delivery_settle(dlv);
}
}
}
@@ -140,52 +171,247 @@ bool isRemoteClosed(pn_state_t state) {
} // namespace
+// React to the peer closing a link: dispatch error (remote condition set), closed
+// (isLocalClosed() holds) or closing to the handler, then close the local end.
+void MessagingAdapter::onLinkRemoteClose(Event &e) {
+    ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
+    if (!pe) return;  // nothing to do for events that are not ProtonEvents
+    pn_link_t *lnk = pn_event_link(pe->getPnEvent());
+    pn_state_t state = pn_link_state(lnk);
+    // A set remote condition is treated as an error.
+    if (pn_condition_is_set(pn_link_remote_condition(lnk))) {
+        MessagingEvent mevent(PN_MESSAGING_LINK_ERROR, *pe);
+        onLinkError(mevent);
+    } else if (isLocalClosed(state)) {
+        MessagingEvent mevent(PN_MESSAGING_LINK_CLOSED, *pe);
+        onLinkClosed(mevent);
+    } else {
+        MessagingEvent mevent(PN_MESSAGING_LINK_CLOSING, *pe);
+        onLinkClosing(mevent);
+    }
+    // Close our end in every case.
+    pn_link_close(lnk);
+}
+
+// React to the peer closing a session: dispatch error (remote condition set), closed
+// (isLocalClosed() holds) or closing to the handler, then close the local end.
+void MessagingAdapter::onSessionRemoteClose(Event &e) {
+    ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
+    if (!pe) return;  // nothing to do for events that are not ProtonEvents
+    pn_session_t *session = pn_event_session(pe->getPnEvent());
+    pn_state_t state = pn_session_state(session);
+    // A set remote condition is treated as an error.
+    if (pn_condition_is_set(pn_session_remote_condition(session))) {
+        MessagingEvent mevent(PN_MESSAGING_SESSION_ERROR, *pe);
+        onSessionError(mevent);
+    } else if (isLocalClosed(state)) {
+        MessagingEvent mevent(PN_MESSAGING_SESSION_CLOSED, *pe);
+        onSessionClosed(mevent);
+    } else {
+        MessagingEvent mevent(PN_MESSAGING_SESSION_CLOSING, *pe);
+        onSessionClosing(mevent);
+    }
+    // Close our end in every case.
+    pn_session_close(session);
+}
+
void MessagingAdapter::onConnectionRemoteClose(Event &e) {
ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
if (pe) {
pn_event_t *cevent = pe->getPnEvent();
- pn_connection_t *conn = pn_event_connection(cevent);
- // TODO: remote condition -> error
- if (isLocalClosed(pn_connection_state(conn))) {
- MessagingEvent(PN_MESSAGING_CONNECTION_CLOSED, pe, pe->getContainer()).dispatch(delegate);
+ pn_connection_t *connection = pn_event_connection(cevent);
+ pn_state_t state = pn_connection_state(connection);
+ if (pn_condition_is_set(pn_connection_remote_condition(connection))) {
+ MessagingEvent mevent(PN_MESSAGING_CONNECTION_ERROR, *pe);
+ onConnectionError(mevent);
+ }
+ else if (isLocalClosed(state)) {
+ MessagingEvent mevent(PN_MESSAGING_CONNECTION_CLOSED, *pe);
+ onConnectionClosed(mevent);
}
else {
- MessagingEvent(PN_MESSAGING_CONNECTION_CLOSING, pe, pe->getContainer()).dispatch(delegate);
+ MessagingEvent mevent(PN_MESSAGING_CONNECTION_CLOSING, *pe);
+ onConnectionClosing(mevent);
+ }
+ pn_connection_close(connection);
+ }
+}
+
+void MessagingAdapter::onConnectionLocalOpen(Event &e) {
+ ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
+ if (pe) {
+ pn_connection_t *connection = pn_event_connection(pe->getPnEvent());
+ if (isRemoteOpen(pn_connection_state(connection))) {
+ MessagingEvent mevent(PN_MESSAGING_CONNECTION_OPENED, *pe);
+ onConnectionOpened(mevent);
+ }
+ }
+}
+
+// Peer opened the connection: report opened, or report opening and open our end.
+void MessagingAdapter::onConnectionRemoteOpen(Event &e) {
+    ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
+    if (!pe) return;
+    pn_connection_t *connection = pn_event_connection(pe->getPnEvent());
+    const pn_state_t state = pn_connection_state(connection);
+    if (isLocalOpen(state)) {
+        MessagingEvent mevent(PN_MESSAGING_CONNECTION_OPENED, *pe);
+        onConnectionOpened(mevent);
+    } else if (isLocalUnititialised(state)) {
+        MessagingEvent mevent(PN_MESSAGING_CONNECTION_OPENING, *pe);
+        onConnectionOpening(mevent);
+        pn_connection_open(connection);
+    }
+}
+
+void MessagingAdapter::onSessionLocalOpen(Event &e) {
+ ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
+ if (pe) {
+ pn_session_t *session = pn_event_session(pe->getPnEvent());
+ if (isRemoteOpen(pn_session_state(session))) {
+ MessagingEvent mevent(PN_MESSAGING_SESSION_OPENED, *pe);
+ onSessionOpened(mevent);
+ }
+ }
+}
+
+void MessagingAdapter::onSessionRemoteOpen(Event &e) {
+ ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
+ if (pe) {
+ pn_session_t *session = pn_event_session(pe->getPnEvent());
+ if (isLocalOpen(pn_session_state(session))) {
+ MessagingEvent mevent(PN_MESSAGING_SESSION_OPENED, *pe);
+ onSessionOpened(mevent);
+ }
+ else if (isLocalUnititialised(pn_session_state(session))) {
+ MessagingEvent mevent(PN_MESSAGING_SESSION_OPENING, *pe);
+ onSessionOpening(mevent);
+ pn_session_open(session);
}
- pn_connection_close(conn);
}
}
+void MessagingAdapter::onLinkLocalOpen(Event &e) {
+ ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
+ if (pe) {
+ pn_link_t *link = pn_event_link(pe->getPnEvent());
+ if (isRemoteOpen(pn_link_state(link))) {
+ MessagingEvent mevent(PN_MESSAGING_LINK_OPENED, *pe);
+ onLinkOpened(mevent);
+ }
+ }
+}
void MessagingAdapter::onLinkRemoteOpen(Event &e) {
ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
if (pe) {
- pn_event_t *cevent = pe->getPnEvent();
- pn_link_t *link = pn_event_link(cevent);
- // TODO: remote condition -> error
+ pn_link_t *link = pn_event_link(pe->getPnEvent());
if (isLocalOpen(pn_link_state(link))) {
- MessagingEvent(PN_MESSAGING_LINK_OPENED, pe, pe->getContainer()).dispatch(delegate);
+ MessagingEvent mevent(PN_MESSAGING_LINK_OPENED, *pe);
+ onLinkOpened(mevent);
}
else if (isLocalUnititialised(pn_link_state(link))) {
- MessagingEvent(PN_MESSAGING_LINK_OPENING, pe, pe->getContainer()).dispatch(delegate);
+ MessagingEvent mevent(PN_MESSAGING_LINK_OPENING, *pe);
+ onLinkOpening(mevent);
pn_link_open(link);
}
}
}
+// On a transport-tail-closed event, report onDisconnected to the handler when the
+// event has a connection and isLocalOpen() holds for its state.
+void MessagingAdapter::onTransportTailClosed(Event &e) {
+    ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
+    if (!pe) return;
+    pn_connection_t *conn = pn_event_connection(pe->getPnEvent());
+    if (!conn || !isLocalOpen(pn_connection_state(conn))) return;
+    MessagingEvent mevent(PN_MESSAGING_DISCONNECTED, *pe);
+    delegate.onDisconnected(mevent);
+}
+
-void MessagingAdapter::onUnhandled(Event &e) {
- // Until this code fleshes out closer to python's, cheat a bit with a pn_handshaker
+void MessagingAdapter::onConnectionOpened(Event &e) {
+ delegate.onConnectionOpened(e);
+}
+void MessagingAdapter::onSessionOpened(Event &e) {
+ delegate.onSessionOpened(e);
+}
+
+void MessagingAdapter::onLinkOpened(Event &e) {
+ delegate.onLinkOpened(e);
+}
+
+void MessagingAdapter::onConnectionOpening(Event &e) {
+ delegate.onConnectionOpening(e);
+}
+
+void MessagingAdapter::onSessionOpening(Event &e) {
+ delegate.onSessionOpening(e);
+}
+
+void MessagingAdapter::onLinkOpening(Event &e) {
+ delegate.onLinkOpening(e);
+}
+
+void MessagingAdapter::onConnectionError(Event &e) {
+ delegate.onConnectionError(e);
ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
if (pe) {
- pn_event_type_t type = (pn_event_type_t) pe->getType();
- if (type != PN_EVENT_NONE) {
- pn_handler_dispatch(handshaker, pe->getPnEvent(), type);
- }
+ pn_connection_t *connection = pn_event_connection(pe->getPnEvent());
+ pn_connection_close(connection);
+ }
+}
+
+void MessagingAdapter::onSessionError(Event &e) {
+ delegate.onSessionError(e);
+ ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
+ if (pe) {
+ pn_session_t *session = pn_event_session(pe->getPnEvent());
+ pn_session_close(session);
}
}
+void MessagingAdapter::onLinkError(Event &e) {
+ delegate.onLinkError(e);
+ ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
+ if (pe) {
+ pn_link_t *link = pn_event_link(pe->getPnEvent());
+ pn_link_close(link);
+ }
+}
+
+void MessagingAdapter::onConnectionClosed(Event &e) {
+ delegate.onConnectionClosed(e);
+}
+void MessagingAdapter::onSessionClosed(Event &e) {
+ delegate.onSessionClosed(e);
+}
+
+void MessagingAdapter::onLinkClosed(Event &e) {
+ delegate.onLinkClosed(e);
+}
+
+void MessagingAdapter::onConnectionClosing(Event &e) {
+ delegate.onConnectionClosing(e);
+ if (peerCloseIsError)
+ onConnectionError(e);
+}
+
+void MessagingAdapter::onSessionClosing(Event &e) {
+ delegate.onSessionClosing(e);
+ if (peerCloseIsError)
+ onSessionError(e);
+}
+
+void MessagingAdapter::onLinkClosing(Event &e) {
+ delegate.onLinkClosing(e);
+ if (peerCloseIsError)
+ onLinkError(e);
+}
+
+void MessagingAdapter::onUnhandled(Event &e) {
+}
}} // namespace proton::reactor
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/648f7b36/proton-c/bindings/cpp/src/MessagingEvent.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/MessagingEvent.cpp b/proton-c/bindings/cpp/src/MessagingEvent.cpp
index bcfb721..b8a2f8a 100644
--- a/proton-c/bindings/cpp/src/MessagingEvent.cpp
+++ b/proton-c/bindings/cpp/src/MessagingEvent.cpp
@@ -24,6 +24,7 @@
#include "proton/link.h"
#include "proton/cpp/MessagingEvent.h"
+#include "proton/cpp/Message.h"
#include "proton/cpp/ProtonHandler.h"
#include "proton/cpp/MessagingHandler.h"
#include "proton/cpp/exceptions.h"
@@ -37,8 +38,8 @@ MessagingEvent::MessagingEvent(pn_event_t *ce, pn_event_type_t t, Container &c)
ProtonEvent(ce, t, c), messagingType(PN_MESSAGING_PROTON), parentEvent(0), message(0)
{}
-MessagingEvent::MessagingEvent(MessagingEventType_t t, ProtonEvent *p, Container &c) :
- ProtonEvent(NULL, PN_EVENT_NONE, c), messagingType(t), parentEvent(p), message(0) {
+MessagingEvent::MessagingEvent(MessagingEventType_t t, ProtonEvent &p) :
+ ProtonEvent(NULL, PN_EVENT_NONE, p.getContainer()), messagingType(t), parentEvent(&p), message(0) {
if (messagingType == PN_MESSAGING_PROTON)
throw ProtonException(MSG("invalid messaging event type"));
}
@@ -80,16 +81,18 @@ Link MessagingEvent::getLink() {
}
Message MessagingEvent::getMessage() {
- if (message)
- return *message;
+ if (parentEvent) {
+ pn_message_t *m = getEventContext(parentEvent->getPnEvent());
+ if (m)
+ return Message(m);
+ }
throw ProtonException(MSG("No message context for event"));
}
void MessagingEvent::setMessage(Message &m) {
- if (messagingType != PN_MESSAGING_MESSAGE)
+ if (messagingType != PN_MESSAGING_MESSAGE || !parentEvent)
throw ProtonException(MSG("Event type does not provide message"));
- delete message;
- message = new Message(m);
+ setEventContext(parentEvent->getPnEvent(), m.getPnMessage());
}
void MessagingEvent::dispatch(Handler &h) {
@@ -112,9 +115,23 @@ void MessagingEvent::dispatch(Handler &h) {
case PN_MESSAGING_CONNECTION_CLOSING: handler->onConnectionClosing(*this); break;
case PN_MESSAGING_CONNECTION_CLOSED: handler->onConnectionClosed(*this); break;
+ case PN_MESSAGING_CONNECTION_ERROR: handler->onConnectionError(*this); break;
+ case PN_MESSAGING_CONNECTION_OPENING: handler->onConnectionOpening(*this); break;
+ case PN_MESSAGING_CONNECTION_OPENED: handler->onConnectionOpened(*this); break;
+
+ case PN_MESSAGING_LINK_CLOSED: handler->onLinkClosed(*this); break;
+ case PN_MESSAGING_LINK_CLOSING: handler->onLinkClosing(*this); break;
+ case PN_MESSAGING_LINK_ERROR: handler->onLinkError(*this); break;
case PN_MESSAGING_LINK_OPENING: handler->onLinkOpening(*this); break;
case PN_MESSAGING_LINK_OPENED: handler->onLinkOpened(*this); break;
+ case PN_MESSAGING_SESSION_CLOSED: handler->onSessionClosed(*this); break;
+ case PN_MESSAGING_SESSION_CLOSING: handler->onSessionClosing(*this); break;
+ case PN_MESSAGING_SESSION_ERROR: handler->onSessionError(*this); break;
+ case PN_MESSAGING_SESSION_OPENING: handler->onSessionOpening(*this); break;
+ case PN_MESSAGING_SESSION_OPENED: handler->onSessionOpened(*this); break;
+
+ case PN_MESSAGING_TRANSPORT_CLOSED: handler->onTransportClosed(*this); break;
default:
throw ProtonException(MSG("Unkown messaging event type " << messagingType));
break;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/648f7b36/proton-c/bindings/cpp/src/MessagingHandler.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/MessagingHandler.cpp b/proton-c/bindings/cpp/src/MessagingHandler.cpp
index 7cb48f6..6066b07 100644
--- a/proton-c/bindings/cpp/src/MessagingHandler.cpp
+++ b/proton-c/bindings/cpp/src/MessagingHandler.cpp
@@ -30,15 +30,17 @@ MessagingHandler::~MessagingHandler(){};
void MessagingHandler::onAbort(Event &e) { onUnhandled(e); }
void MessagingHandler::onAccepted(Event &e) { onUnhandled(e); }
void MessagingHandler::onCommit(Event &e) { onUnhandled(e); }
-void MessagingHandler::onConnectionClose(Event &e) { onUnhandled(e); }
void MessagingHandler::onConnectionClosed(Event &e) { onUnhandled(e); }
void MessagingHandler::onConnectionClosing(Event &e) { onUnhandled(e); }
-void MessagingHandler::onConnectionOpen(Event &e) { onUnhandled(e); }
+void MessagingHandler::onConnectionError(Event &e) { onUnhandled(e); }
void MessagingHandler::onConnectionOpened(Event &e) { onUnhandled(e); }
+void MessagingHandler::onConnectionOpening(Event &e) { onUnhandled(e); }
void MessagingHandler::onDisconnected(Event &e) { onUnhandled(e); }
void MessagingHandler::onFetch(Event &e) { onUnhandled(e); }
void MessagingHandler::onIdLoaded(Event &e) { onUnhandled(e); }
+void MessagingHandler::onLinkClosed(Event &e) { onUnhandled(e); }
void MessagingHandler::onLinkClosing(Event &e) { onUnhandled(e); }
+void MessagingHandler::onLinkError(Event &e) { onUnhandled(e); }
void MessagingHandler::onLinkOpened(Event &e) { onUnhandled(e); }
void MessagingHandler::onLinkOpening(Event &e) { onUnhandled(e); }
void MessagingHandler::onMessage(Event &e) { onUnhandled(e); }
@@ -50,11 +52,17 @@ void MessagingHandler::onReleased(Event &e) { onUnhandled(e); }
void MessagingHandler::onRequest(Event &e) { onUnhandled(e); }
void MessagingHandler::onResponse(Event &e) { onUnhandled(e); }
void MessagingHandler::onSendable(Event &e) { onUnhandled(e); }
+void MessagingHandler::onSessionClosed(Event &e) { onUnhandled(e); }
+void MessagingHandler::onSessionClosing(Event &e) { onUnhandled(e); }
+void MessagingHandler::onSessionError(Event &e) { onUnhandled(e); }
+void MessagingHandler::onSessionOpened(Event &e) { onUnhandled(e); }
+void MessagingHandler::onSessionOpening(Event &e) { onUnhandled(e); }
void MessagingHandler::onSettled(Event &e) { onUnhandled(e); }
void MessagingHandler::onStart(Event &e) { onUnhandled(e); }
void MessagingHandler::onTimer(Event &e) { onUnhandled(e); }
void MessagingHandler::onTransactionAborted(Event &e) { onUnhandled(e); }
void MessagingHandler::onTransactionCommitted(Event &e) { onUnhandled(e); }
void MessagingHandler::onTransactionDeclared(Event &e) { onUnhandled(e); }
+void MessagingHandler::onTransportClosed(Event &e) { onUnhandled(e); }
}} // namespace proton::reactor
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/648f7b36/proton-c/bindings/cpp/src/Session.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/Session.cpp b/proton-c/bindings/cpp/src/Session.cpp
index d2b01dd..4333dcf 100644
--- a/proton-c/bindings/cpp/src/Session.cpp
+++ b/proton-c/bindings/cpp/src/Session.cpp
@@ -30,34 +30,43 @@
namespace proton {
namespace reactor {
+template class ProtonHandle<pn_session_t>;
+typedef ProtonImplRef<Session> PI;
-Session::Session(pn_session_t *s) : pnSession(s)
-{
- pn_incref(pnSession);
+Session::Session(pn_session_t *p) {
+ PI::ctor(*this, p);
+}
+Session::Session() {
+ PI::ctor(*this, 0);
+}
+Session::Session(const Session& c) : ProtonHandle<pn_session_t>() {
+ PI::copy(*this, c);
+}
+Session& Session::operator=(const Session& c) {
+ return PI::assign(*this, c);
}
-
Session::~Session() {
- pn_decref(pnSession);
+ PI::dtor(*this);
}
-pn_session_t *Session::getPnSession() { return pnSession; }
+pn_session_t *Session::getPnSession() { return impl; }
void Session::open() {
- pn_session_open(pnSession);
+ pn_session_open(impl);
}
Connection &Session::getConnection() {
- pn_connection_t *c = pn_session_connection(pnSession);
+ pn_connection_t *c = pn_session_connection(impl);
return ConnectionImpl::getReactorReference(c);
}
Receiver Session::createReceiver(std::string name) {
- pn_link_t *link = pn_receiver(pnSession, name.c_str());
+ pn_link_t *link = pn_receiver(impl, name.c_str());
return Receiver(link);
}
Sender Session::createSender(std::string name) {
- pn_link_t *link = pn_sender(pnSession, name.c_str());
+ pn_link_t *link = pn_sender(impl, name.c_str());
return Sender(link);
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/648f7b36/proton-c/bindings/cpp/src/contexts.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/contexts.cpp b/proton-c/bindings/cpp/src/contexts.cpp
index b1dec49..56483ff 100644
--- a/proton-c/bindings/cpp/src/contexts.cpp
+++ b/proton-c/bindings/cpp/src/contexts.cpp
@@ -23,13 +23,13 @@
#include "proton/cpp/exceptions.h"
#include "Msg.h"
#include "proton/object.h"
+#include "proton/message.h"
#include "proton/session.h"
#include "proton/link.h"
PN_HANDLE(PNI_CPP_CONNECTION_CONTEXT)
-PN_HANDLE(PNI_CPP_SESSION_CONTEXT)
-PN_HANDLE(PNI_CPP_LINK_CONTEXT)
PN_HANDLE(PNI_CPP_CONTAINER_CONTEXT)
+PN_HANDLE(PNI_CPP_EVENT_CONTEXT)
namespace proton {
namespace reactor {
@@ -39,7 +39,6 @@ void setConnectionContext(pn_connection_t *pnConnection, ConnectionImpl *connect
pn_record_def(record, PNI_CPP_CONNECTION_CONTEXT, PN_VOID);
pn_record_set(record, PNI_CPP_CONNECTION_CONTEXT, connection);
}
-
ConnectionImpl *getConnectionContext(pn_connection_t *pnConnection) {
if (!pnConnection) return NULL;
pn_record_t *record = pn_connection_attachments(pnConnection);
@@ -48,40 +47,11 @@ ConnectionImpl *getConnectionContext(pn_connection_t *pnConnection) {
}
-void setSessionContext(pn_session_t *pnSession, Session *session) {
- pn_record_t *record = pn_session_attachments(pnSession);
- pn_record_def(record, PNI_CPP_SESSION_CONTEXT, PN_VOID);
- pn_record_set(record, PNI_CPP_SESSION_CONTEXT, session);
-}
-
-Session *getSessionContext(pn_session_t *pnSession) {
- if (!pnSession) return NULL;
- pn_record_t *record = pn_session_attachments(pnSession);
- Session *p = (Session *) pn_record_get(record, PNI_CPP_SESSION_CONTEXT);
- return p;
-}
-
-
-void setLinkContext(pn_link_t *pnLink, Link *link) {
- pn_record_t *record = pn_link_attachments(pnLink);
- pn_record_def(record, PNI_CPP_LINK_CONTEXT, PN_VOID);
- pn_record_set(record, PNI_CPP_LINK_CONTEXT, link);
-}
-
-Link *getLinkContext(pn_link_t *pnLink) {
- if (!pnLink) return NULL;
- pn_record_t *record = pn_link_attachments(pnLink);
- Link *p = (Link *) pn_record_get(record, PNI_CPP_LINK_CONTEXT);
- return p;
-}
-
-
void setContainerContext(pn_reactor_t *pnReactor, ContainerImpl *container) {
pn_record_t *record = pn_reactor_attachments(pnReactor);
pn_record_def(record, PNI_CPP_CONTAINER_CONTEXT, PN_VOID);
pn_record_set(record, PNI_CPP_CONTAINER_CONTEXT, container);
}
-
ContainerImpl *getContainerContext(pn_reactor_t *pnReactor) {
pn_record_t *record = pn_reactor_attachments(pnReactor);
ContainerImpl *p = (ContainerImpl *) pn_record_get(record, PNI_CPP_CONTAINER_CONTEXT);
@@ -89,4 +59,17 @@ ContainerImpl *getContainerContext(pn_reactor_t *pnReactor) {
return p;
}
+void setEventContext(pn_event_t *pnEvent, pn_message_t *m) {
+ pn_record_t *record = pn_event_attachments(pnEvent);
+ pn_record_def(record, PNI_CPP_EVENT_CONTEXT, PN_OBJECT); // refcount it for life of the event
+ pn_record_set(record, PNI_CPP_EVENT_CONTEXT, m);
+}
+// Fetch the message stored under PNI_CPP_EVENT_CONTEXT; NULL for a null event.
+pn_message_t *getEventContext(pn_event_t *pnEvent) {
+    if (!pnEvent) return NULL;
+    pn_record_t *attachments = pn_event_attachments(pnEvent);
+    return (pn_message_t *) pn_record_get(attachments, PNI_CPP_EVENT_CONTEXT);
+}
+
+
}} // namespace proton::reactor
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/648f7b36/proton-c/bindings/cpp/src/contexts.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/contexts.h b/proton-c/bindings/cpp/src/contexts.h
index c04a77a..e1b5f24 100644
--- a/proton-c/bindings/cpp/src/contexts.h
+++ b/proton-c/bindings/cpp/src/contexts.h
@@ -23,6 +23,7 @@
*/
#include "proton/reactor.h"
#include "proton/connection.h"
+#include "proton/message.h"
namespace proton {
namespace reactor {
@@ -43,6 +44,9 @@ class ContainerImpl;
void setContainerContext(pn_reactor_t *pnReactor, ContainerImpl *container);
ContainerImpl *getContainerContext(pn_reactor_t *pnReactor);
+void setEventContext(pn_event_t *pnEvent, pn_message_t *m);
+pn_message_t *getEventContext(pn_event_t *pnEvent);
+
}} // namespace proton::reactor
#endif /*!PROTON_CPP_CONTEXTS_H*/
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org