You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/01/20 18:07:54 UTC

svn commit: r901282 - in /qpid/trunk/qpid/cpp/src: Makefile.am qpid/broker/Broker.cpp qpid/broker/Queue.cpp qpid/broker/SemanticState.cpp qpid/cluster/Cluster.cpp qpid/sys/ClusterSafe.cpp qpid/sys/ClusterSafe.h

Author: aconway
Date: Wed Jan 20 17:07:54 2010
New Revision: 901282

URL: http://svn.apache.org/viewvc?rev=901282&view=rev
Log:

Cluster-safe assertions.

Assert that replicated data structures are modified in a cluster-safe
context - in cluster delivery thread or during update.  Assertions
added to Queue.cpp and SemanticState.cpp.

Added:
    qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.h
Modified:
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=901282&r1=901281&r2=901282&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Wed Jan 20 17:07:54 2010
@@ -433,6 +433,8 @@
   qpid/sys/AtomicValue_gcc.h			\
   qpid/sys/AtomicValue_mutex.h			\
   qpid/sys/BlockingQueue.h			\
+  qpid/sys/ClusterSafe.h    			\
+  qpid/sys/ClusterSafe.cpp  			\
   qpid/sys/Codec.h				\
   qpid/sys/ConnectionCodec.h			\
   qpid/sys/ConnectionInputHandler.h		\

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=901282&r1=901281&r2=901282&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Jan 20 17:07:54 2010
@@ -92,7 +92,7 @@
     tcpNoDelay(false),
     requireEncrypted(false),
     maxSessionRate(0),
-    asyncQueueEvents(true)
+    asyncQueueEvents(false)     // Must be false in a cluster.
 {
     int c = sys::SystemInfo::concurrency();
     workerThreads=c+1;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=901282&r1=901281&r2=901282&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed Jan 20 17:07:54 2010
@@ -33,6 +33,7 @@
 #include "qpid/management/ManagementAgent.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/FieldTable.h"
+#include "qpid/sys/ClusterSafe.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Time.h"
 #include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h"
@@ -44,6 +45,7 @@
 #include <boost/bind.hpp>
 #include <boost/intrusive_ptr.hpp>
 
+
 using namespace qpid::broker;
 using namespace qpid::sys;
 using namespace qpid::framing;
@@ -144,7 +146,6 @@
 }
 
 void Queue::deliver(boost::intrusive_ptr<Message>& msg){
-
     if (msg->isImmediate() && getConsumerCount() == 0) {
         if (alternateExchange) {
             DeliverableMessage deliverable(msg);
@@ -165,7 +166,7 @@
             push(msg);
         }
         mgntEnqStats(msg);
-        QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
+        QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
     }
 }
 
