You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2012/12/17 13:33:15 UTC
svn commit: r1422901 [2/3] - in /qpid/branches/java-broker-config-qpid-4390:
./ qpid/ qpid/cpp/bindings/ qpid/cpp/bindings/qmf2/examples/cpp/
qpid/cpp/bindings/qpid/examples/perl/ qpid/cpp/bindings/qpid/perl/test/
qpid/cpp/bindings/qpid/ruby/ext/cqpid/...
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/RecoverableMessageImpl.h Mon Dec 17 12:33:05 2012
@@ -37,6 +37,7 @@ public:
~RecoverableMessageImpl() {};
void setPersistenceId(uint64_t id);
void setRedelivered();
+ void computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& ep);
bool loadContent(uint64_t available);
void decodeContent(framing::Buffer& buffer);
void recover(boost::shared_ptr<Queue> queue);
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Mon Dec 17 12:33:05 2012
@@ -186,6 +186,11 @@ void RecoverableMessageImpl::setRedelive
msg.deliver();//increment delivery count (but at present that isn't recorded durably)
}
+void RecoverableMessageImpl::computeExpiration(const boost::intrusive_ptr<ExpiryPolicy>& ep)
+{ // Forwards ep unchanged to msg.computeExpiration(); nothing else is done here.
+ msg.computeExpiration(ep);
+}
+
void RecoverableQueueImpl::recover(RecoverableMessage::shared_ptr msg)
{
dynamic_pointer_cast<RecoverableMessageImpl>(msg)->recover(queue);
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Dec 17 12:33:05 2012
@@ -37,9 +37,11 @@
#include "qpid/log/Statement.h"
#include "qpid/ptr_map.h"
#include "qpid/broker/AclModule.h"
+#include "qpid/broker/FedOps.h"
#include <boost/bind.hpp>
#include <boost/format.hpp>
+#include <boost/tuple/tuple_comparison.hpp>
#include <iostream>
#include <sstream>
@@ -48,6 +50,11 @@
#include <assert.h>
+namespace { // file-local constants
+const std::string X_SCOPE("x-scope"); // binding-argument key whose value addBinding() compares against SESSION
+const std::string SESSION("session"); // X_SCOPE value for which addBinding() records the binding in `bindings`
+}
+
namespace qpid {
namespace broker {
@@ -87,6 +94,7 @@ void SemanticState::closed() {
if (dtxBuffer.get()) {
dtxBuffer->fail();
}
+ unbindSessionBindings();
requeue();
//now unsubscribe, which may trigger queue deletion and thus
@@ -303,14 +311,14 @@ Consumer(_name, type),
deliveryCount(0),
protocols(parent->getSession().getBroker().getProtocolRegistry())
{
- if (parent != 0 && queue.get() != 0 && queue->GetManagementObjectShared() !=0)
+ if (parent != 0 && queue.get() != 0 && queue->GetManagementObject() !=0)
{
ManagementAgent* agent = parent->session.getBroker().getManagementAgent();
qpid::management::Manageable* ms = dynamic_cast<qpid::management::Manageable*> (&(parent->session));
if (agent != 0)
{
- mgmtObject = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, ms , queue->GetManagementObjectShared()->getObjectId(), getTag(),
+ mgmtObject = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(),
!acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments)));
agent->addObject (mgmtObject);
mgmtObject->set_creditMode("WINDOW");
@@ -318,7 +326,7 @@ Consumer(_name, type),
}
}
-ManagementObject::shared_ptr SemanticState::ConsumerImpl::GetManagementObjectShared (void) const
+ManagementObject::shared_ptr SemanticState::ConsumerImpl::GetManagementObject (void) const
{
return mgmtObject;
}
@@ -803,4 +811,63 @@ void SemanticState::detached()
}
}
+void SemanticState::addBinding(const string& queueName, const string& exchangeName,
+ const string& routingKey, const framing::FieldTable& arguments)
+{ // Records (or forgets) a binding in `bindings`; unbindSessionBindings() later walks that set.
+ QPID_LOG (debug, "SemanticState::addBinding ["
+ << "queue=" << queueName << ", "
+ << "exchange=" << exchangeName << ", "
+ << "key=" << routingKey << ", "
+ << "args=" << arguments << "]");
+ std::string fedOp = arguments.getAsString(qpidFedOp);
+ if ((arguments.isSet(qpidFedOp)) && (fedOp.empty())) {
+ fedOp = fedOpBind; // federation-op key present but empty: treated as a bind
+ }
+ std::string fedOrigin = arguments.getAsString(qpidFedOrigin);
+ if ((arguments.getAsString(X_SCOPE) == SESSION) || (fedOp == fedOpBind)) {
+ bindings.insert(boost::make_tuple(queueName, exchangeName, routingKey, fedOrigin)); // track session-scoped or federation-bind entries
+ }
+ else if (fedOp == fedOpUnbind) {
+ bindings.erase(boost::make_tuple(queueName, exchangeName, routingKey, fedOrigin)); // an unbind op drops the matching tracked entry
+ }
+}
+
+void SemanticState::removeBinding(const string& queueName, const string& exchangeName,
+ const string& routingKey)
+{ // Forgets the tracked binding for this queue/exchange/key; only the entry with an empty fedOrigin is matched.
+ QPID_LOG (debug, "SemanticState::removeBinding ["
+ << "queue=" << queueName << ", "
+ << "exchange=" << exchangeName << ", "
+ << "key=" << routingKey << "]");
+ bindings.erase(boost::make_tuple(queueName, exchangeName, routingKey, ""));
+}
+
+void SemanticState::unbindSessionBindings()
+{
+ // Undo every binding tracked in `bindings`; a failure is logged and skipped so the remaining entries are still processed.
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+ QPID_LOG (debug, "SemanticState::unbindSessionBindings ["
+ << "queue=" << i->get<0>() << ", "
+ << "exchange=" << i->get<1>()<< ", "
+ << "key=" << i->get<2>() << ", "
+ << "fedOrigin=" << i->get<3>() << "]");
+ try {
+ const std::string& fedOrigin = i->get<3>();
+ if (!fedOrigin.empty()) {
+ framing::FieldTable fedArguments; // entries with a fedOrigin are undone by a bind() carrying the unbind op
+ fedArguments.setString(qpidFedOp, fedOpUnbind);
+ fedArguments.setString(qpidFedOrigin, fedOrigin);
+ session.getBroker().bind(i->get<0>(), i->get<1>(), i->get<2>(), fedArguments,
+ userID, connectionId);
+ } else {
+ session.getBroker().unbind(i->get<0>(), i->get<1>(), i->get<2>(),
+ userID, connectionId);
+ }
+ } catch (...) { // still best effort, but leave a trace instead of swallowing silently
+ QPID_LOG (debug, "SemanticState::unbindSessionBindings: failed to remove binding, continuing");
+ }
+ }
+ bindings.clear();
+}
+
}} // namespace qpid::broker
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SemanticState.h Mon Dec 17 12:33:05 2012
@@ -46,10 +46,12 @@
#include <list>
#include <map>
+#include <set>
#include <vector>
#include <boost/enable_shared_from_this.hpp>
#include <boost/cast.hpp>
+#include <boost/tuple/tuple.hpp>
namespace qpid {
namespace broker {
@@ -163,7 +165,7 @@ class SemanticState : private boost::non
// manageable entry points
QPID_BROKER_EXTERN management::ManagementObject::shared_ptr
- GetManagementObjectShared(void) const;
+ GetManagementObject(void) const;
QPID_BROKER_EXTERN management::Manageable::status_t
ManagementMethod(uint32_t methodId, management::Args& args, std::string& text);
@@ -173,6 +175,8 @@ class SemanticState : private boost::non
private:
typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap;
+ typedef boost::tuple<std::string, std::string, std::string, std::string> Binding;
+ typedef std::set<Binding> Bindings;
SessionState& session;
ConsumerImplMap consumers;
@@ -190,6 +194,8 @@ class SemanticState : private boost::non
//needed for queue delete events in auto-delete:
const std::string connectionId;
+ Bindings bindings;
+
void checkDtxTimeout();
bool complete(DeliveryRecord&);
@@ -197,6 +203,7 @@ class SemanticState : private boost::non
void requestDispatch();
void cancel(ConsumerImpl::shared_ptr);
void disable(ConsumerImpl::shared_ptr);
+ void unbindSessionBindings();
public:
@@ -271,6 +278,11 @@ class SemanticState : private boost::non
void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; }
void record(const DeliveryRecord& delivery);
DtxBufferMap& getSuspendedXids() { return suspendedXids; }
+
+ void addBinding(const std::string& queueName, const std::string& exchangeName,
+ const std::string& routingKey, const framing::FieldTable& arguments);
+ void removeBinding(const std::string& queueName, const std::string& exchangeName,
+ const std::string& routingKey);
};
}} // namespace qpid::broker
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Mon Dec 17 12:33:05 2012
@@ -154,12 +154,14 @@ void SessionAdapter::ExchangeHandlerImpl
{
getBroker().bind(queueName, exchangeName, routingKey, arguments,
getConnection().getUserId(), getConnection().getUrl());
+ state.addBinding(queueName, exchangeName, routingKey, arguments);
}
void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
const string& exchangeName,
const string& routingKey)
{
+ state.removeBinding(queueName, exchangeName, routingKey);
getBroker().unbind(queueName, exchangeName, routingKey,
getConnection().getUserId(), getConnection().getUrl());
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.cpp Mon Dec 17 12:33:05 2012
@@ -65,7 +65,7 @@ SessionState::SessionState(
}
void SessionState::addManagementObject() {
- if (GetManagementObjectShared()) return; // Already added.
+ if (GetManagementObject()) return; // Already added.
Manageable* parent = broker.GetVhostObject ();
if (parent != 0) {
ManagementAgent* agent = getBroker().getManagementAgent();
@@ -127,7 +127,7 @@ void SessionState::attach(SessionHandler
if (mgmtObject != 0)
{
mgmtObject->set_attached (1);
- mgmtObject->set_connectionRef (h.getConnection().GetManagementObjectShared()->getObjectId());
+ mgmtObject->set_connectionRef (h.getConnection().GetManagementObject()->getObjectId());
mgmtObject->set_channelId (h.getChannel());
}
asyncCommandCompleter->attached();
@@ -148,7 +148,7 @@ void SessionState::giveReadCredit(int32_
getConnection().outputTasks.giveReadCredit(credit);
}
-ManagementObject::shared_ptr SessionState::GetManagementObjectShared (void) const
+ManagementObject::shared_ptr SessionState::GetManagementObject(void) const
{
return mgmtObject;
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.h?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/SessionState.h Mon Dec 17 12:33:05 2012
@@ -110,7 +110,7 @@ class SessionState : public qpid::Sessio
const qpid::types::Variant::Map& annotations, bool sync);
// Manageable entry points
- management::ManagementObject::shared_ptr GetManagementObjectShared (void) const;
+ management::ManagementObject::shared_ptr GetManagementObject(void) const;
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/System.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/System.h?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/System.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/System.h Mon Dec 17 12:33:05 2012
@@ -45,7 +45,7 @@ class System : public management::Manage
System (std::string _dataDir, Broker* broker = 0);
- management::ManagementObject::shared_ptr GetManagementObjectShared (void) const
+ management::ManagementObject::shared_ptr GetManagementObject(void) const
{ return mgmtObject; }
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/TopicExchange.cpp Mon Dec 17 12:33:05 2012
@@ -179,7 +179,7 @@ bool TopicExchange::bind(Queue::shared_p
}
}
- Binding::shared_ptr binding (new Binding (routingPattern, queue, this, FieldTable(), fedOrigin));
+ Binding::shared_ptr binding (new Binding (routingPattern, queue, this, args ? *args : FieldTable(), fedOrigin));
binding->startManagement();
bk->bindingVector.push_back(binding);
nBindings++;
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Vhost.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Vhost.h?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Vhost.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/Vhost.h Mon Dec 17 12:33:05 2012
@@ -40,7 +40,7 @@ class Vhost : public management::Managea
Vhost (management::Manageable* parentBroker, Broker* broker = 0);
- management::ManagementObject::shared_ptr GetManagementObjectShared (void) const
+ management::ManagementObject::shared_ptr GetManagementObject (void) const
{ return mgmtObject; }
void setFederationTag(const std::string& tag);
};
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp Mon Dec 17 12:33:05 2012
@@ -73,7 +73,7 @@ void ManagedConnection::setSaslSsf(int s
connection->set_saslSsf(ssf);
}
-qpid::management::ManagementObject::shared_ptr ManagedConnection::GetManagementObjectShared() const
+qpid::management::ManagementObject::shared_ptr ManagedConnection::GetManagementObject() const
{
return connection;
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h Mon Dec 17 12:33:05 2012
@@ -44,7 +44,7 @@ class ManagedConnection : public qpid::m
std::string getUserid() const;
void setSaslMechanism(const std::string&);
void setSaslSsf(int);
- qpid::management::ManagementObject::shared_ptr GetManagementObjectShared() const;
+ qpid::management::ManagementObject::shared_ptr GetManagementObject() const;
bool isLocal(const ConnectionToken* t) const;
void incomingMessageReceived();
void outgoingMessageSent();
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp Mon Dec 17 12:33:05 2012
@@ -37,7 +37,7 @@ ManagedOutgoingLink::ManagedOutgoingLink
{
qpid::management::ManagementAgent* agent = broker.getManagementAgent();
if (agent) {
- subscription = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, &p, q.GetManagementObjectShared()->getObjectId(), id,
+ subscription = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, &p, q.GetManagementObject()->getObjectId(), id,
false/*FIXME*/, true/*FIXME*/, topic, qpid::types::Variant::Map()));
agent->addObject(subscription);
subscription->set_creditMode("n/a");
@@ -48,7 +48,7 @@ ManagedOutgoingLink::~ManagedOutgoingLin
if (subscription != 0) subscription->resourceDestroy();
}
-qpid::management::ManagementObject::shared_ptr ManagedOutgoingLink::GetManagementObjectShared() const
+qpid::management::ManagementObject::shared_ptr ManagedOutgoingLink::GetManagementObject() const
{
return subscription;
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h Mon Dec 17 12:33:05 2012
@@ -39,7 +39,7 @@ class ManagedOutgoingLink : public qpid:
public:
ManagedOutgoingLink(Broker& broker, Queue&, ManagedSession& parent, const std::string id, bool topic);
virtual ~ManagedOutgoingLink();
- qpid::management::ManagementObject::shared_ptr GetManagementObjectShared() const;
+ qpid::management::ManagementObject::shared_ptr GetManagementObject() const;
void outgoingMessageSent();
void outgoingMessageAccepted();
void outgoingMessageRejected();
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedSession.cpp Mon Dec 17 12:33:05 2012
@@ -38,7 +38,7 @@ ManagedSession::ManagedSession(Broker& b
session->set_attached(true);
session->set_detachedLifespan(0);
session->clr_expireTime();
- session->set_connectionRef(parent.GetManagementObjectShared()->getObjectId());
+ session->set_connectionRef(parent.GetManagementObject()->getObjectId());
agent->addObject(session);
}
}
@@ -48,7 +48,7 @@ ManagedSession::~ManagedSession()
if (session) session->resourceDestroy();
}
-qpid::management::ManagementObject::shared_ptr ManagedSession::GetManagementObjectShared() const
+qpid::management::ManagementObject::shared_ptr ManagedSession::GetManagementObject() const
{
return session;
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/broker/amqp/ManagedSession.h Mon Dec 17 12:33:05 2012
@@ -40,7 +40,7 @@ class ManagedSession : public qpid::mana
public:
ManagedSession(Broker& broker, ManagedConnection& parent, const std::string id);
virtual ~ManagedSession();
- qpid::management::ManagementObject::shared_ptr GetManagementObjectShared() const;
+ qpid::management::ManagementObject::shared_ptr GetManagementObject() const;
bool isLocal(const ConnectionToken* t) const;
void incomingMessageReceived();
void incomingMessageAccepted();
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Mon Dec 17 12:33:05 2012
@@ -534,17 +534,19 @@ void BrokerReplicator::doEventBind(Varia
exchanges.find(values[EXNAME].asString());
boost::shared_ptr<Queue> queue =
queues.find(values[QNAME].asString());
+ framing::FieldTable args;
+ qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
// We only replicate binds for a replicated queue to replicated
// exchange that both exist locally.
if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
+ queue && replicationTest.replicateLevel(queue->getSettings().storeSettings) &&
+ replicationTest.replicateLevel(args))
{
- framing::FieldTable args;
- qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
- << " key=" << key);
+ << " key=" << key
+ << " args=" << args);
queue->bind(exchange, key, args);
}
}
@@ -559,13 +561,11 @@ void BrokerReplicator::doEventUnbind(Var
if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
{
- framing::FieldTable args;
- qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
- exchange->unbind(queue, key, &args);
+ exchange->unbind(queue, key, 0);
}
}
@@ -692,16 +692,19 @@ void BrokerReplicator::doResponseBind(Va
boost::shared_ptr<Exchange> exchange = exchanges.find(exName);
boost::shared_ptr<Queue> queue = queues.find(qName);
+ framing::FieldTable args;
+ qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
+
// Automatically replicate binding if queue and exchange exist and are replicated
if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
+ queue && replicationTest.replicateLevel(queue->getSettings().storeSettings) &&
+ replicationTest.replicateLevel(args))
{
string key = values[BINDING_KEY].asString();
QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName
<< " queue:" << qName
- << " key:" << key);
- framing::FieldTable args;
- qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
+ << " key:" << key
+ << " args:" << args);
queue->bind(exchange, key, args);
}
}
@@ -837,6 +840,13 @@ void BrokerReplicator::autoDeleteCheck(b
}
}
+// Appends exchange i to vector c; bound with boost::bind in disconnected() to collect the exchanges passed by eachExchange().
+namespace {
+ void exchangeAccumulatorCallback(vector<boost::shared_ptr<Exchange> >& c, const Exchange::shared_ptr& i) {
+ c.push_back(i);
+ }
+}
+
void BrokerReplicator::disconnected() {
QPID_LOG(info, logPrefix << "Disconnected");
connection = 0;
@@ -844,7 +854,7 @@ void BrokerReplicator::disconnected() {
vector<boost::shared_ptr<Exchange> > collect;
// Make a copy so we can work outside the ExchangeRegistry lock
exchanges.eachExchange(
- boost::bind(&vector<boost::shared_ptr<Exchange> >::push_back, ref(collect), _1));
+ boost::bind(&exchangeAccumulatorCallback, boost::ref(collect), _1));
for_each(collect.begin(), collect.end(),
boost::bind(&BrokerReplicator::autoDeleteCheck, this, _1));
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.cpp Mon Dec 17 12:33:05 2012
@@ -79,6 +79,11 @@ HaBroker::HaBroker(broker::Broker& b, co
}
}
+namespace {
+const std::string NONE("none"); // literal that isNone() treats the same as an empty setting
+bool isNone(const std::string& x) { return x.empty() || x == NONE; } // true for "" or exactly "none" (case-sensitive compare)
+}
+
// Called in Plugin::initialize
void HaBroker::initialize() {
@@ -110,11 +115,10 @@ void HaBroker::initialize() {
backup.reset(new Backup(*this, settings));
broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this);
statusCheck.reset(new StatusCheck(logPrefix, broker.getLinkHearbeatInterval(), brokerInfo));
+ if (!isNone(settings.publicUrl)) setPublicUrl(Url(settings.publicUrl));
+ if (!isNone(settings.brokerUrl)) setBrokerUrl(Url(settings.brokerUrl));
}
- if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl));
- if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl));
-
// NOTE: lock is not needed in a constructor, but create one
// to pass to functions that have a ScopedLock parameter.
@@ -182,7 +186,7 @@ Manageable::status_t HaBroker::Managemen
break;
}
case _qmf::HaBroker::METHOD_SETPUBLICURL: {
- setClientUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicUrl&>(args).i_url));
+ setPublicUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicUrl&>(args).i_url));
break;
}
case _qmf::HaBroker::METHOD_REPLICATE: {
@@ -217,19 +221,13 @@ Manageable::status_t HaBroker::Managemen
return Manageable::STATUS_OK;
}
-void HaBroker::setClientUrl(const Url& url) {
+void HaBroker::setPublicUrl(const Url& url) {
Mutex::ScopedLock l(lock);
- if (url.empty()) throw Exception("Invalid empty URL for HA client failover");
- clientUrl = url;
- updateClientUrl(l);
-}
-
-void HaBroker::updateClientUrl(Mutex::ScopedLock&) {
- Url url = clientUrl.empty() ? brokerUrl : clientUrl;
+ publicUrl = url;
mgmtObject->set_publicUrl(url.str());
knownBrokers.clear();
knownBrokers.push_back(url);
- QPID_LOG(debug, logPrefix << "Setting client URL to: " << url);
+ QPID_LOG(debug, logPrefix << "Setting public URL to: " << url);
}
void HaBroker::setBrokerUrl(const Url& url) {
@@ -238,10 +236,8 @@ void HaBroker::setBrokerUrl(const Url& u
Mutex::ScopedLock l(lock);
brokerUrl = url;
mgmtObject->set_brokersUrl(brokerUrl.str());
- QPID_LOG(info, logPrefix << "Broker URL set to: " << url);
+ QPID_LOG(info, logPrefix << "Brokers URL set to: " << url);
if (status == JOINING && statusCheck.get()) statusCheck->setUrl(url);
- // Updating broker URL also updates defaulted client URL:
- if (clientUrl.empty()) updateClientUrl(l);
b = backup;
}
if (b) b->setBrokerUrl(url); // Oustside lock, avoid deadlock
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaBroker.h Mon Dec 17 12:33:05 2012
@@ -71,7 +71,7 @@ class HaBroker : public management::Mana
void initialize();
// Implement Manageable.
- qpid::management::ManagementObject::shared_ptr GetManagementObjectShared() const { return mgmtObject; }
+ qpid::management::ManagementObject::shared_ptr GetManagementObject() const { return mgmtObject; }
management::Manageable::status_t ManagementMethod (
uint32_t methodId, management::Args& args, std::string& text);
@@ -100,7 +100,7 @@ class HaBroker : public management::Mana
types::Uuid getSystemId() const { return systemId; }
private:
- void setClientUrl(const Url&);
+ void setPublicUrl(const Url&);
void setBrokerUrl(const Url&);
void updateClientUrl(sys::Mutex::ScopedLock&);
@@ -125,7 +125,7 @@ class HaBroker : public management::Mana
boost::shared_ptr<Backup> backup;
boost::shared_ptr<Primary> primary;
qmf::org::apache::qpid::ha::HaBroker::shared_ptr mgmtObject;
- Url clientUrl, brokerUrl;
+ Url publicUrl, brokerUrl;
std::vector<Url> knownBrokers;
BrokerStatus status;
BrokerInfo brokerInfo;
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaPlugin.cpp?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaPlugin.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/HaPlugin.cpp Mon Dec 17 12:33:05 2012
@@ -33,9 +33,11 @@ struct Options : public qpid::Options {
addOptions()
("ha-cluster", optValue(settings.cluster, "yes|no"),
"Join a HA active/passive cluster.")
+ ("ha-queue-replication", optValue(settings.queueReplication, "yes|no"),
+ "Enable replication of specific queues without joining a cluster")
("ha-brokers-url", optValue(settings.brokerUrl,"URL"),
"URL with address of each broker in the cluster.")
- ("ha-public-url", optValue(settings.clientUrl,"URL"),
+ ("ha-public-url", optValue(settings.publicUrl,"URL"),
"URL advertized to clients to connect to the cluster.")
("ha-replicate",
optValue(settings.replicateDefault, "LEVEL"),
@@ -68,7 +70,7 @@ struct HaPlugin : public Plugin {
void earlyInitialize(Plugin::Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
- if (broker) {
+ if (broker && (settings.cluster || settings.queueReplication)) {
if (!broker->getManagementAgent()) {
QPID_LOG(info, "HA plugin disabled because management is disabled");
if (settings.cluster)
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Settings.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Settings.h?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Settings.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/ha/Settings.h Mon Dec 17 12:33:05 2012
@@ -35,12 +35,14 @@ namespace ha {
class Settings
{
public:
- Settings() : cluster(false), replicateDefault(NONE), backupTimeout(5),
+ Settings() : cluster(false), queueReplication(false),
+ replicateDefault(NONE), backupTimeout(5),
flowMessages(100), flowBytes(0)
{}
bool cluster; // True if we are a cluster member.
- std::string clientUrl;
+ bool queueReplication; // True if enabled.
+ std::string publicUrl;
std::string brokerUrl;
Enum<ReplicateLevel> replicateDefault;
std::string username, password, mechanism;
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/Manageable.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/Manageable.cpp?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/Manageable.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/Manageable.cpp Mon Dec 17 12:33:05 2012
@@ -41,16 +41,6 @@ string Manageable::StatusText (status_t
return "??";
}
-ManagementObject* Manageable::GetManagementObject(void) const
-{
- return 0;
-}
-
-ManagementObject::shared_ptr Manageable::GetManagementObjectShared() const
-{
- return ManagementObject::shared_ptr();
-}
-
Manageable::status_t Manageable::ManagementMethod (uint32_t, Args&, std::string&)
{
return STATUS_UNKNOWN_METHOD;
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.cpp Mon Dec 17 12:33:05 2012
@@ -698,7 +698,7 @@ void ManagementAgent::periodicProcessing
//
if (publish) {
uint64_t uptime = sys::Duration(startTime, sys::now());
- boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObjectShared())->set_uptime(uptime);
+ boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime);
qpid::sys::MemStat::loadMemInfo(memstat.get());
}
@@ -1722,7 +1722,7 @@ void ManagementAgent::handleAttachReques
string label;
uint32_t requestedBrokerBank, requestedAgentBank;
uint32_t assignedBank;
- ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObjectShared()->getObjectId();
+ ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId();
Uuid systemId;
moveNewObjects();
@@ -1754,7 +1754,7 @@ void ManagementAgent::handleAttachReques
agent->mgmtObject = _qmf::Agent::shared_ptr(new _qmf::Agent (this, agent.get()));
agent->mgmtObject->set_connectionRef(agent->connectionRef);
agent->mgmtObject->set_label (label);
- agent->mgmtObject->set_registeredTo (broker->GetManagementObjectShared()->getObjectId());
+ agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
agent->mgmtObject->set_systemId ((const unsigned char*)systemId.data());
agent->mgmtObject->set_brokerBank (brokerBank);
agent->mgmtObject->set_agentBank (assignedBank);
@@ -1831,7 +1831,7 @@ void ManagementAgent::handleGetQuery(Buf
if (className == "broker") {
uint64_t uptime = sys::Duration(startTime, sys::now());
- boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObjectShared())->set_uptime(uptime);
+ boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime);
}
@@ -1945,7 +1945,7 @@ void ManagementAgent::handleGetQuery(con
if (className == "broker") {
uint64_t uptime = sys::Duration(startTime, sys::now());
- boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObjectShared())->set_uptime(uptime);
+ boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime);
}
/*
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.cpp
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp:r1415149-1422060
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.h Mon Dec 17 12:33:05 2012
@@ -211,7 +211,7 @@ private:
ObjectId connectionRef;
qmf::org::apache::qpid::broker::Agent::shared_ptr mgmtObject;
RemoteAgent(ManagementAgent& _agent) : agent(_agent) {}
- ManagementObject::shared_ptr GetManagementObjectShared (void) const { return mgmtObject; }
+ ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; }
virtual ~RemoteAgent ();
void mapEncode(qpid::types::Variant::Map& _map) const;
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/management/ManagementAgent.h
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h:r1415149-1422060
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/CMakeLists.txt?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/CMakeLists.txt (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/qpid/store/CMakeLists.txt Mon Dec 17 12:33:05 2012
@@ -42,6 +42,7 @@ if (CMAKE_COMPILER_IS_GNUCXX)
set_target_properties (store PROPERTIES
PREFIX ""
+ COMPILE_DEFINITIONS _IN_QPID_BROKER
LINK_FLAGS "${GCC_CATCH_UNDEFINED}")
endif (CMAKE_COMPILER_IS_GNUCXX)
@@ -54,7 +55,9 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows)
endif (MSVC)
endif (CMAKE_SYSTEM_NAME STREQUAL Windows)
-set_target_properties (store PROPERTIES VERSION ${qpidc_version})
+set_target_properties (store PROPERTIES
+ COMPILE_DEFINITIONS _IN_QPID_BROKER
+ VERSION ${qpidc_version})
install (TARGETS store # RUNTIME
DESTINATION ${QPIDD_MODULE_DIR}
COMPONENT ${QPID_COMPONENT_BROKER})
@@ -81,6 +84,7 @@ if (BUILD_MSSQL)
ms-sql/State.cpp
ms-sql/TplRecordset.cpp
ms-sql/VariantHelper.cpp)
+ set_target_properties (mssql_store PROPERTIES COMPILE_DEFINITIONS _IN_QPID_BROKER)
target_link_libraries (mssql_store qpidbroker qpidcommon ${Boost_PROGRAM_OPTIONS_LIBRARY})
install (TARGETS mssql_store # RUNTIME
DESTINATION ${QPIDD_MODULE_DIR}
@@ -110,6 +114,7 @@ if (BUILD_MSCLFS)
ms-sql/State.cpp
ms-sql/VariantHelper.cpp)
include_directories(ms-sql)
+ set_target_properties (msclfs_store PROPERTIES COMPILE_DEFINITIONS _IN_QPID_BROKER)
target_link_libraries (msclfs_store qpidbroker qpidcommon ${Boost_PROGRAM_OPTIONS_LIBRARY} clfsw32.lib)
install (TARGETS msclfs_store # RUNTIME
DESTINATION ${QPIDD_MODULE_DIR}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/rdma.cmake
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/rdma.cmake?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/rdma.cmake (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/rdma.cmake Mon Dec 17 12:33:05 2012
@@ -79,6 +79,7 @@ if (BUILD_RDMA)
add_library (rdma MODULE qpid/sys/RdmaIOPlugin.cpp)
target_link_libraries (rdma qpidbroker rdmawrap)
set_target_properties (rdma PROPERTIES
+ COMPILE_DEFINITIONS _IN_QPID_BROKER
LINK_FLAGS "${CATCH_UNDEFINED}"
PREFIX "")
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/ssl.cmake
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/ssl.cmake?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/ssl.cmake (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/ssl.cmake Mon Dec 17 12:33:05 2012
@@ -90,7 +90,8 @@ if (BUILD_SSL)
target_link_libraries (ssl qpidbroker sslcommon ${Boost_PROGRAM_OPTIONS_LIBRARY})
set_target_properties (ssl PROPERTIES
PREFIX ""
- COMPILE_FLAGS ${NSS_COMPILE_FLAGS})
+ COMPILE_FLAGS "${NSS_COMPILE_FLAGS}"
+ COMPILE_DEFINITIONS _IN_QPID_BROKER)
if (CMAKE_COMPILER_IS_GNUCXX)
set_target_properties(ssl PROPERTIES
LINK_FLAGS "${GCC_CATCH_UNDEFINED}")
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/ssl.mk
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/ssl.mk?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/ssl.mk (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/ssl.mk Mon Dec 17 12:33:05 2012
@@ -39,7 +39,7 @@ ssl_la_SOURCES = \
ssl_la_LIBADD= libqpidbroker.la libsslcommon.la
-ssl_la_CXXFLAGS=$(AM_CXXFLAGS) $(SSL_CFLAGS)
+ssl_la_CXXFLAGS=$(AM_CXXFLAGS) $(SSL_CFLAGS) -D_IN_QPID_BROKER
ssl_la_LDFLAGS = $(PLUGINLDFLAGS)
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/tests:r1415149-1422060
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/BrokerMgmtAgent.cpp?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/BrokerMgmtAgent.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/BrokerMgmtAgent.cpp Mon Dec 17 12:33:05 2012
@@ -123,7 +123,7 @@ class TestManageable : public qpid::mana
mgmtObj = tmp;
};
~TestManageable() { mgmtObj.reset(); }
- management::ManagementObject::shared_ptr GetManagementObjectShared() const { return mgmtObj; };
+ management::ManagementObject::shared_ptr GetManagementObject() const { return mgmtObj; };
static void validateTestObjectProperties(_qmf::TestObject& to)
{
// verify the default values are as expected. We don't check 'string1',
@@ -209,11 +209,11 @@ QPID_AUTO_TEST_CASE(v1ObjPublish)
// create a manageable test object
TestManageable *tm = new TestManageable(agent, std::string("obj1"));
- uint32_t objLen = tm->GetManagementObjectShared()->writePropertiesSize();
+ uint32_t objLen = tm->GetManagementObject()->writePropertiesSize();
Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
- agent->addObject(tm->GetManagementObjectShared(), 1);
+ agent->addObject(tm->GetManagementObject(), 1);
// wait for the object to be published
Message m1;
@@ -234,7 +234,7 @@ QPID_AUTO_TEST_CASE(v1ObjPublish)
// destroy the object
- tm->GetManagementObjectShared()->resourceDestroy();
+ tm->GetManagementObject()->resourceDestroy();
// wait for the deleted object to be published
@@ -272,9 +272,9 @@ QPID_AUTO_TEST_CASE(v2ObjPublish)
TestManageable *tm = new TestManageable(agent, std::string("obj2"));
- Receiver r1 = fix->createV2DataIndRcvr(tm->GetManagementObjectShared()->getPackageName(), "#");
+ Receiver r1 = fix->createV2DataIndRcvr(tm->GetManagementObject()->getPackageName(), "#");
- agent->addObject(tm->GetManagementObjectShared(), "testobj-1");
+ agent->addObject(tm->GetManagementObject(), "testobj-1");
// wait for the object to be published
Message m1;
@@ -295,7 +295,7 @@ QPID_AUTO_TEST_CASE(v2ObjPublish)
// destroy the object
- tm->GetManagementObjectShared()->resourceDestroy();
+ tm->GetManagementObject()->resourceDestroy();
// wait for the deleted object to be published
@@ -335,11 +335,11 @@ QPID_AUTO_TEST_CASE(v1ExportDelObj)
// create a manageable test object
TestManageable *tm = new TestManageable(agent, std::string("myObj"));
- uint32_t objLen = tm->GetManagementObjectShared()->writePropertiesSize();
+ uint32_t objLen = tm->GetManagementObject()->writePropertiesSize();
Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
- agent->addObject(tm->GetManagementObjectShared(), 1);
+ agent->addObject(tm->GetManagementObject(), 1);
// wait for the object to be published
Message m1;
@@ -352,7 +352,7 @@ QPID_AUTO_TEST_CASE(v1ExportDelObj)
// destroy the object, then immediately export (before the next poll cycle)
::qpid::management::ManagementAgent::DeletedObjectList delObjs;
- tm->GetManagementObjectShared()->resourceDestroy();
+ tm->GetManagementObject()->resourceDestroy();
agent->exportDeletedObjects( delObjs );
BOOST_CHECK(delObjs.size() == 1);
@@ -399,11 +399,11 @@ QPID_AUTO_TEST_CASE(v1ImportDelObj)
// create a manageable test object
TestManageable *tm = new TestManageable(agent, std::string("anObj"));
- uint32_t objLen = tm->GetManagementObjectShared()->writePropertiesSize();
+ uint32_t objLen = tm->GetManagementObject()->writePropertiesSize();
Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
- agent->addObject(tm->GetManagementObjectShared(), 1);
+ agent->addObject(tm->GetManagementObject(), 1);
// wait for the object to be published
Message m1;
@@ -416,7 +416,7 @@ QPID_AUTO_TEST_CASE(v1ImportDelObj)
// destroy the object, then immediately export (before the next poll cycle)
::qpid::management::ManagementAgent::DeletedObjectList delObjs;
- tm->GetManagementObjectShared()->resourceDestroy();
+ tm->GetManagementObject()->resourceDestroy();
agent->exportDeletedObjects( delObjs );
BOOST_CHECK(delObjs.size() == 1);
@@ -478,8 +478,8 @@ QPID_AUTO_TEST_CASE(v1ExportFastDelObj)
// add, then immediately delete and export the object...
::qpid::management::ManagementAgent::DeletedObjectList delObjs;
- agent->addObject(tm->GetManagementObjectShared(), 999);
- tm->GetManagementObjectShared()->resourceDestroy();
+ agent->addObject(tm->GetManagementObject(), 999);
+ tm->GetManagementObject()->resourceDestroy();
agent->exportDeletedObjects( delObjs );
BOOST_CHECK(delObjs.size() == 1);
@@ -511,8 +511,8 @@ QPID_AUTO_TEST_CASE(v1ImportMultiDelObj)
// FOR ALL OBJECTS, so objLen will be the same. Otherwise the
// decodeV1ObjectUpdates() will fail (v1 lacks explict encoded length).
TestManageable *tm = new TestManageable(agent, key.str());
- objLen = tm->GetManagementObjectShared()->writePropertiesSize();
- agent->addObject(tm->GetManagementObjectShared(), i + 1);
+ objLen = tm->GetManagementObject()->writePropertiesSize();
+ agent->addObject(tm->GetManagementObject(), i + 1);
tmv.push_back(tm);
}
@@ -531,7 +531,7 @@ QPID_AUTO_TEST_CASE(v1ImportMultiDelObj)
uint32_t delCount = 0;
for (size_t i = 0; i < objCount; i += 2) {
- tmv[i]->GetManagementObjectShared()->resourceDestroy();
+ tmv[i]->GetManagementObject()->resourceDestroy();
delCount++;
}
@@ -604,8 +604,8 @@ QPID_AUTO_TEST_CASE(v2ImportMultiDelObj)
std::stringstream key;
key << "testobj-" << i;
TestManageable *tm = new TestManageable(agent, key.str());
- if (tm->GetManagementObjectShared()->writePropertiesSize()) {}
- agent->addObject(tm->GetManagementObjectShared(), key.str());
+ if (tm->GetManagementObject()->writePropertiesSize()) {}
+ agent->addObject(tm->GetManagementObject(), key.str());
tmv.push_back(tm);
}
@@ -624,7 +624,7 @@ QPID_AUTO_TEST_CASE(v2ImportMultiDelObj)
uint32_t delCount = 0;
for (size_t i = 0; i < objCount; i += 2) {
- tmv[i]->GetManagementObjectShared()->resourceDestroy();
+ tmv[i]->GetManagementObject()->resourceDestroy();
delCount++;
}
@@ -689,12 +689,12 @@ QPID_AUTO_TEST_CASE(v2RapidRestoreObj)
TestManageable *tm1 = new TestManageable(agent, std::string("obj2"));
TestManageable *tm2 = new TestManageable(agent, std::string("obj2"));
- Receiver r1 = fix->createV2DataIndRcvr(tm1->GetManagementObjectShared()->getPackageName(), "#");
+ Receiver r1 = fix->createV2DataIndRcvr(tm1->GetManagementObject()->getPackageName(), "#");
// add, then immediately delete and re-add a copy of the object
- agent->addObject(tm1->GetManagementObjectShared(), "testobj-1");
- tm1->GetManagementObjectShared()->resourceDestroy();
- agent->addObject(tm2->GetManagementObjectShared(), "testobj-1");
+ agent->addObject(tm1->GetManagementObject(), "testobj-1");
+ tm1->GetManagementObject()->resourceDestroy();
+ agent->addObject(tm2->GetManagementObject(), "testobj-1");
// expect: a delete notification, then an update notification
TestObjectVector objs;
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/CMakeLists.txt?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/CMakeLists.txt (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/CMakeLists.txt Mon Dec 17 12:33:05 2012
@@ -158,6 +158,7 @@ add_executable (unit_test unit_test
target_link_libraries (unit_test
${qpid_test_boost_libs}
qpidmessaging qpidbroker qmfconsole)
+set_target_properties (unit_test PROPERTIES COMPILE_DEFINITIONS _IN_QPID_BROKER)
remember_location(unit_test)
add_library (shlibtest MODULE shlibtest.cpp)
@@ -327,7 +328,9 @@ endif (PYTHON_EXECUTABLE)
add_library(test_store MODULE test_store.cpp)
target_link_libraries (test_store qpidbroker qpidcommon)
-set_target_properties (test_store PROPERTIES PREFIX "")
+set_target_properties (test_store PROPERTIES
+ COMPILE_DEFINITIONS _IN_QPID_BROKER
+ PREFIX "")
add_library (dlclose_noop MODULE dlclose_noop.c)
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/Makefile.am?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/Makefile.am Mon Dec 17 12:33:05 2012
@@ -17,7 +17,7 @@
# under the License.
#
-AM_CXXFLAGS = $(WARNING_CFLAGS) -DBOOST_TEST_DYN_LINK
+AM_CXXFLAGS = $(WARNING_CFLAGS) -DBOOST_TEST_DYN_LINK -D_IN_QPID_BROKER
INCLUDES = -I$(top_srcdir)/include -I$(top_builddir)/include -I$(top_srcdir)/src -I$(top_builddir)/src
PUBLIC_INCLUDES = -I$(top_srcdir)/include -I$(top_builddir)/include # Use public API only
QMF_GEN=$(top_srcdir)/managementgen/qmf-gen
@@ -28,6 +28,7 @@ abs_srcdir=@abs_srcdir@
extra_libs =
lib_client = $(abs_builddir)/../libqpidclient.la
lib_messaging = $(abs_builddir)/../libqpidmessaging.la
+lib_types = $(abs_builddir)/../libqpidtypes.la
lib_common = $(abs_builddir)/../libqpidcommon.la
lib_broker = $(abs_builddir)/../libqpidbroker.la
lib_console = $(abs_builddir)/../libqmfconsole.la
@@ -154,7 +155,7 @@ receiver_SOURCES = \
receiver.cpp \
TestOptions.h \
ConnectionOptions.h
-receiver_LDADD = $(lib_client) -lboost_program_options -lqpidcommon
+receiver_LDADD = $(lib_client) -lboost_program_options $(lib_common)
qpidexectest_PROGRAMS += sender
sender_SOURCES = \
@@ -162,7 +163,7 @@ sender_SOURCES = \
TestOptions.h \
ConnectionOptions.h \
Statistics.cpp
-sender_LDADD = $(lib_messaging) -lboost_program_options -lqpidcommon -lqpidtypes -lqpidclient
+sender_LDADD = $(lib_messaging) -lboost_program_options $(lib_common) $(lib_types) $(lib_client)
qpidexectest_PROGRAMS += qpid-receive
qpid_receive_SOURCES = \
@@ -171,7 +172,7 @@ qpid_receive_SOURCES = \
ConnectionOptions.h \
Statistics.h \
Statistics.cpp
-qpid_receive_LDADD = $(lib_messaging) -lboost_program_options -lqpidcommon -lqpidtypes
+qpid_receive_LDADD = $(lib_messaging) -lboost_program_options $(lib_common) $(lib_types)
qpidexectest_PROGRAMS += qpid-send
qpid_send_SOURCES = \
@@ -180,42 +181,42 @@ qpid_send_SOURCES = \
ConnectionOptions.h \
Statistics.h \
Statistics.cpp
-qpid_send_LDADD = $(lib_messaging) -lboost_program_options -lqpidcommon -lqpidtypes
+qpid_send_LDADD = $(lib_messaging) -lboost_program_options $(lib_common) $(lib_types)
qpidexectest_PROGRAMS+=qpid-perftest
qpid_perftest_SOURCES=qpid-perftest.cpp test_tools.h TestOptions.h ConnectionOptions.h
qpid_perftest_INCLUDES=$(PUBLIC_INCLUDES)
-qpid_perftest_LDADD=$(lib_client) -lboost_program_options -lqpidcommon
+qpid_perftest_LDADD=$(lib_client) -lboost_program_options $(lib_common)
qpidexectest_PROGRAMS+=qpid-txtest
qpid_txtest_INCLUDES=$(PUBLIC_INCLUDES)
qpid_txtest_SOURCES=qpid-txtest.cpp TestOptions.h ConnectionOptions.h
-qpid_txtest_LDADD=$(lib_client) -lboost_program_options -lqpidcommon
+qpid_txtest_LDADD=$(lib_client) -lboost_program_options $(lib_common)
qpidexectest_PROGRAMS+=qpid-latency-test
qpid_latency_test_INCLUDES=$(PUBLIC_INCLUDES)
qpid_latency_test_SOURCES=qpid-latency-test.cpp TestOptions.h ConnectionOptions.h
-qpid_latency_test_LDADD=$(lib_client) -lboost_program_options -lqpidcommon
+qpid_latency_test_LDADD=$(lib_client) -lboost_program_options $(lib_common)
qpidexectest_PROGRAMS+=qpid-client-test
qpid_client_test_INCLUDES=$(PUBLIC_INCLUDES)
qpid_client_test_SOURCES=qpid-client-test.cpp TestOptions.h ConnectionOptions.h
-qpid_client_test_LDADD=$(lib_client) -lboost_program_options -lqpidcommon
+qpid_client_test_LDADD=$(lib_client) -lboost_program_options $(lib_common)
qpidexectest_PROGRAMS+=qpid-topic-listener
qpid_topic_listener_INCLUDES=$(PUBLIC_INCLUDES)
qpid_topic_listener_SOURCES=qpid-topic-listener.cpp TestOptions.h ConnectionOptions.h
-qpid_topic_listener_LDADD=$(lib_client) -lboost_program_options -lqpidcommon
+qpid_topic_listener_LDADD=$(lib_client) -lboost_program_options $(lib_common)
qpidexectest_PROGRAMS+=qpid-topic-publisher
qpid_topic_publisher_INCLUDES=$(PUBLIC_INCLUDES)
qpid_topic_publisher_SOURCES=qpid-topic-publisher.cpp TestOptions.h ConnectionOptions.h
-qpid_topic_publisher_LDADD=$(lib_client) -lboost_program_options -lqpidcommon
+qpid_topic_publisher_LDADD=$(lib_client) -lboost_program_options $(lib_common)
qpidexectest_PROGRAMS+=qpid-ping
qpid_ping_INCLUDES=$(PUBLIC_INCLUDES)
qpid_ping_SOURCES=qpid-ping.cpp test_tools.h TestOptions.h ConnectionOptions.h
-qpid_ping_LDADD=$(lib_client) -lboost_program_options -lqpidcommon
+qpid_ping_LDADD=$(lib_client) -lboost_program_options $(lib_common)
#
# Other test programs
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/MessagingSessionTests.cpp Mon Dec 17 12:33:05 2012
@@ -1196,6 +1196,27 @@ QPID_AUTO_TEST_CASE(testBrowseOnly)
fix.session.acknowledge();
}
+QPID_AUTO_TEST_CASE(testLinkBindingCleanup)
+{
+ MessagingFixture fix;
+
+ Sender sender = fix.session.createSender("test.ex;{create:always,node:{type:topic}}");
+
+ Connection connection = fix.newConnection();
+ connection.open();
+
+ Session session(connection.createSession());
+ Receiver receiver1 = session.createReceiver("test.q;{create:always, node:{type:queue, x-bindings:[{exchange:test.ex,queue:test.q,key:#,arguments:{x-scope:session}}]}}");
+ Receiver receiver2 = fix.session.createReceiver("test.q;{create:never, delete:always}");
+ connection.close();
+
+ sender.send(Message("test-message"), true);
+
+ // The session-scoped binding should be removed when receiver1's network connection is lost
+ Message in;
+ BOOST_CHECK(!receiver2.fetch(in, Duration::IMMEDIATE));
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/brokertest.py?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/brokertest.py Mon Dec 17 12:33:05 2012
@@ -203,7 +203,9 @@ class Popen(subprocess.Popen):
self.wait()
def kill(self):
- self.expect = EXPECT_EXIT_FAIL
+ # Set to EXPECT_UNKNOWN, EXPECT_EXIT_FAIL creates a race condition
+ # if the process exits normally concurrent with the call to kill.
+ self.expect = EXPECT_UNKNOWN
try: subprocess.Popen.kill(self)
except AttributeError: # No terminate method
try:
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/cluster_tests.py
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/tests/cluster_tests.py:r1415149-1422060
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/federation.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/federation.py?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/federation.py (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/federation.py Mon Dec 17 12:33:05 2012
@@ -2604,3 +2604,109 @@ class FederationTests(TestBase010):
self.verify_cleanup()
+ def test_dynamic_bounce_unbinds_named_queue(self):
+ """ Verify that a propagated binding is removed when the connection is
+ bounced
+ """
+ session = self.session
+
+ # create the federation
+
+ self.startQmf()
+ qmf = self.qmf
+
+ self._setup_brokers()
+
+ # create exchange on each broker, and retrieve the corresponding
+ # management object for that exchange
+
+ exchanges=[]
+ for _b in self._brokers[0:2]:
+ _b.client_session.exchange_declare(exchange="fedX", type="direct")
+ self.assertEqual(_b.client_session.exchange_query(name="fedX").type,
+ "direct", "exchange_declare failed!")
+ # pull the exchange out of qmf...
+ retries = 0
+ my_exchange = None
+ timeout = time() + 10
+ while my_exchange is None and time() <= timeout:
+ objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange")
+ for ooo in objs:
+ if ooo.name == "fedX":
+ my_exchange = ooo
+ break
+ if my_exchange is None:
+ self.fail("QMF failed to find new exchange!")
+ exchanges.append(my_exchange)
+
+ # on the destination broker, create a binding for propagation
+ self._brokers[0].client_session.queue_declare(queue="fedDstQ")
+ self._brokers[0].client_session.exchange_bind(queue="fedDstQ", exchange="fedX", binding_key="spud")
+
+ # on the source broker, create a bridge queue
+ self._brokers[1].client_session.queue_declare(queue="fedSrcQ")
+
+ # connect B1 --> B0
+ result = self._brokers[0].qmf_object.create( "link",
+ "Link-dynamic",
+ {"host":self._brokers[1].host,
+ "port":self._brokers[1].port}, False)
+ self.assertEqual(result.status, 0)
+
+ # bridge the "fedX" exchange:
+ result = self._brokers[0].qmf_object.create("bridge",
+ "Bridge-dynamic",
+ {"link":"Link-dynamic",
+ "src":"fedX",
+ "dest":"fedX",
+ "dynamic":True,
+ "queue":"fedSrcQ"}, False)
+ self.assertEqual(result.status, 0)
+
+ # wait for the inter-broker links to become operational
+ operational = False
+ timeout = time() + 10
+ while not operational and time() <= timeout:
+ operational = True
+ for _l in qmf.getObjects(_class="link"):
+ #print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.state)))
+ if _l.state != "Operational":
+ operational = False
+ self.failUnless(operational, "inter-broker links failed to become operational.")
+
+ # wait until the binding key has propagated to the src broker
+ exchanges[1].update()
+ timeout = time() + 10
+ while exchanges[1].bindingCount < 1 and time() <= timeout:
+ exchanges[1].update()
+ self.failUnless(exchanges[1].bindingCount == 1)
+
+ #
+ # Tear down the bridges between the two exchanges, then wait
+ # for the bindings to be cleaned up
+ #
+ for _b in qmf.getObjects(_class="bridge"):
+ result = _b.close()
+ self.assertEqual(result.status, 0)
+ exchanges[1].update()
+ timeout = time() + 10
+ while exchanges[1].bindingCount != 0 and time() <= timeout:
+ exchanges[1].update()
+ self.failUnless(exchanges[1].bindingCount == 0)
+
+ self._brokers[1].client_session.queue_delete(queue="fedSrcQ")
+
+ for _b in qmf.getObjects(_class="bridge"):
+ result = _b.close()
+ self.assertEqual(result.status, 0)
+
+ for _l in qmf.getObjects(_class="link"):
+ result = _l.close()
+ self.assertEqual(result.status, 0)
+
+ for _b in self._brokers[0:2]:
+ _b.client_session.exchange_delete(exchange="fedX")
+
+ self._teardown_brokers()
+
+ self.verify_cleanup()
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/ha_test.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/ha_test.py?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/ha_test.py Mon Dec 17 12:33:05 2012
@@ -100,7 +100,7 @@ class HaBroker(Broker):
self.qpid_ha_script.main_except(["", "-b", url]+args)
def promote(self): self.ready(); self.qpid_ha(["promote"])
- def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url])
+ def set_public_url(self, url): self.qpid_ha(["set", "--public-url", url])
def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url])
def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
@@ -113,10 +113,12 @@ class HaBroker(Broker):
self._agent = QmfAgent(self.host_port())
return self._agent
- def ha_status(self):
+ def qmf(self):
hb = self.agent().getHaBroker()
hb.update()
- return hb.status
+ return hb
+
+ def ha_status(self): return self.qmf().status
def wait_status(self, status):
def try_get_status():
@@ -234,7 +236,9 @@ class HaCluster(object):
def update_urls(self):
self.url = ",".join([b.host_port() for b in self])
if len(self) > 1: # No failover addresses on a 1 cluster.
- for b in self: b.set_brokers_url(self.url)
+ for b in self:
+ b.set_brokers_url(self.url)
+ b.set_public_url(self.url)
def connect(self, i):
"""Connect with reconnect_urls"""
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/ha_tests.py?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/ha_tests.py Mon Dec 17 12:33:05 2012
@@ -279,11 +279,13 @@ class ReplicationTests(HaBrokerTest):
"""Test replication of individual queues outside of cluster mode"""
l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
try:
- primary = HaBroker(self, name="primary", ha_cluster=False)
+ primary = HaBroker(self, name="primary", ha_cluster=False,
+ args=["--ha-queue-replication=yes"]);
pc = primary.connect()
ps = pc.session().sender("q;{create:always}")
pr = pc.session().receiver("q;{create:always}")
- backup = HaBroker(self, name="backup", ha_cluster=False)
+ backup = HaBroker(self, name="backup", ha_cluster=False,
+ args=["--ha-queue-replication=yes"])
br = backup.connect().session().receiver("q;{create:always}")
# Set up replication with qpid-ha
@@ -304,7 +306,8 @@ class ReplicationTests(HaBrokerTest):
finally: l.restore()
def test_queue_replica_failover(self):
- """Test individual queue replication from a cluster to a standalone backup broker, verify it fails over."""
+ """Test individual queue replication from a cluster to a standalone
+ backup broker, verify it fails over."""
l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
try:
cluster = HaCluster(self, 2)
@@ -312,7 +315,8 @@ class ReplicationTests(HaBrokerTest):
pc = cluster.connect(0)
ps = pc.session().sender("q;{create:always}")
pr = pc.session().receiver("q;{create:always}")
- backup = HaBroker(self, name="backup", ha_cluster=False)
+ backup = HaBroker(self, name="backup", ha_cluster=False,
+ args=["--ha-queue-replication=yes"])
br = backup.connect().session().receiver("q;{create:always}")
backup.replicate(cluster.url, "q")
ps.send("a")
@@ -474,6 +478,23 @@ class ReplicationTests(HaBrokerTest):
self.fail("Excpected no-such-queue exception")
except NotFound: pass
+ def test_replicate_binding(self):
+ """Verify that binding replication can be disabled"""
+ primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
+ primary.promote()
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
+ ps = primary.connect().session()
+ ps.sender("ex;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':all}, type:'fanout'}}}")
+ ps.sender("q;{create:always,node:{type:queue,x-declare:{arguments:{'qpid.replicate':all}},x-bindings:[{exchange:'ex',queue:'q',key:'',arguments:{'qpid.replicate':none}}]}}")
+ backup.wait_backup("q")
+
+ primary.kill()
+ assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die
+ backup.promote()
+ bs = backup.connect_admin().session()
+ bs.sender("ex").send(Message("msg"))
+ self.assert_browse_retry(bs, "q", [])
+
def test_invalid_replication(self):
"""Verify that we reject an attempt to declare a queue with invalid replication value."""
cluster = HaCluster(self, 1, ha_replicate="all")
@@ -761,9 +782,9 @@ acl deny all all
cluster[1].wait_queue("q0")
cluster[1].wait_queue("q1")
cluster[0].kill()
- cluster[1].wait_queue("q1") # Not timed out yet
- cluster[1].wait_no_queue("q1", timeout=2) # Wait for timeout
- cluster[1].wait_no_queue("q0", timeout=2)
+ cluster[1].wait_queue("q1") # Not timed out yet
+ cluster[1].wait_no_queue("q1") # Wait for timeout
+ cluster[1].wait_no_queue("q0")
def test_alt_exchange_dup(self):
"""QPID-4349: if a queue has an alterante exchange and is deleted the
@@ -1114,6 +1135,38 @@ class RecoveryTests(HaBrokerTest):
cluster.bounce(0, promote_next=False)
cluster[0].promote()
+
+class ConfigurationTests(HaBrokerTest):
+ """Tests for configuration settings."""
+
+ def test_client_broker_url(self):
+ """Check that setting of broker and public URLs obeys correct defaulting
+ and precedence"""
+
+ def check(broker, brokers, public):
+ qmf = broker.qmf()
+ self.assertEqual(brokers, qmf.brokersUrl)
+ self.assertEqual(public, qmf.publicUrl)
+
+ def start(brokers, public, known=None):
+ args=[]
+ if brokers: args.append("--ha-brokers-url="+brokers)
+ if public: args.append("--ha-public-url="+public)
+ if known: args.append("--known-hosts-url="+known)
+ return HaBroker(self, args=args)
+
+ # Both set explicitly, no defaulting
+ b = start("foo:123", "bar:456")
+ check(b, "amqp:tcp:foo:123", "amqp:tcp:bar:456")
+ b.set_brokers_url("foo:999")
+ check(b, "amqp:tcp:foo:999", "amqp:tcp:bar:456")
+ b.set_public_url("bar:999")
+ check(b, "amqp:tcp:foo:999", "amqp:tcp:bar:999")
+
+ # Allow "none" to mean "not set"
+ b = start("none", "none")
+ check(b, "", "")
+
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)
qpid_ha = os.getenv("QPID_HA_EXEC")
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/testagent.mk
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/testagent.mk?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/testagent.mk (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/tests/testagent.mk Mon Dec 17 12:33:05 2012
@@ -46,6 +46,6 @@ testagent-testagent.$(OBJEXT): $(TESTAGE
qpidexectest_PROGRAMS+=testagent
testagent_CXXFLAGS=$(CXXFLAGS) -Itestagent_gen
testagent_SOURCES=testagent.cpp $(TESTAGENT_GEN_SRC)
-testagent_LDADD=$(top_builddir)/src/libqmf.la -lqpidcommon -lqpidtypes -lqpidclient
+testagent_LDADD=$(top_builddir)/src/libqmf.la $(top_builddir)/src/libqpidcommon.la $(top_builddir)/src/libqpidtypes.la $(top_builddir)/src/libqpidclient.la
EXTRA_DIST+=testagent.xml
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/xml.mk
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/xml.mk?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/xml.mk (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/cpp/src/xml.mk Mon Dec 17 12:33:05 2012
@@ -24,6 +24,6 @@ xml_la_SOURCES = \
qpid/xml/XmlExchangePlugin.cpp
xml_la_LIBADD = -lxerces-c -lxqilla libqpidbroker.la
-
+xml_la_CXXFLAGS = $(AM_CXXFLAGS) -D_IN_QPID_BROKER
xml_la_LDFLAGS = $(PLUGINLDFLAGS)
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml Mon Dec 17 12:33:05 2012
@@ -173,6 +173,13 @@ under the License.
</listitem>
</itemizedlist>
</para>
+ <para>
+ You should not enable the old and new cluster modules at the same time
+ in a broker, as they may interfere with each other. In other words, you
+ should not set <literal>cluster-name</literal> at the same time as
+ either <literal>ha-cluster</literal> or
+ <literal>ha-queue-replication</literal>.
+ </para>
</section>
<section>
<title>Limitations</title>
@@ -254,6 +261,14 @@ under the License.
</row>
<row>
<entry>
+ <literal>ha-queue-replication <replaceable>yes|no</replaceable></literal>
+ </entry>
+ <entry>
+ Enable replication of specific queues without joining a cluster, see <xref linkend="ha-queue-replication"/>.
+ </entry>
+ </row>
+ <row>
+ <entry>
<literal>ha-brokers-url <replaceable>URL</replaceable></literal>
</entry>
<entry>
@@ -273,8 +288,7 @@ ssl_addr = "ssl:" host [":" port]'
</footnote>
used by cluster brokers to connect to each other. The URL should
contain a comma separated list of the broker addresses, rather than a
- virtual IP address. For example:
- <literal>amqp:node1.exaple.com,node2.exaple.com,node3.exaple.com</literal>
+ virtual IP address.
</para>
</entry>
</row>
@@ -282,14 +296,18 @@ ssl_addr = "ssl:" host [":" port]'
<entry><literal>ha-public-url <replaceable>URL</replaceable></literal> </entry>
<entry>
<para>
- The URL <footnoteref linkend="ha-url-grammar"/> used by clients to connect to the cluster. This can be a list or
- a single virtual IP address. A virtual IP address is recommended as it
- simplifies deployment. If not specified this defaults to the value of
- <literal>ha-brokers-url</literal>.
+ The URL <footnoteref linkend="ha-url-grammar"/> is advertised to
+ clients as the "known-hosts" for fail-over. It can be a list or
+ a single virtual IP address. A virtual IP address is recommended.
</para>
<para>
- This option allows you to put client traffic on a different network from
- broker traffic, which is recommended.
+ Using this option you can put client and broker traffic on
+ separate networks, which is recommended.
+ </para>
+ <para>
+ Note: When HA clustering is enabled the broker option
+ <literal>known-hosts-url</literal> is ignored and overridden by
+ the <literal>ha-public-url</literal> setting.
</para>
</entry>
</row>
@@ -548,7 +566,7 @@ NOTE: fencing is not shown, you must con
</section>
<section id="ha-creating-replicated">
- <title>Creating replicated queues and exchanges</title>
+ <title>Controlling replication of queues and exchanges</title>
<para>
By default, queues and exchanges are not replicated automatically. You can change
the default behavior by setting the <literal>ha-replicate</literal> configuration
@@ -849,6 +867,30 @@ NOTE: fencing is not shown, you must con
or to simulate a cluster on a single node. For deployment, a resource manager is required.
</para>
</section>
+ <section id="ha-queue-replication">
+ <title>Replicating specific queues</title>
+ <para>
+ In addition to the automatic replication performed in a cluster, you can
+ set up replication for specific queues between arbitrary brokers, even if
+ the brokers are not members of a cluster. The command:
+ </para>
+ <programlisting>
+ qpid-ha replicate <replaceable>QUEUE</replaceable> <replaceable>REMOTE-BROKER</replaceable>
+ </programlisting>
+ <para>
+ sets up replication of <replaceable>QUEUE</replaceable> on <replaceable>REMOTE-BROKER</replaceable> to <replaceable>QUEUE</replaceable> on the current broker.
+ </para>
+ <para>
+ Set the configuration option
+ <literal>ha-queue-replication=yes</literal> on both brokers to enable this
+ feature on non-cluster brokers. It is automatically enabled for brokers
+ that are part of a cluster.
+ </para>
+ <para>
+ Note that this feature does not provide automatic fail-over; for that you
+ need to run a cluster.
+ </para>
+ </section>
</section>
<!-- LocalWords: scalability rgmanager multicast RGManager mailto LVQ qpidd IP dequeued Transactional username
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java:r1415149-1422060
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/amqp-1-0-client/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/amqp-1-0-client:r1415149-1422060
Propchange: qpid/branches/java-broker-config-qpid-4390/qpid/java/amqp-1-0-client-jms/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/amqp-1-0-client-jms:r1415149-1422060
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java Mon Dec 17 12:33:05 2012
@@ -43,6 +43,8 @@ public class ConnectionFactoryImpl imple
private String _remoteHost;
private boolean _ssl;
+ private String _queuePrefix;
+ private String _topicPrefix;
public ConnectionFactoryImpl(final String host,
final int port,
@@ -90,12 +92,15 @@ public class ConnectionFactoryImpl imple
public ConnectionImpl createConnection() throws JMSException
{
- return new ConnectionImpl(_host, _port, _username, _password, _clientId, _remoteHost, _ssl);
+ return createConnection(_username, _password);
}
public ConnectionImpl createConnection(final String username, final String password) throws JMSException
{
- return new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl);
+ ConnectionImpl connection = new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl);
+ connection.setQueuePrefix(_queuePrefix);
+ connection.setTopicPrefix(_topicPrefix);
+ return connection;
}
public static ConnectionFactoryImpl createFromURL(final String urlString) throws MalformedURLException
@@ -211,4 +216,23 @@ public class ConnectionFactoryImpl imple
return connection;
}
+ public String getTopicPrefix()
+ {
+ return _topicPrefix;
+ }
+
+ public void setTopicPrefix(String topicPrefix)
+ {
+ _topicPrefix = topicPrefix;
+ }
+
+ public String getQueuePrefix()
+ {
+ return _queuePrefix;
+ }
+
+ public void setQueuePrefix(String queuePrefix)
+ {
+ _queuePrefix = queuePrefix;
+ }
}
Modified: qpid/branches/java-broker-config-qpid-4390/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-qpid-4390/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java?rev=1422901&r1=1422900&r2=1422901&view=diff
==============================================================================
--- qpid/branches/java-broker-config-qpid-4390/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java (original)
+++ qpid/branches/java-broker-config-qpid-4390/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java Mon Dec 17 12:33:05 2012
@@ -25,9 +25,8 @@ import org.apache.qpid.amqp_1_0.transpor
import javax.jms.*;
import javax.jms.IllegalStateException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import javax.jms.Queue;
+import java.util.*;
public class ConnectionImpl implements Connection, QueueConnection, TopicConnection
{
@@ -50,6 +49,8 @@ public class ConnectionImpl implements C
private final String _remoteHost;
private final boolean _ssl;
private String _clientId;
+ private String _queuePrefix;
+ private String _topicPrefix;
private static enum State
@@ -379,4 +380,78 @@ public class ConnectionImpl implements C
{
_isTopicConnection = topicConnection;
}
+
+ public String getTopicPrefix()
+ {
+ return _topicPrefix;
+ }
+
+ public void setTopicPrefix(String topicPrefix)
+ {
+ _topicPrefix = topicPrefix;
+ }
+
+ public String getQueuePrefix()
+ {
+ return _queuePrefix;
+ }
+
+ public void setQueuePrefix(String queueprefix)
+ {
+ _queuePrefix = queueprefix;
+ }
+
+ DecodedDestination toDecodedDestination(DestinationImpl dest)
+ {
+ String address = dest.getAddress();
+ Set<String> kind = null;
+ Class clazz = dest.getClass();
+ if( clazz==QueueImpl.class )
+ {
+ kind = MessageImpl.JMS_QUEUE_ATTRIBUTES;
+ if( _queuePrefix!=null )
+ {
+ // Avoid double prefixing..
+ if( !address.startsWith(_queuePrefix) )
+ {
+ address = _queuePrefix+address;
+ }
+ }
+ }
+ else if( clazz==TopicImpl.class )
+ {
+ kind = MessageImpl.JMS_TOPIC_ATTRIBUTES;
+ if( _topicPrefix!=null )
+ {
+ // Avoid double prefixing..
+ if( !address.startsWith(_topicPrefix) )
+ {
+ address = _topicPrefix+address;
+ }
+ }
+ }
+ else if( clazz==TemporaryQueueImpl.class )
+ {
+ kind = MessageImpl.JMS_TEMP_QUEUE_ATTRIBUTES;
+ }
+ else if( clazz==TemporaryTopicImpl.class )
+ {
+ kind = MessageImpl.JMS_TEMP_TOPIC_ATTRIBUTES;
+ }
+ return new DecodedDestination(address, kind);
+ }
+
+ DecodedDestination toDecodedDestination(String address, Set<String> kind)
+ {
+ if( (kind == null || kind.equals(MessageImpl.JMS_QUEUE_ATTRIBUTES)) && _queuePrefix!=null && address.startsWith(_queuePrefix))
+ {
+ return new DecodedDestination(address.substring(_queuePrefix.length()), MessageImpl.JMS_QUEUE_ATTRIBUTES);
+ }
+ if( (kind == null || kind.equals(MessageImpl.JMS_TOPIC_ATTRIBUTES)) && _topicPrefix!=null && address.startsWith(_topicPrefix))
+ {
+ return new DecodedDestination(address.substring(_topicPrefix.length()), MessageImpl.JMS_TOPIC_ATTRIBUTES);
+ }
+ return new DecodedDestination(address, kind);
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org