You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/09/10 10:41:06 UTC
svn commit: r574176 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/
qpid/client/ qpid/framing/ tests/
Author: gsim
Date: Mon Sep 10 01:41:05 2007
New Revision: 574176
URL: http://svn.apache.org/viewvc?rev=574176&view=rev
Log:
Client side support for message and delivery properties in header segments.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/MethodContent.h
incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Mon Sep 10 01:41:05 2007
@@ -132,6 +132,7 @@
qpid/framing/Blob.cpp \
qpid/framing/MethodHolder.h qpid/framing/MethodHolder.cpp \
qpid/framing/MethodHolderMaxSize.h \
+ qpid/framing/TransferContent.cpp \
qpid/Exception.cpp \
qpid/Plugin.h \
qpid/Plugin.cpp \
@@ -178,6 +179,7 @@
qpid/broker/FanOutExchange.cpp \
qpid/broker/HeadersExchange.cpp \
qpid/broker/Message.cpp \
+ qpid/broker/MessageAdapter.cpp \
qpid/broker/MessageBuilder.cpp \
qpid/broker/MessageDelivery.cpp \
qpid/broker/MessageHandlerImpl.cpp \
@@ -344,6 +346,7 @@
qpid/framing/SequenceNumberSet.h \
qpid/framing/SerializeHandler.h \
qpid/framing/StructHelper.h \
+ qpid/framing/TransferContent.h \
qpid/framing/TypeFilter.h \
qpid/framing/Value.h \
qpid/framing/Visitor.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Mon Sep 10 01:41:05 2007
@@ -68,7 +68,7 @@
if (msg->isImmediate() && getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg);
- alternateExchange->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+ alternateExchange->route(deliverable, msg->getRoutingKey(), msg->getApplicationHeaders());
}
} else {
@@ -358,7 +358,7 @@
while(!messages.empty()){
DeliverableMessage msg(messages.front().payload);
alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
- &(msg.getMessage().getApplicationHeaders()));
+ msg.getMessage().getApplicationHeaders());
pop();
}
alternateExchange->decAlternateUsers();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Mon Sep 10 01:41:05 2007
@@ -95,7 +95,7 @@
Exchange::shared_ptr alternate = queue->getAlternateExchange();
if (alternate) {
DeliverableMessage delivery(msg.payload);
- alternate->route(delivery, msg.payload->getRoutingKey(), &(msg.payload->getApplicationHeaders()));
+ alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders());
QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to "
<< alternate->getName());
} else {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Mon Sep 10 01:41:05 2007
@@ -38,12 +38,12 @@
Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), publisher(0), store(0), adapter(0) {}
-const std::string& Message::getRoutingKey() const
+std::string Message::getRoutingKey() const
{
return getAdapter().getRoutingKey(frames);
}
-const std::string& Message::getExchangeName() const
+std::string Message::getExchangeName() const
{
return getAdapter().getExchange(frames);
}
@@ -61,7 +61,7 @@
return getAdapter().isImmediate(frames);
}
-const FieldTable& Message::getApplicationHeaders() const
+const FieldTable* Message::getApplicationHeaders() const
{
return getAdapter().getApplicationHeaders(frames);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Mon Sep 10 01:41:05 2007
@@ -59,11 +59,11 @@
uint64_t contentSize() const;
- const std::string& getRoutingKey() const;
+ std::string getRoutingKey() const;
const boost::shared_ptr<Exchange> getExchange(ExchangeRegistry&) const;
- const std::string& getExchangeName() const;
+ std::string getExchangeName() const;
bool isImmediate() const;
- const framing::FieldTable& getApplicationHeaders() const;
+ const framing::FieldTable* getApplicationHeaders() const;
bool isPersistent();
framing::FrameSet& getFrames() { return frames; }
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp?rev=574176&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp Mon Sep 10 01:41:05 2007
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "MessageAdapter.h"
+
+// File-local constant: the string TransferAdapter::getRoutingKey() yields when
+// the frameset carries no DeliveryProperties.
+namespace {
+ const std::string empty;
+}
+
+namespace qpid {
+namespace broker{
+
+namespace {
+
+    // Delivery mode value that both adapters report as persistent.
+    const int PERSISTENT_DELIVERY_MODE = 2;
+
+    /**
+     * Looks up the properties struct of type T in the header body of f.
+     *
+     * FrameSet::getHeaders() returns 0 for a frameset with fewer than two
+     * frames, so the header pointer is checked before it is used.
+     *
+     * @return the properties, or 0 when there is no header body or the
+     *         header body holds no T.
+     */
+    template <class T> const T* findProperties(const framing::FrameSet& f)
+    {
+        const framing::AMQHeaderBody* header = f.getHeaders();
+        return header ? header->get<T>() : 0;
+    }
+
+}
+
+    /**
+     * Routing key read from the frameset's BasicPublishBody.
+     * NOTE(review): the result of as<>() is dereferenced unchecked; presumably
+     * this adapter is only used for framesets that start with that method.
+     */
+    std::string PublishAdapter::getRoutingKey(const framing::FrameSet& f)
+    {
+        return f.as<framing::BasicPublishBody>()->getRoutingKey();
+    }
+
+    /** Exchange name read from the frameset's BasicPublishBody. */
+    std::string PublishAdapter::getExchange(const framing::FrameSet& f)
+    {
+        return f.as<framing::BasicPublishBody>()->getExchange();
+    }
+
+    /** Immediate flag read from the frameset's BasicPublishBody. */
+    bool PublishAdapter::isImmediate(const framing::FrameSet& f)
+    {
+        return f.as<framing::BasicPublishBody>()->getImmediate();
+    }
+
+    /**
+     * @return the headers table of the BasicHeaderProperties, or 0 when the
+     *         frameset has no header body or no such properties.
+     */
+    const framing::FieldTable* PublishAdapter::getApplicationHeaders(const framing::FrameSet& f)
+    {
+        const framing::BasicHeaderProperties* p = findProperties<framing::BasicHeaderProperties>(f);
+        return p ? &(p->getHeaders()) : 0;
+    }
+
+    /**
+     * @return true only when BasicHeaderProperties are present and carry the
+     *         persistent delivery mode.
+     */
+    bool PublishAdapter::isPersistent(const framing::FrameSet& f)
+    {
+        const framing::BasicHeaderProperties* p = findProperties<framing::BasicHeaderProperties>(f);
+        return p && p->getDeliveryMode() == PERSISTENT_DELIVERY_MODE;
+    }
+
+    /**
+     * @return the routing key from the DeliveryProperties, or an empty string
+     *         when the frameset has no header body or no such properties.
+     */
+    std::string TransferAdapter::getRoutingKey(const framing::FrameSet& f)
+    {
+        const framing::DeliveryProperties* p = findProperties<framing::DeliveryProperties>(f);
+        return p ? p->getRoutingKey() : empty;
+    }
+
+    /** The destination of the MessageTransferBody is used as the exchange name. */
+    std::string TransferAdapter::getExchange(const framing::FrameSet& f)
+    {
+        return f.as<framing::MessageTransferBody>()->getDestination();
+    }
+
+    bool TransferAdapter::isImmediate(const framing::FrameSet&)
+    {
+        //TODO: we seem to have lost the immediate flag
+        return false;
+    }
+
+    /**
+     * @return the application headers of the MessageProperties, or 0 when the
+     *         frameset has no header body or no such properties.
+     */
+    const framing::FieldTable* TransferAdapter::getApplicationHeaders(const framing::FrameSet& f)
+    {
+        const framing::MessageProperties* p = findProperties<framing::MessageProperties>(f);
+        return p ? &(p->getApplicationHeaders()) : 0;
+    }
+
+    /**
+     * @return true only when DeliveryProperties are present and carry the
+     *         persistent delivery mode.
+     */
+    bool TransferAdapter::isPersistent(const framing::FrameSet& f)
+    {
+        const framing::DeliveryProperties* p = findProperties<framing::DeliveryProperties>(f);
+        return p && p->getDeliveryMode() == PERSISTENT_DELIVERY_MODE;
+    }
+
+}}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageAdapter.h Mon Sep 10 01:41:05 2007
@@ -38,68 +38,29 @@
{
virtual ~MessageAdapter() {}
- virtual const std::string& getRoutingKey(const framing::FrameSet& f) = 0;
- virtual const std::string& getExchange(const framing::FrameSet& f) = 0;
+ virtual std::string getRoutingKey(const framing::FrameSet& f) = 0;
+ virtual std::string getExchange(const framing::FrameSet& f) = 0;
virtual bool isImmediate(const framing::FrameSet& f) = 0;
- virtual const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f) = 0;
+ virtual const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f) = 0;
virtual bool isPersistent(const framing::FrameSet& f) = 0;
};
struct PublishAdapter : MessageAdapter
{
- const std::string& getRoutingKey(const framing::FrameSet& f)
- {
- return f.as<framing::BasicPublishBody>()->getRoutingKey();
- }
-
- const std::string& getExchange(const framing::FrameSet& f)
- {
- return f.as<framing::BasicPublishBody>()->getExchange();
- }
-
- bool isImmediate(const framing::FrameSet& f)
- {
- return f.as<framing::BasicPublishBody>()->getImmediate();
- }
-
- const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f)
- {
- return f.getHeaders()->get<framing::BasicHeaderProperties>()->getHeaders();
- }
-
- bool isPersistent(const framing::FrameSet& f)
- {
- return f.getHeaders()->get<framing::BasicHeaderProperties>()->getDeliveryMode() == 2;
- }
+ std::string getRoutingKey(const framing::FrameSet& f);
+ std::string getExchange(const framing::FrameSet& f);
+ bool isImmediate(const framing::FrameSet& f);
+ const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f);
+ bool isPersistent(const framing::FrameSet& f);
};
struct TransferAdapter : MessageAdapter
{
- const std::string& getRoutingKey(const framing::FrameSet& f)
- {
- return f.getHeaders()->get<framing::DeliveryProperties>()->getRoutingKey();
- }
-
- const std::string& getExchange(const framing::FrameSet& f)
- {
- return f.as<framing::MessageTransferBody>()->getDestination();
- }
-
- bool isImmediate(const framing::FrameSet&)
- {
- //TODO: we seem to have lost the immediate flag
- return false;
- }
-
- const framing::FieldTable& getApplicationHeaders(const framing::FrameSet& f)
- {
- return f.getHeaders()->get<framing::MessageProperties>()->getApplicationHeaders();
- }
-
- bool isPersistent(const framing::FrameSet& f)
- {
- return f.getHeaders()->get<framing::DeliveryProperties>()->getDeliveryMode() == 2;
- }
+ std::string getRoutingKey(const framing::FrameSet& f);
+ std::string getExchange(const framing::FrameSet& f);
+ bool isImmediate(const framing::FrameSet&);
+ const framing::FieldTable* getApplicationHeaders(const framing::FrameSet& f);
+ bool isPersistent(const framing::FrameSet& f);
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp Mon Sep 10 01:41:05 2007
@@ -22,8 +22,8 @@
#include "Message.h"
#include "MessageStore.h"
-#include "qpid/Exception.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/reply_exceptions.h"
using namespace qpid::broker;
using namespace qpid::framing;
@@ -46,7 +46,7 @@
checkType(CONTENT_BODY, frame.getBody()->type());
break;
default:
- throw ConnectionException(504, "Invalid frame sequence for message.");
+ throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (state=" << state << ")"));
}
if (staging) {
store->appendContent(*message, frame.castBody<AMQContentBody>()->getData());
@@ -61,13 +61,6 @@
}
}
-void MessageBuilder::checkType(uint8_t expected, uint8_t actual)
-{
- if (expected != actual) {
- throw ConnectionException(504, "Invalid frame sequence for message.");
- }
-}
-
void MessageBuilder::end()
{
message.reset();
@@ -80,4 +73,33 @@
message = Message::shared_ptr(new Message(id));
state = METHOD;
staging = false;
+}
+
+namespace {
+
+// Printable names for the frame body types, used in error messages.
+const std::string HEADER_BODY_S = "HEADER";
+const std::string METHOD_BODY_S = "METHOD";
+const std::string CONTENT_BODY_S = "CONTENT";
+const std::string HEARTBEAT_BODY_S = "HEARTBEAT";
+const std::string UNKNOWN = "unknown";
+
+/**
+ * Maps a frame body type code to its printable name.
+ * Any code other than the four listed below yields "unknown".
+ */
+std::string type_str(uint8_t type)
+{
+    if (type == METHOD_BODY) return METHOD_BODY_S;
+    if (type == HEADER_BODY) return HEADER_BODY_S;
+    if (type == CONTENT_BODY) return CONTENT_BODY_S;
+    if (type == HEARTBEAT_BODY) return HEARTBEAT_BODY_S;
+    return UNKNOWN;
+}
+
+}
+
+/**
+ * Verifies that a frame body has the type the builder expects next.
+ * @throws CommandInvalidException naming both types when they differ.
+ */
+void MessageBuilder::checkType(uint8_t expected, uint8_t actual)
+{
+    if (expected == actual) return;
+    throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (expected "
+                                           << type_str(expected) << " got " << type_str(actual) << ")"));
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Session.cpp Mon Sep 10 01:41:05 2007
@@ -336,13 +336,13 @@
cacheExchange = getAdapter()->getConnection().broker.getExchanges().get(exchangeName);
}
- cacheExchange->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+ cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
if (!strategy.delivered) {
//TODO:if reject-unroutable, then reject
//else route to alternate exchange
if (cacheExchange->getAlternate()) {
- cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), &(msg->getApplicationHeaders()));
+ cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientMessage.h Mon Sep 10 01:41:05 2007
@@ -58,8 +58,14 @@
bool isRedelivered() const { return redelivered; }
void setRedelivered(bool _redelivered){ redelivered = _redelivered; }
- const HeaderProperties& getMethodHeaders() const { return *this; }
-
+ /**
+  * Builds a header body for this message: its BasicHeaderProperties are
+  * copied from this object and the content length is set to the size of
+  * the message data.
+  * The 'true' passed to get<>() presumably asks the header body to create
+  * the properties if they are absent -- confirm against AMQHeaderBody.
+  */
+ framing::AMQHeaderBody getHeader() const
+ {
+     framing::AMQHeaderBody result;
+     BasicHeaderProperties& props = *result.get<BasicHeaderProperties>(true);
+     BasicHeaderProperties::copy<BasicHeaderProperties, Message>(props, *this);
+     props.setContentLength(data.size());
+     return result;
+ }
//TODO: move this elsewhere (GRS 24/08/2007)
void populate(framing::FrameSet& frameset)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Mon Sep 10 01:41:05 2007
@@ -24,6 +24,7 @@
using namespace qpid::client;
using namespace qpid::framing;
+using namespace qpid::sys;
ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) : connector(c)
{
@@ -38,6 +39,7 @@
void ConnectionImpl::allocated(SessionCore::shared_ptr session)
{
+ Mutex::ScopedLock l(lock);
if (sessions.find(session->getId()) != sessions.end()) {
throw Exception("Id already in use.");
}
@@ -46,6 +48,7 @@
void ConnectionImpl::released(SessionCore::shared_ptr session)
{
+ Mutex::ScopedLock l(lock);
SessionMap::iterator i = sessions.find(session->getId());
if (i != sessions.end()) {
sessions.erase(i);
@@ -59,12 +62,7 @@
void ConnectionImpl::incoming(framing::AMQFrame& frame)
{
- uint16_t id = frame.getChannel();
- SessionMap::iterator i = sessions.find(id);
- if (i == sessions.end()) {
- throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str());
- }
- i->second->handle(frame);
+ find(frame.getChannel())->handle(frame);
}
void ConnectionImpl::open(const std::string& host, int port,
@@ -93,10 +91,7 @@
void ConnectionImpl::closedByPeer(uint16_t code, const std::string& text)
{
- for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) {
- i->second->closed(code, text);
- }
- sessions.clear();
+ signalClose(code, text);
connector->close();
}
@@ -114,8 +109,25 @@
void ConnectionImpl::shutdown()
{
//this indicates that the socket to the server has closed
+ signalClose(0, "Unexpected socket closure.");
+}
+
+// Tells every registered session that the connection has closed, passing on
+// the given code and text, then empties the session map.
+// NOTE(review): the lock is released (ScopedUnlock) around each closed() call
+// while the loop still holds an iterator into 'sessions'. released() erases
+// from that map under the same lock, so a concurrent release of the current
+// session could presumably invalidate 'i' before it is incremented -- confirm
+// whether that can happen and, if so, iterate over a copy instead.
+void ConnectionImpl::signalClose(uint16_t code, const std::string& text)
+{
+ Mutex::ScopedLock l(lock);
for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) {
- i->second->closed(0, "Unexpected socket closure.");
+ Mutex::ScopedUnlock u(lock);
+ i->second->closed(code, text);
}
sessions.clear();
+}
+
+/**
+ * Looks up the session registered under the given channel id while holding
+ * the connection lock.
+ * @throws ConnectionException (code 504) when no session uses that id.
+ */
+SessionCore::shared_ptr ConnectionImpl::find(uint16_t id)
+{
+    Mutex::ScopedLock guard(lock);
+    SessionMap::iterator match = sessions.find(id);
+    if (match != sessions.end()) {
+        return match->second;
+    }
+    throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str());
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Mon Sep 10 01:41:05 2007
@@ -25,6 +25,7 @@
#include <map>
#include <boost/shared_ptr.hpp>
#include "qpid/framing/FrameHandler.h"
+#include "qpid/sys/Mutex.h"
#include "qpid/sys/ShutdownHandler.h"
#include "qpid/sys/TimeoutHandler.h"
#include "ConnectionHandler.h"
@@ -44,6 +45,7 @@
ConnectionHandler handler;
boost::shared_ptr<Connector> connector;
framing::ProtocolVersion version;
+ sys::Mutex lock;
void incoming(framing::AMQFrame& frame);
void closed();
@@ -51,6 +53,9 @@
void idleOut();
void idleIn();
void shutdown();
+ void signalClose(uint16_t, const std::string&);
+ SessionCore::shared_ptr find(uint16_t);
+
public:
typedef boost::shared_ptr<ConnectionImpl> shared_ptr;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp Mon Sep 10 01:41:05 2007
@@ -181,31 +181,28 @@
CompletionTracker::ResultListener l)
{
SequenceNumber id = send(command, l);
- sendContent(dynamic_cast<const BasicHeaderProperties&>(content.getMethodHeaders()), content.getData());
+ sendContent(content);
return id;
}
-void ExecutionHandler::sendContent(const BasicHeaderProperties& headers, const std::string& data)
+void ExecutionHandler::sendContent(const MethodContent& content)
{
- AMQHeaderBody header;
- BasicHeaderProperties::copy(*header.get<BasicHeaderProperties>(true), headers);
- header.get<BasicHeaderProperties>(true)->setContentLength(data.size());
- AMQFrame h(0, header);
- out(h);
+ AMQFrame header(0, content.getHeader());
+ out(header);
- u_int64_t data_length = data.length();
+ u_int64_t data_length = content.getData().length();
if(data_length > 0){
//frame itself uses 8 bytes
u_int32_t frag_size = maxFrameSize - 8;
if(data_length < frag_size){
- AMQFrame frame(0, AMQContentBody(data));
+ AMQFrame frame(0, AMQContentBody(content.getData()));
out(frame);
}else{
u_int32_t offset = 0;
u_int32_t remaining = data_length - offset;
while (remaining > 0) {
u_int32_t length = remaining > frag_size ? frag_size : remaining;
- string frag(data.substr(offset, length));
+ string frag(content.getData().substr(offset, length));
AMQFrame frame(0, AMQContentBody(frag));
out(frame);
offset += length;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.h Mon Sep 10 01:41:05 2007
@@ -59,7 +59,7 @@
void sendCompletion();
- void sendContent(const framing::BasicHeaderProperties& headers, const std::string& data);
+ void sendContent(const framing::MethodContent&);
public:
typedef CompletionTracker::ResultListener ResultListener;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQContentBody.cpp Mon Sep 10 01:41:05 2007
@@ -41,6 +41,6 @@
{
out << "content (" << size() << " bytes)";
#ifndef NDEBUG
- out << data.substr(0,10);
+ out << " " << data.substr(0,10);
#endif
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp Mon Sep 10 01:41:05 2007
@@ -56,17 +56,17 @@
const AMQMethodBody* FrameSet::getMethod() const
{
- return parts.empty() ? 0 : dynamic_cast<const AMQMethodBody*>(parts[0].getBody());
+ return parts.empty() ? 0 : parts[0].getMethod();
}
const AMQHeaderBody* FrameSet::getHeaders() const
{
- return parts.size() < 2 ? 0 : dynamic_cast<const AMQHeaderBody*>(parts[1].getBody());
+ return parts.size() < 2 ? 0 : parts[1].castBody<AMQHeaderBody>();
}
AMQHeaderBody* FrameSet::getHeaders()
{
- return parts.size() < 2 ? 0 : dynamic_cast<AMQHeaderBody*>(parts[1].getBody());
+ return parts.size() < 2 ? 0 : parts[1].castBody<AMQHeaderBody>();
}
uint64_t FrameSet::getContentSize() const
@@ -80,4 +80,11 @@
{
AccumulateContent accumulator(out);
map_if(accumulator, TypeFilter(CONTENT_BODY));
+}
+
+/**
+ * Convenience overload: returns by value the string that
+ * getContent(std::string&) fills in.
+ */
+std::string FrameSet::getContent() const
+{
+    std::string content;
+    getContent(content);
+    return content;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h Mon Sep 10 01:41:05 2007
@@ -48,6 +48,7 @@
uint64_t getContentSize() const;
void getContent(std::string&) const;
+ std::string getContent() const;
const AMQMethodBody* getMethod() const;
const AMQHeaderBody* getHeaders() const;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/MethodContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/MethodContent.h?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/MethodContent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/MethodContent.h Mon Sep 10 01:41:05 2007
@@ -21,7 +21,8 @@
#ifndef _MethodContent_
#define _MethodContent_
-#include "HeaderProperties.h"
+#include <string>
+#include "AMQHeaderBody.h"
namespace qpid {
namespace framing {
@@ -31,7 +32,7 @@
public:
virtual ~MethodContent() {}
//TODO: rethink this interface
- virtual const HeaderProperties& getMethodHeaders() const = 0;
+ virtual AMQHeaderBody getHeader() const = 0;
virtual const std::string& getData() const = 0;
};
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp?rev=574176&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp Mon Sep 10 01:41:05 2007
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "TransferContent.h"
+
+namespace qpid {
+namespace framing {
+
+/** Creates content holding a copy of _data; see setData(). */
+TransferContent::TransferContent(const std::string& _data)
+{
+    setData(_data);
+}
+
+/** Returns a copy of the header body held by this content. */
+AMQHeaderBody TransferContent::getHeader() const
+{
+    return header;
+}
+
+/** Returns the payload accumulated so far. */
+const std::string& TransferContent::getData() const
+{
+    return data;
+}
+
+/** Replaces the payload and records its size as the content length. */
+void TransferContent::setData(const std::string& _data)
+{
+    data = _data;
+    getMessageProperties().setContentLength(data.size());
+}
+
+/** Appends to the payload and records the new total size as the content length. */
+void TransferContent::appendData(const std::string& _data)
+{
+    data += _data;
+    getMessageProperties().setContentLength(data.size());
+}
+
+/**
+ * Gives access to the MessageProperties stored in the header body.
+ * The 'true' argument presumably creates them when absent -- confirm
+ * against AMQHeaderBody::get().
+ */
+MessageProperties& TransferContent::getMessageProperties()
+{
+    MessageProperties* props = header.get<MessageProperties>(true);
+    return *props;
+}
+
+/** Gives access to the DeliveryProperties stored in the header body. */
+DeliveryProperties& TransferContent::getDeliveryProperties()
+{
+    DeliveryProperties* props = header.get<DeliveryProperties>(true);
+    return *props;
+}
+
+}}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h?rev=574176&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h Mon Sep 10 01:41:05 2007
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _TransferContent_
+#define _TransferContent_
+
+#include "MethodContent.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/DeliveryProperties.h"
+
+namespace qpid {
+namespace framing {
+
+// MethodContent implementation that pairs a payload string with a header
+// body holding MessageProperties and DeliveryProperties.
+class TransferContent : public MethodContent
+{
+ AMQHeaderBody header; // holds the message and delivery properties
+ std::string data; // the payload
+public:
+ // Stores a copy of 'data' through setData().
+ TransferContent(const std::string& data);
+ // Returns a copy of the header body.
+ AMQHeaderBody getHeader() const;
+ // Replaces the payload and sets the content length in the message
+ // properties to the payload size.
+ void setData(const std::string&);
+ // Appends to the payload and sets the content length to the new total size.
+ void appendData(const std::string&);
+ // Returns the current payload.
+ const std::string& getData() const;
+ // Mutable access to the properties kept in the header body.
+ MessageProperties& getMessageProperties();
+ DeliveryProperties& getDeliveryProperties();
+};
+
+}}
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/TransferContent.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=574176&r1=574175&r2=574176&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Mon Sep 10 01:41:05 2007
@@ -22,6 +22,7 @@
#include "qpid_test_plugin.h"
#include "InProcessBroker.h"
#include "qpid/client/Session.h"
+#include "qpid/framing/TransferContent.h"
using namespace qpid::client;
using namespace qpid::framing;
@@ -29,7 +30,8 @@
class ClientSessionTest : public CppUnit::TestCase
{
CPPUNIT_TEST_SUITE(ClientSessionTest);
- CPPUNIT_TEST(testQueueQuery);;
+ CPPUNIT_TEST(testQueueQuery);
+ CPPUNIT_TEST(testTransfer);
CPPUNIT_TEST_SUITE_END();
boost::shared_ptr<Connector> broker;
@@ -55,14 +57,24 @@
CPPUNIT_ASSERT_EQUAL(alternate, result.get().getAlternateExchange());
}
- void testCompletion()
+ // Declares a queue and subscribes to it, publishes one message whose
+ // routing key is the queue name (with an empty destination), then checks
+ // that the frameset received is a MessageTransferBody carrying the same
+ // payload and reports it as completed.
+ void testTransfer()
{
std::string queue("my-queue");
std::string dest("my-dest");
+ std::string data("my message");
session.queueDeclare(0, queue, "", false, false, true, true, FieldTable());
- //subcribe to the queue with confirm_mode = 1
+ //subscribe to the queue with confirm_mode = 1:
session.messageSubscribe(0, queue, dest, false, 1, 0, false, FieldTable());
- //publish some messages
+ //publish a message:
+ TransferContent content(data);
+ content.getDeliveryProperties().setRoutingKey("my-queue");
+ session.messageTransfer(0, "", 0, 0, content);
+ //get & test the message:
+ FrameSet::shared_ptr msg = session.get();
+ CPPUNIT_ASSERT(msg->isA<MessageTransferBody>());
+ CPPUNIT_ASSERT_EQUAL(data, msg->getContent());
+ //confirm receipt:
+ session.execution().completed(msg->getId(), true, true);
}
};