@@ -202,6 +203,7 @@
 }
 
 void Queue::requeue(const QueuedMessage& msg){
+    assertClusterSafe();
     QueueListeners::NotificationSet copy;
     {    
         Mutex::ScopedLock locker(messageLock);
@@ -222,6 +224,7 @@
 }
 
 void Queue::clearLVQIndex(const QueuedMessage& msg){
+    assertClusterSafe();
     const framing::FieldTable* ft = msg.payload ? msg.payload->getApplicationHeaders() : 0;
     if (lastValueQueue && ft){
         string key = ft->getAsString(qpidVQMatchProperty);
@@ -232,6 +235,7 @@
 bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message) 
 {
     Mutex::ScopedLock locker(messageLock);
+    assertClusterSafe();
     QPID_LOG(debug, "Attempting to acquire message at " << position);
     
     Messages::iterator i = findAt(position); 
@@ -251,6 +255,8 @@
 
 bool Queue::acquire(const QueuedMessage& msg) {
     Mutex::ScopedLock locker(messageLock);
+    assertClusterSafe();
+
     QPID_LOG(debug, "attempting to acquire " << msg.position);
     Messages::iterator i = findAt(msg.position); 
     if ((i != messages.end() && i->position == msg.position) && // note that in some cases payload not be set
@@ -272,6 +278,7 @@
 
 void Queue::notifyListener()
 {
+    assertClusterSafe();
     QueueListeners::NotificationSet set;
     {
         Mutex::ScopedLock locker(messageLock);
@@ -366,6 +373,7 @@
 
 void Queue::removeListener(Consumer::shared_ptr c)
 {
+    assertClusterSafe();
     QueueListeners::NotificationSet set;
     {
         Mutex::ScopedLock locker(messageLock);
@@ -440,6 +448,7 @@
 }
 
 void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
+    assertClusterSafe();
     Mutex::ScopedLock locker(consumerLock);
     if(exclusive) {
         throw ResourceLockedException(
@@ -539,6 +548,7 @@
 
 void Queue::popMsg(QueuedMessage& qmsg)
 {
+    assertClusterSafe();
     const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders();
     if (lastValueQueue && ft){
         string key = ft->getAsString(qpidVQMatchProperty);
@@ -549,6 +559,7 @@
 }
 
 void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
+    assertClusterSafe();
     QueueListeners::NotificationSet copy;
     {
         Mutex::ScopedLock locker(messageLock);   

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=901282&r1=901281&r2=901282&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Wed Jan 20 17:07:54 2010
@@ -34,6 +34,7 @@
 #include "qpid/framing/SequenceSet.h"
 #include "qpid/framing/IsInSequenceSet.h"
 #include "qpid/log/Statement.h"
+#include "qpid/sys/ClusterSafe.h"
 #include "qpid/ptr_map.h"
 #include "qpid/broker/AclModule.h"
 
@@ -47,7 +48,6 @@
 
 #include <assert.h>
 
-
 namespace qpid {
 namespace broker {
 
@@ -308,6 +308,7 @@
 
 bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
 {
+    assertClusterSafe();
     allocateCredit(msg.payload);
     DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing);
     bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
@@ -331,6 +332,7 @@
 
 bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
 {
+    assertClusterSafe();
     // FIXME aconway 2009-06-08: if we have byte & message credit but
     // checkCredit fails because the message is to big, we should
     // remain on queue's listener list for possible smaller messages
@@ -354,6 +356,7 @@
 
 void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
 {
+    assertClusterSafe();
     uint32_t originalMsgCredit = msgCredit;
     uint32_t originalByteCredit = byteCredit;        
     if (msgCredit != 0xFFFFFFFF) {
@@ -387,6 +390,7 @@
 
 void SemanticState::cancel(ConsumerImpl::shared_ptr c)
 {
+    assertClusterSafe();
     c->disableNotify();
     if (session.isAttached())
         session.getConnection().outputTasks.removeOutputTask(c.get());
@@ -468,6 +472,7 @@
 
 void SemanticState::ConsumerImpl::requestDispatch()
 {
+    assertClusterSafe();
     if (blocked) {
         parent->session.getConnection().outputTasks.addOutputTask(this);
         parent->session.getConnection().outputTasks.activateOutput();
@@ -565,6 +570,7 @@
 
 void SemanticState::ConsumerImpl::setWindowMode()
 {
+    assertClusterSafe();
     windowing = true;
     if (mgmtObject){
         mgmtObject->set_creditMode("WINDOW");
@@ -573,6 +579,7 @@
 
 void SemanticState::ConsumerImpl::setCreditMode()
 {
+    assertClusterSafe();
     windowing = false;
     if (mgmtObject){
         mgmtObject->set_creditMode("CREDIT");
@@ -581,6 +588,7 @@
 
 void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
 {
+    assertClusterSafe();
     if (byteCredit != 0xFFFFFFFF) {
         if (value == 0xFFFFFFFF) byteCredit = value;
         else byteCredit += value;
@@ -589,6 +597,7 @@
 
 void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
 {
+    assertClusterSafe();
     if (msgCredit != 0xFFFFFFFF) {
         if (value == 0xFFFFFFFF) msgCredit = value;
         else msgCredit += value;
@@ -614,6 +623,7 @@
 
 void SemanticState::ConsumerImpl::stop()
 {
+    assertClusterSafe();
     msgCredit = 0;
     byteCredit = 0;
 }
@@ -667,12 +677,14 @@
 void SemanticState::ConsumerImpl::enableNotify()
 {
     Mutex::ScopedLock l(lock);
+    assertClusterSafe();
     notifyEnabled = true;
 }
 
 void SemanticState::ConsumerImpl::disableNotify()
 {
     Mutex::ScopedLock l(lock);
+    assertClusterSafe();
     notifyEnabled = false;
 }
 
@@ -684,6 +696,7 @@
 void SemanticState::ConsumerImpl::notify()
 {
     Mutex::ScopedLock l(lock);
+    assertClusterSafe();
     if (notifyEnabled) {
         parent->session.getConnection().outputTasks.addOutputTask(this);
         parent->session.getConnection().outputTasks.activateOutput();
@@ -708,6 +721,7 @@
 }
 
 void SemanticState::accepted(const SequenceSet& commands) {
+    assertClusterSafe();
     if (txBuffer.get()) {
         //in transactional mode, don't dequeue or remove, just
         //maintain set of acknowledged messages:
@@ -740,6 +754,7 @@
 }
 
 void SemanticState::completed(const SequenceSet& commands) {
+    assertClusterSafe();
     DeliveryRecords::iterator removed =
         remove_if(unacked.begin(), unacked.end(),
                   isInSequenceSetAnd(commands,
@@ -750,6 +765,7 @@
 
 void SemanticState::attached()
 {
+    assertClusterSafe();
     for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
         i->second->enableNotify();
         session.getConnection().outputTasks.addOutputTask(i->second.get());
@@ -759,6 +775,7 @@
 
 void SemanticState::detached()
 {
+    assertClusterSafe();
     for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
         i->second->disableNotify();
         session.getConnection().outputTasks.removeOutputTask(i->second.get());

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=901282&r1=901281&r2=901282&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Jan 20 17:07:54 2010
@@ -105,6 +105,7 @@
  */
 #include "qpid/Exception.h"
 #include "qpid/cluster/Cluster.h"
+#include "qpid/sys/ClusterSafe.h"
 #include "qpid/cluster/ClusterSettings.h"
 #include "qpid/cluster/Connection.h"
 #include "qpid/cluster/UpdateClient.h"
@@ -152,6 +153,7 @@
 #include <map>
 #include <ostream>
 
+
 namespace qpid {
 namespace cluster {
 using namespace qpid;
@@ -357,6 +359,7 @@
         state = LEFT;
         QPID_LOG(notice, *this << " leaving cluster " << name);
         // Finalize connections now now to avoid problems later in destructor.
+        ClusterSafeScope css;   // Don't trigger cluster-safe assertions.
         LEAVE_TRY(localConnections.clear());
         LEAVE_TRY(connections.clear());
         LEAVE_TRY(broker::SignalHandler::shutdown());
@@ -440,6 +443,7 @@
 // Handler for deliverFrameQueue.
 // This thread executes the main logic.
 void Cluster::deliveredFrame(const EventFrame& efConst) {
+    sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts.
     Mutex::ScopedLock l(lock);
     if (state == LEFT) return;
     EventFrame e(efConst);
@@ -560,6 +564,7 @@
     if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
     mcast.setReady();
     broker.getQueueEvents().enable();
+    enableClusterSafe();    // Enable cluster-safe assertions.
 }
 
 void Cluster::initMapCompleted(Lock& l) {
@@ -650,6 +655,7 @@
 // callbacks will be invoked.
 // 
 void Cluster::brokerShutdown()  {
+    sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts.
     try { cpg.shutdown(); }
     catch (const std::exception& e) {
         QPID_LOG(error, *this << " shutting down CPG: " << e.what());

Added: qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp?rev=901282&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp Wed Jan 20 17:07:54 2010
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ClusterSafe.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/Thread.h"
+#include <stdlib.h>
+
+namespace qpid {
+namespace sys {
+
+namespace {
+bool inCluster = false;          // Assertions are enforced only once enableClusterSafe() sets this.
+QPID_TSS bool inContext = false; // Set while a ClusterSafeScope exists; presumably per-thread via QPID_TSS - TODO confirm.
+}
+
+void assertClusterSafe()  {    // No-op unless assertions are enabled and no ClusterSafeScope is active.
+    if (inCluster && !inContext) {
+        QPID_LOG(critical, "Modified cluster state outside of cluster context");
+        ::abort();             // Fatal on purpose: terminates the process instead of throwing.
+    }
+}
+
+ClusterSafeScope::ClusterSafeScope() { inContext = true; }   // Entering the scope: assertClusterSafe() now passes.
+ClusterSafeScope::~ClusterSafeScope() { inContext = false; } // NOTE(review): clears unconditionally, so an outer scope's context is not restored when nested.
+
+void enableClusterSafe() { inCluster = true; } // One-way switch: nothing in this file sets inCluster back to false.
+
+}} // namespace qpid::sys

Added: qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.h?rev=901282&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.h Wed Jan 20 17:07:54 2010
@@ -0,0 +1,60 @@
+#ifndef QPID_SYS_CLUSTERSAFE_H
+#define QPID_SYS_CLUSTERSAFE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Assertion to add to code that modifies clustered state.
+ *
+ * In a non-clustered broker this is a no-op.
+ *
+ * In a clustered broker, checks that it is being called
+ * in a context where it is safe to modify clustered state.
+ * If not, it aborts the process, as this is a serious bug.
+ *
+ * This function is in the common library rather than the cluster
+ * library because it is called by code in the broker library.
+ */
+void assertClusterSafe();
+
+/**
+ * Scope guard marking a region where modifying clustered state is
+ * allowed. While an instance exists, assertClusterSafe() does not
+ * abort; the destructor ends that context. Declare one as a local
+ * variable. Not nestable: the inner destructor ends the context.
+ */
+struct ClusterSafeScope {
+    ClusterSafeScope();
+    ~ClusterSafeScope();
+};
+
+/**
+ * Enable cluster-safe assertions. By default they are no-ops.
+ */
+void enableClusterSafe();
+
+}} // namespace qpid::sys
+
+#endif  /*!QPID_SYS_CLUSTERSAFE_H*/



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org