You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/01/20 18:07:54 UTC
svn commit: r901282 - in /qpid/trunk/qpid/cpp/src: Makefile.am
qpid/broker/Broker.cpp qpid/broker/Queue.cpp qpid/broker/SemanticState.cpp
qpid/cluster/Cluster.cpp qpid/sys/ClusterSafe.cpp qpid/sys/ClusterSafe.h
Author: aconway
Date: Wed Jan 20 17:07:54 2010
New Revision: 901282
URL: http://svn.apache.org/viewvc?rev=901282&view=rev
Log:
Cluster-safe assertions.
Assert that replicated data structures are modified in a cluster-safe
context - in cluster delivery thread or during update. Assertions
added to Queue.cpp and SemanticState.cpp.
Added:
qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.h
Modified:
qpid/trunk/qpid/cpp/src/Makefile.am
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=901282&r1=901281&r2=901282&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Wed Jan 20 17:07:54 2010
@@ -433,6 +433,8 @@
qpid/sys/AtomicValue_gcc.h \
qpid/sys/AtomicValue_mutex.h \
qpid/sys/BlockingQueue.h \
+ qpid/sys/ClusterSafe.h \
+ qpid/sys/ClusterSafe.cpp \
qpid/sys/Codec.h \
qpid/sys/ConnectionCodec.h \
qpid/sys/ConnectionInputHandler.h \
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=901282&r1=901281&r2=901282&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Jan 20 17:07:54 2010
@@ -92,7 +92,7 @@
tcpNoDelay(false),
requireEncrypted(false),
maxSessionRate(0),
- asyncQueueEvents(true)
+ asyncQueueEvents(false) // Must be false in a cluster.
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=901282&r1=901281&r2=901282&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed Jan 20 17:07:54 2010
@@ -33,6 +33,7 @@
#include "qpid/management/ManagementAgent.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/sys/ClusterSafe.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
#include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h"
@@ -44,6 +45,7 @@
#include <boost/bind.hpp>
#include <boost/intrusive_ptr.hpp>
+
using namespace qpid::broker;
using namespace qpid::sys;
using namespace qpid::framing;
@@ -144,7 +146,6 @@
}
void Queue::deliver(boost::intrusive_ptr<Message>& msg){
-
if (msg->isImmediate() && getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg);
@@ -165,7 +166,7 @@
push(msg);
}
mgntEnqStats(msg);
- QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
+ QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
}
}
@@ -202,6 +203,7 @@
}
void Queue::requeue(const QueuedMessage& msg){
+ assertClusterSafe();
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
@@ -222,6 +224,7 @@
}
void Queue::clearLVQIndex(const QueuedMessage& msg){
+ assertClusterSafe();
const framing::FieldTable* ft = msg.payload ? msg.payload->getApplicationHeaders() : 0;
if (lastValueQueue && ft){
string key = ft->getAsString(qpidVQMatchProperty);
@@ -232,6 +235,7 @@
bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
{
Mutex::ScopedLock locker(messageLock);
+ assertClusterSafe();
QPID_LOG(debug, "Attempting to acquire message at " << position);
Messages::iterator i = findAt(position);
@@ -251,6 +255,8 @@
bool Queue::acquire(const QueuedMessage& msg) {
Mutex::ScopedLock locker(messageLock);
+ assertClusterSafe();
+
QPID_LOG(debug, "attempting to acquire " << msg.position);
Messages::iterator i = findAt(msg.position);
if ((i != messages.end() && i->position == msg.position) && // note that in some cases payload not be set
@@ -272,6 +278,7 @@
void Queue::notifyListener()
{
+ assertClusterSafe();
QueueListeners::NotificationSet set;
{
Mutex::ScopedLock locker(messageLock);
@@ -366,6 +373,7 @@
void Queue::removeListener(Consumer::shared_ptr c)
{
+ assertClusterSafe();
QueueListeners::NotificationSet set;
{
Mutex::ScopedLock locker(messageLock);
@@ -440,6 +448,7 @@
}
void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
+ assertClusterSafe();
Mutex::ScopedLock locker(consumerLock);
if(exclusive) {
throw ResourceLockedException(
@@ -539,6 +548,7 @@
void Queue::popMsg(QueuedMessage& qmsg)
{
+ assertClusterSafe();
const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders();
if (lastValueQueue && ft){
string key = ft->getAsString(qpidVQMatchProperty);
@@ -549,6 +559,7 @@
}
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
+ assertClusterSafe();
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=901282&r1=901281&r2=901282&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Wed Jan 20 17:07:54 2010
@@ -34,6 +34,7 @@
#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/IsInSequenceSet.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/ClusterSafe.h"
#include "qpid/ptr_map.h"
#include "qpid/broker/AclModule.h"
@@ -47,7 +48,6 @@
#include <assert.h>
-
namespace qpid {
namespace broker {
@@ -308,6 +308,7 @@
bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
+ assertClusterSafe();
allocateCredit(msg.payload);
DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing);
bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
@@ -331,6 +332,7 @@
bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
{
+ assertClusterSafe();
// FIXME aconway 2009-06-08: if we have byte & message credit but
// checkCredit fails because the message is to big, we should
// remain on queue's listener list for possible smaller messages
@@ -354,6 +356,7 @@
void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
{
+ assertClusterSafe();
uint32_t originalMsgCredit = msgCredit;
uint32_t originalByteCredit = byteCredit;
if (msgCredit != 0xFFFFFFFF) {
@@ -387,6 +390,7 @@
void SemanticState::cancel(ConsumerImpl::shared_ptr c)
{
+ assertClusterSafe();
c->disableNotify();
if (session.isAttached())
session.getConnection().outputTasks.removeOutputTask(c.get());
@@ -468,6 +472,7 @@
void SemanticState::ConsumerImpl::requestDispatch()
{
+ assertClusterSafe();
if (blocked) {
parent->session.getConnection().outputTasks.addOutputTask(this);
parent->session.getConnection().outputTasks.activateOutput();
@@ -565,6 +570,7 @@
void SemanticState::ConsumerImpl::setWindowMode()
{
+ assertClusterSafe();
windowing = true;
if (mgmtObject){
mgmtObject->set_creditMode("WINDOW");
@@ -573,6 +579,7 @@
void SemanticState::ConsumerImpl::setCreditMode()
{
+ assertClusterSafe();
windowing = false;
if (mgmtObject){
mgmtObject->set_creditMode("CREDIT");
@@ -581,6 +588,7 @@
void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
{
+ assertClusterSafe();
if (byteCredit != 0xFFFFFFFF) {
if (value == 0xFFFFFFFF) byteCredit = value;
else byteCredit += value;
@@ -589,6 +597,7 @@
void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
{
+ assertClusterSafe();
if (msgCredit != 0xFFFFFFFF) {
if (value == 0xFFFFFFFF) msgCredit = value;
else msgCredit += value;
@@ -614,6 +623,7 @@
void SemanticState::ConsumerImpl::stop()
{
+ assertClusterSafe();
msgCredit = 0;
byteCredit = 0;
}
@@ -667,12 +677,14 @@
void SemanticState::ConsumerImpl::enableNotify()
{
Mutex::ScopedLock l(lock);
+ assertClusterSafe();
notifyEnabled = true;
}
void SemanticState::ConsumerImpl::disableNotify()
{
Mutex::ScopedLock l(lock);
+ assertClusterSafe();
notifyEnabled = false;
}
@@ -684,6 +696,7 @@
void SemanticState::ConsumerImpl::notify()
{
Mutex::ScopedLock l(lock);
+ assertClusterSafe();
if (notifyEnabled) {
parent->session.getConnection().outputTasks.addOutputTask(this);
parent->session.getConnection().outputTasks.activateOutput();
@@ -708,6 +721,7 @@
}
void SemanticState::accepted(const SequenceSet& commands) {
+ assertClusterSafe();
if (txBuffer.get()) {
//in transactional mode, don't dequeue or remove, just
//maintain set of acknowledged messages:
@@ -740,6 +754,7 @@
}
void SemanticState::completed(const SequenceSet& commands) {
+ assertClusterSafe();
DeliveryRecords::iterator removed =
remove_if(unacked.begin(), unacked.end(),
isInSequenceSetAnd(commands,
@@ -750,6 +765,7 @@
void SemanticState::attached()
{
+ assertClusterSafe();
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
i->second->enableNotify();
session.getConnection().outputTasks.addOutputTask(i->second.get());
@@ -759,6 +775,7 @@
void SemanticState::detached()
{
+ assertClusterSafe();
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
i->second->disableNotify();
session.getConnection().outputTasks.removeOutputTask(i->second.get());
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=901282&r1=901281&r2=901282&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Jan 20 17:07:54 2010
@@ -105,6 +105,7 @@
*/
#include "qpid/Exception.h"
#include "qpid/cluster/Cluster.h"
+#include "qpid/sys/ClusterSafe.h"
#include "qpid/cluster/ClusterSettings.h"
#include "qpid/cluster/Connection.h"
#include "qpid/cluster/UpdateClient.h"
@@ -152,6 +153,7 @@
#include <map>
#include <ostream>
+
namespace qpid {
namespace cluster {
using namespace qpid;
@@ -357,6 +359,7 @@
state = LEFT;
QPID_LOG(notice, *this << " leaving cluster " << name);
// Finalize connections now now to avoid problems later in destructor.
+ ClusterSafeScope css; // Don't trigger cluster-safe assertions.
LEAVE_TRY(localConnections.clear());
LEAVE_TRY(connections.clear());
LEAVE_TRY(broker::SignalHandler::shutdown());
@@ -440,6 +443,7 @@
// Handler for deliverFrameQueue.
// This thread executes the main logic.
void Cluster::deliveredFrame(const EventFrame& efConst) {
+ sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts.
Mutex::ScopedLock l(lock);
if (state == LEFT) return;
EventFrame e(efConst);
@@ -560,6 +564,7 @@
if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
mcast.setReady();
broker.getQueueEvents().enable();
+ enableClusterSafe(); // Enable cluster-safe assertions.
}
void Cluster::initMapCompleted(Lock& l) {
@@ -650,6 +655,7 @@
// callbacks will be invoked.
//
void Cluster::brokerShutdown() {
+ sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts.
try { cpg.shutdown(); }
catch (const std::exception& e) {
QPID_LOG(error, *this << " shutting down CPG: " << e.what());
Added: qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp?rev=901282&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp Wed Jan 20 17:07:54 2010
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ClusterSafe.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/Thread.h"
+#include <stdlib.h>
+
+namespace qpid {
+namespace sys {
+
+namespace {
+bool inCluster = false;
+QPID_TSS bool inContext = false;
+}
+
+void assertClusterSafe() {
+ if (inCluster && !inContext) {
+ QPID_LOG(critical, "Modified cluster state outside of cluster context");
+ ::abort();
+ }
+}
+
+ClusterSafeScope::ClusterSafeScope() { inContext = true; }
+ClusterSafeScope::~ClusterSafeScope() { inContext = false; }
+
+void enableClusterSafe() { inCluster = true; }
+
+}} // namespace qpid::sys
Added: qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.h?rev=901282&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.h Wed Jan 20 17:07:54 2010
@@ -0,0 +1,60 @@
+#ifndef QPID_SYS_CLUSTERSAFE_H
+#define QPID_SYS_CLUSTERSAFE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Assertion to add to code that modifies clustered state.
+ *
+ * In a non-clustered broker this is a no-op.
+ *
+ * In a clustered broker, checks that it is being called
+ * in a context where it is safe to modify clustered state.
+ * If not it aborts the process as this is a serious bug.
+ *
+ * This function is in the common library rather than the cluster
+ * library because it is called by code in the broker library.
+ */
+void assertClusterSafe();
+
+/**
+ * Base class for classes that encapsulate state which is replicated
+ * to all members of a cluster. Acts as a marker for clustered state
+ * and provides functions to assist detecting bugs in cluster
+ * behavior.
+ */
+struct ClusterSafeScope {
+ ClusterSafeScope();
+ ~ClusterSafeScope();
+};
+
+/**
+ * Enable cluster-safe assertions. By defaul they are no-ops.
+ */
+void enableClusterSafe();
+
+}} // namespace qpid::sys
+
+#endif /*!QPID_SYS_CLUSTERSAFE_H*/
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org