You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/02/08 21:28:16 UTC
svn commit: r1068554 - in /qpid/trunk/qpid/cpp/src:
qpid/broker/SessionHandler.cpp qpid/broker/SessionState.cpp
qpid/broker/SessionState.h tests/cluster_tests.fail
Author: aconway
Date: Tue Feb 8 20:28:16 2011
New Revision: 1068554
URL: http://svn.apache.org/viewvc?rev=1068554&view=rev
Log:
QPID-3045 - sporadic failure of cluster_tests.ShortTests.test_route_update
Sporadically the test was failing because the session associated with
an inter-broker bridge was created out of order with other
objects. This is unlikely to cause a fatal cluster inconsistency in
practice but it has been corrected in any case. The fix was to delay
creation of the management object for a bridge session till a point
which is consistent on all cluster members.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
qpid/trunk/qpid/cpp/src/tests/cluster_tests.fail
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=1068554&r1=1068553&r2=1068554&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Tue Feb 8 20:28:16 2011
@@ -33,7 +33,7 @@ using namespace qpid::sys;
SessionHandler::SessionHandler(Connection& c, ChannelId ch)
: amqp_0_10::SessionHandler(&c.getOutput(), ch),
- connection(c),
+ connection(c),
proxy(out),
clusterOrderProxy(c.getClusterOrderOutput() ? new SetChannelProxy(ch, c.getClusterOrderOutput()) : 0)
{}
@@ -69,7 +69,7 @@ void SessionHandler::handleDetach() {
if (session.get())
connection.getBroker().getSessionManager().detach(session);
assert(!session.get());
- connection.closeChannel(channel.get());
+ connection.closeChannel(channel.get());
}
void SessionHandler::setState(const std::string& name, bool force) {
@@ -78,7 +78,7 @@ void SessionHandler::setState(const std:
session = connection.broker.getSessionManager().attach(*this, id, force);
}
-void SessionHandler::detaching()
+void SessionHandler::detaching()
{
assert(session.get());
session->disableOutput();
@@ -98,7 +98,10 @@ void SessionHandler::attachAs(const std:
{
SessionId id(connection.getUserId(), name);
SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig();
- session.reset(new SessionState(connection.getBroker(), *this, id, config));
+ // Delay creating management object till attached(). In a cluster,
+ // only the active link broker calls attachAs but all brokers
+ // receive the subsequent attached() call.
+ session.reset(new SessionState(connection.getBroker(), *this, id, config, true));
sendAttach(false);
}
@@ -109,6 +112,7 @@ void SessionHandler::attachAs(const std:
void SessionHandler::attached(const std::string& name)
{
if (session.get()) {
+ session->addManagementObject(); // Delayed from attachAs()
amqp_0_10::SessionHandler::attached(name);
} else {
SessionId id(connection.getUserId(), name);
@@ -117,5 +121,5 @@ void SessionHandler::attached(const std:
markReadyToSend();
}
}
-
+
}} // namespace qpid::broker
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1068554&r1=1068553&r2=1068554&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Tue Feb 8 20:28:16 2011
@@ -53,7 +53,8 @@ using qpid::sys::AbsTime;
namespace _qmf = qmf::org::apache::qpid::broker;
SessionState::SessionState(
- Broker& b, SessionHandler& h, const SessionId& id, const SessionState::Configuration& config)
+ Broker& b, SessionHandler& h, const SessionId& id,
+ const SessionState::Configuration& config, bool delayManagement)
: qpid::SessionState(id, config),
broker(b), handler(&h),
semanticState(*this, *this),
@@ -71,6 +72,12 @@ SessionState::SessionState(
QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support");
}
}
+ if (!delayManagement) addManagementObject();
+ attach(h);
+}
+
+void SessionState::addManagementObject() {
+ if (GetManagementObject()) return; // Already added.
Manageable* parent = broker.GetVhostObject ();
if (parent != 0) {
ManagementAgent* agent = getBroker().getManagementAgent();
@@ -80,11 +87,11 @@ SessionState::SessionState(
mgmtObject->set_attached (0);
mgmtObject->set_detachedLifespan (0);
mgmtObject->clr_expireTime();
- if (rateFlowcontrol) mgmtObject->set_maxClientRate(maxRate);
+ if (rateFlowcontrol)
+ mgmtObject->set_maxClientRate(rateFlowcontrol->getRate());
agent->addObject(mgmtObject);
}
}
- attach(h);
}
SessionState::~SessionState() {
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=1068554&r1=1068553&r2=1068554&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Tue Feb 8 20:28:16 2011
@@ -72,7 +72,8 @@ class SessionState : public qpid::Sessio
public framing::FrameHandler::InOutHandler
{
public:
- SessionState(Broker&, SessionHandler&, const SessionId&, const SessionState::Configuration&);
+ SessionState(Broker&, SessionHandler&, const SessionId&,
+ const SessionState::Configuration&, bool delayManagement=false);
~SessionState();
bool isAttached() const { return handler; }
@@ -122,8 +123,11 @@ class SessionState : public qpid::Sessio
const SessionId& getSessionId() const { return getId(); }
- private:
+ // Used to delay creation of management object for sessions
+ // belonging to inter-broker bridges
+ void addManagementObject();
+ private:
void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id);
void enqueued(boost::intrusive_ptr<Message> msg);
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.fail
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.fail?rev=1068554&r1=1068553&r2=1068554&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.fail (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.fail Tue Feb 8 20:28:16 2011
@@ -1,3 +1,3 @@
-cluster_tests.ShortTests.test_route_update
+
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org