You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/03/09 16:58:21 UTC

svn commit: r751719 - in /qpid/trunk/qpid/cpp/src: qpid/broker/ qpid/cluster/ qpid/replication/ tests/

Author: gsim
Date: Mon Mar  9 15:58:17 2009
New Revision: 751719

URL: http://svn.apache.org/viewvc?rev=751719&view=rev
Log:
QPID-1721: Fixes for replication between clusters when new members are added

* suppress event generation during node catch up 
* ensure sequence counters used for duplicate detection are synchronised in both primary and dr clusters when new members join
* connect queue with the event manager within queue registry rather than adapter as the latter path is not used for catchup


Added:
    qpid/trunk/qpid/cpp/src/tests/clustered_replication_test   (with props)
Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
    qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h
    qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.h
    qpid/trunk/qpid/cpp/src/tests/cluster.mk

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=751719&r1=751718&r2=751719&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Mon Mar  9 15:58:17 2009
@@ -199,6 +199,7 @@
     }
 
     QueuePolicy::setDefaultMaxSize(conf.queueLimit);
+    queues.setQueueEvents(&queueEvents);
 
     // Early-Initialize plugins
     const Plugin::Plugins& plugins=Plugin::getPlugins();

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=751719&r1=751718&r2=751719&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Mon Mar  9 15:58:17 2009
@@ -102,8 +102,8 @@
 
 Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
                    Manageable* parent)
-    : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0), 
-      sequence(false), sequenceNo(0), ive(false), mgmtExchange(0)
+    : name(_name), durable(_durable), alternateUsers(0), persistenceId(0), 
+      args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0)
 {
     if (parent != 0)
     {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=751719&r1=751718&r2=751719&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Mon Mar  9 15:58:17 2009
@@ -58,12 +58,12 @@
 private:
     const std::string name;
     const bool durable;
-    mutable qpid::framing::FieldTable args;
     boost::shared_ptr<Exchange> alternate;
     uint32_t alternateUsers;
     mutable uint64_t persistenceId;
 
 protected:
+    mutable qpid::framing::FieldTable args;
     bool sequence;
     mutable qpid::sys::Mutex sequenceLock;
     int64_t sequenceNo;
@@ -146,7 +146,7 @@
     void setPersistenceId(uint64_t id) const;
     uint64_t getPersistenceId() const { return persistenceId; }
     uint32_t encodedSize() const;
-    void encode(framing::Buffer& buffer) const; 
+    virtual void encode(framing::Buffer& buffer) const; 
 
     static Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer);
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=751719&r1=751718&r2=751719&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Mon Mar  9 15:58:17 2009
@@ -382,4 +382,9 @@
 void Message::setDequeueCompleteCallback(MessageCallback& cb) { dequeueCallback = &cb; }
 void Message::resetDequeueCompleteCallback() { dequeueCallback = 0; }
 
+framing::FieldTable& Message::getOrInsertHeaders()
+{
+    return getProperties<MessageProperties>()->getApplicationHeaders();
+}
+
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=751719&r1=751718&r2=751719&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Mon Mar  9 15:58:17 2009
@@ -72,6 +72,7 @@
     std::string getExchangeName() const;
     bool isImmediate() const;
     const framing::FieldTable* getApplicationHeaders() const;
+    framing::FieldTable& getOrInsertHeaders();
     bool isPersistent();
     bool requiresAccept();
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=751719&r1=751718&r2=751719&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Mar  9 15:58:17 2009
@@ -93,7 +93,8 @@
     policyExceeded(false),
     mgmtObject(0),
     eventMode(0),
-    eventMgr(0)
+    eventMgr(0),
+    insertSeqNo(0)
 {
     if (parent != 0)
     {
@@ -551,6 +552,7 @@
         Mutex::ScopedLock locker(messageLock);   
         QueuedMessage qm(this, msg, ++sequence);
         if (policy.get()) policy->tryEnqueue(qm);
+        if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
          
         LVQ::iterator i;
         const framing::FieldTable* ft = msg->getApplicationHeaders();
@@ -578,8 +580,9 @@
             messages.push_back(qm);
             listeners.populate(copy);
         }
-        if (eventMode && eventMgr) {
-            eventMgr->enqueued(qm);
+        if (eventMode) {
+            if (eventMgr) eventMgr->enqueued(qm);
+            else QPID_LOG(warning, "Enqueue manager not set, events not generated for " << getName());
         }
     }
     copy.notify();
@@ -989,3 +992,9 @@
     for_each(pendingDequeues.begin(), pendingDequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
     pendingDequeues.clear();
 }
+
+void Queue::insertSequenceNumbers(const std::string& key)
+{
+    seqNoKey = key;
+    insertSeqNo = !seqNoKey.empty();
+}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=751719&r1=751718&r2=751719&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Mon Mar  9 15:58:17 2009
@@ -101,6 +101,8 @@
             RateTracker dequeueTracker;
             int eventMode;
             QueueEvents* eventMgr;
+            bool insertSeqNo;
+            std::string seqNoKey;
 
             void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
             void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -291,7 +293,7 @@
             void setPosition(framing::SequenceNumber pos);
             int getEventMode();
             void setQueueEventManager(QueueEvents&);
-
+            void insertSequenceNumbers(const std::string& key);
             /**
              * Notify queue that recovery has completed.
              */

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp?rev=751719&r1=751718&r2=751719&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp Mon Mar  9 15:58:17 2009
@@ -20,12 +20,13 @@
  */
 #include "QueueEvents.h"
 #include "qpid/Exception.h"
+#include "qpid/log/Statement.h"
 
 namespace qpid {
 namespace broker {
 
 QueueEvents::QueueEvents(const boost::shared_ptr<sys::Poller>& poller) : 
-    eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller) 
+    eventQueue(boost::bind(&QueueEvents::handle, this, _1), poller), enabled(true) 
 {
     eventQueue.start();
 }
@@ -37,12 +38,12 @@
 
 void QueueEvents::enqueued(const QueuedMessage& m)
 {
-    eventQueue.push(Event(ENQUEUE, m));
+    if (enabled) eventQueue.push(Event(ENQUEUE, m));
 }
 
 void QueueEvents::dequeued(const QueuedMessage& m)
 {
-    eventQueue.push(Event(DEQUEUE, m));
+    if (enabled) eventQueue.push(Event(DEQUEUE, m));
 }
 
 void QueueEvents::registerListener(const std::string& id, const EventListener& listener)
@@ -81,6 +82,18 @@
     if (!eventQueue.empty() && !listeners.empty()) eventQueue.shutdown();
 }
 
+void QueueEvents::enable()
+{
+    enabled = true;
+    QPID_LOG(debug, "Queue events enabled");
+}
+
+void QueueEvents::disable()
+{
+    enabled = false;
+    QPID_LOG(debug, "Queue events disabled");
+}
+
 QueueEvents::Event::Event(EventType t, const QueuedMessage& m) : type(t), msg(m) {}
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h?rev=751719&r1=751718&r2=751719&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h Mon Mar  9 15:58:17 2009
@@ -59,6 +59,8 @@
     void dequeued(const QueuedMessage&);
     void registerListener(const std::string& id, const EventListener&);
     void unregisterListener(const std::string& id);
+    void enable();
+    void disable();
     //process all outstanding events
     void shutdown();
   private:
@@ -67,6 +69,7 @@
 
     EventQueue eventQueue;
     Listeners listeners;
+    volatile bool enabled;
     qpid::sys::Mutex lock;//protect listeners from concurrent access
     
     void handle(EventQueue::Queue& e);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=751719&r1=751718&r2=751719&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Mon Mar  9 15:58:17 2009
@@ -19,6 +19,7 @@
  *
  */
 #include "QueueRegistry.h"
+#include "QueueEvents.h"
 #include "qpid/log/Statement.h"
 #include <sstream>
 #include <assert.h>
@@ -27,7 +28,7 @@
 using namespace qpid::sys;
 
 QueueRegistry::QueueRegistry() :
-    counter(1), store(0), parent(0), lastNode(false) {}
+    counter(1), store(0), events(0), parent(0), lastNode(false) {}
 
 QueueRegistry::~QueueRegistry(){}
 
@@ -43,7 +44,8 @@
     if (i == queues.end()) {
         Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner, parent));
         queues[name] = queue;
-		if (lastNode) queue->setLastNodeFailure();
+        if (lastNode) queue->setLastNodeFailure();
+        if (events) queue->setQueueEventManager(*events);
 
         return std::pair<Queue::shared_ptr, bool>(queue, true);
     } else {
@@ -105,3 +107,7 @@
     lastNode = _lastNode;
 }
 
+void QueueRegistry::setQueueEvents(QueueEvents* e)
+{
+    events = e;
+}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h?rev=751719&r1=751718&r2=751719&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h Mon Mar  9 15:58:17 2009
@@ -31,6 +31,8 @@
 namespace qpid {
 namespace broker {
 
+class QueueEvents;
+
 /**
  * A registry of queues indexed by queue name.
  *
@@ -86,6 +88,8 @@
      */
     string generateName();
 
+    void setQueueEvents(QueueEvents*);
+
     /**
      * Set the store to use.  May only be called once.
      */
@@ -120,6 +124,7 @@
     mutable qpid::sys::RWlock lock;
     int counter;
     MessageStore* store;
+    QueueEvents* events;
     management::Manageable* parent;
     bool lastNode; //used to set mode on queue declare
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=751719&r1=751718&r2=751719&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Mon Mar  9 15:58:17 2009
@@ -362,10 +362,6 @@
             getBroker().getExchanges().getDefault()->bind(queue, name, 0);
             queue->bound(getBroker().getExchanges().getDefault()->getName(), name, arguments);
 
-            //if event generation is turned on, pass in a pointer to
-            //the QueueEvents instance to use
-            if (queue->getEventMode()) queue->setQueueEventManager(getBroker().getQueueEvents());
-
             //handle automatic cleanup:
             if (exclusive) {
                 exclusiveQueues.push_back(queue);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=751719&r1=751718&r2=751719&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Mar  9 15:58:17 2009
@@ -358,6 +358,7 @@
     state = READY;
     if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
     mcast.release();
+    broker.getQueueEvents().enable();
 }
 
 void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& l) {
@@ -385,8 +386,9 @@
             elders = map.getAlive();
             elders.erase(self);
             broker.getLinks().setPassive(true);
+            broker.getQueueEvents().disable();
         }
-    }
+    } 
     else if (state >= CATCHUP && memberChange) {
         memberUpdate(l);
         elders = ClusterMap::intersection(elders, map.getAlive());

Modified: qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp?rev=751719&r1=751718&r2=751719&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp Mon Mar  9 15:58:17 2009
@@ -57,7 +57,6 @@
 {
     FieldTable headers;
     headers.setString(REPLICATION_TARGET_QUEUE, dequeued.queue->getName());
-    headers.setInt(REPLICATION_EVENT_SEQNO, ++sequence);
     headers.setInt(REPLICATION_EVENT_TYPE, DEQUEUE);
     headers.setInt(DEQUEUED_MESSAGE_POSITION, dequeued.position);
     boost::intrusive_ptr<Message> msg(createMessage(headers));
@@ -69,7 +68,6 @@
     boost::intrusive_ptr<Message> msg(cloneMessage(*(enqueued.queue), enqueued.payload));
     FieldTable& headers = msg->getProperties<MessageProperties>()->getApplicationHeaders();
     headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName());
-    headers.setInt(REPLICATION_EVENT_SEQNO, ++sequence);
     headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE);
     queue->deliver(msg);
 }
@@ -138,6 +136,7 @@
               queue = broker->getQueues().find(options.queue);
           }
           if (queue) {
+              queue->insertSequenceNumbers(REPLICATION_EVENT_SEQNO);
               QueueEvents::EventListener callback = boost::bind(&ReplicatingEventListener::handle, this, _1);
               broker->getQueueEvents().registerListener(options.name, callback);
               QPID_LOG(info, "Registered replicating queue event listener");

Modified: qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h?rev=751719&r1=751718&r2=751719&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h Mon Mar  9 15:58:17 2009
@@ -58,7 +58,6 @@
 
     PluginOptions options;    
     qpid::broker::Queue::shared_ptr queue;
-    qpid::framing::SequenceNumber sequence;
 
     void deliverDequeueMessage(const qpid::broker::QueuedMessage& enqueued);
     void deliverEnqueueMessage(const qpid::broker::QueuedMessage& enqueued);

Modified: qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp?rev=751719&r1=751718&r2=751719&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp Mon Mar  9 15:58:17 2009
@@ -34,11 +34,13 @@
 using namespace qpid::framing;
 using namespace qpid::replication::constants;
 
+const std::string SEQUENCE_VALUE("qpid.replication-event.sequence");
 ReplicationExchange::ReplicationExchange(const std::string& name, bool durable, 
                                          const FieldTable& args,
                                          QueueRegistry& qr,
                                          Manageable* parent) 
-    : Exchange(name, durable, args, parent), queues(qr), init(false) {}
+    : Exchange(name, durable, args, parent), queues(qr), sequence(args.getAsInt64(SEQUENCE_VALUE)), init(false)
+ {}
 
 std::string ReplicationExchange::getType() const { return typeName; }            
 
@@ -135,6 +137,13 @@
 const std::string ReplicationExchange::typeName("replication");
 
 
+void ReplicationExchange::encode(Buffer& buffer) const
+{
+    args.setInt64(std::string(SEQUENCE_VALUE), sequence);
+    Exchange::encode(buffer);
+}
+
+
 struct ReplicationExchangePlugin : Plugin
 {
     Broker* broker;

Modified: qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.h?rev=751719&r1=751718&r2=751719&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.h Mon Mar  9 15:58:17 2009
@@ -22,6 +22,7 @@
  *
  */
 #include "qpid/broker/Exchange.h"
+#include "qpid/framing/Buffer.h"
 #include "qpid/framing/SequenceNumber.h"
 
 namespace qpid {
@@ -58,6 +59,7 @@
     bool isDuplicate(const qpid::framing::FieldTable* args);
     void handleEnqueueEvent(const qpid::framing::FieldTable* args, qpid::broker::Deliverable& msg);
     void handleDequeueEvent(const qpid::framing::FieldTable* args);
+    void encode(framing::Buffer& buffer) const;
 };
 }} // namespace qpid::replication
 

Modified: qpid/trunk/qpid/cpp/src/tests/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster.mk?rev=751719&r1=751718&r2=751719&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster.mk Mon Mar  9 15:58:17 2009
@@ -29,8 +29,9 @@
 
 
 # ais_check checks pre-requisites for cluster tests and runs them if ok.
-TESTS+=ais_check federated_cluster_test
-EXTRA_DIST+=ais_check start_cluster stop_cluster restart_cluster cluster_python_tests cluster_python_tests_failing.txt federated_cluster_test
+TESTS+=ais_check federated_cluster_test clustered_replication_test
+EXTRA_DIST+=ais_check start_cluster stop_cluster restart_cluster cluster_python_tests cluster_python_tests_failing.txt \
+  federated_cluster_test clustered_replication_test
 
 check_PROGRAMS+=cluster_test
 cluster_test_SOURCES=unit_test.cpp cluster_test.cpp ClusterFixture.cpp ClusterFixture.h ForkedBroker.h ForkedBroker.cpp

Added: qpid/trunk/qpid/cpp/src/tests/clustered_replication_test
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/clustered_replication_test?rev=751719&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/clustered_replication_test (added)
+++ qpid/trunk/qpid/cpp/src/tests/clustered_replication_test Mon Mar  9 15:58:17 2009
@@ -0,0 +1,127 @@
+#!/bin/sh
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Test reliability of the replication feature in the face of link
+# failures:
+srcdir=`dirname $0`
+PYTHON_DIR=$srcdir/../../../python
+
+trap stop_brokers INT EXIT
+
+fail() {
+    echo $1
+    exit 1
+}
+with_ais_group() {
+    id -nG | grep '\<ais\>' >/dev/null || { echo "You are not a member of the ais group." 1>&2; exit 1; }
+    echo $* | newgrp ais
+}
+
+stop_brokers() {
+    if [[ $PRIMARY1 ]] ; then
+        ../qpidd -q --port $PRIMARY1
+        unset PRIMARY1
+    fi      
+    if [[ $PRIMARY2 ]] ; then
+        ../qpidd -q --port $PRIMARY2
+        unset PRIMARY2
+    fi      
+    if [[ $DR1 ]] ; then
+        ../qpidd -q --port $DR1
+        unset DR1
+    fi
+    if [[ $DR2 ]] ; then
+        ../qpidd -q --port $DR2
+        unset DR2
+    fi
+}
+
+if test -d ${PYTHON_DIR}; then
+    id -nG | grep '\<ais\>' >/dev/null || \
+        NOGROUP="You are not a member of the ais group."
+    ps -u root | grep 'aisexec\|corosync' >/dev/null || \
+        NOAISEXEC="The aisexec or corosync daemon is not running as root"
+
+    if test -n "$NOGROUP" -o -n "$NOAISEXEC"; then
+        cat <<EOF
+Not running federation to cluster test because:
+    $NOGROUP
+    $NOAISEXEC
+EOF
+        exit 0;
+    fi
+
+    #todo: these cluster names need to be unique to prevent clashes
+    PRIMARY_CLUSTER=PRIMARY_$(hostname)_$(pwd)
+    DR_CLUSTER=DR_$(hostname)_$(pwd)
+
+    GENERAL_OPTS="--auth no --no-module-dir --no-data-dir --daemon --port 0  --log-enable notice+ --log-to-stderr false"
+    PRIMARY_OPTS="--load-module ../.libs/replicating_listener.so --create-replication-queue true --replication-queue REPLICATION_QUEUE --load-module ../.libs/cluster.so --cluster-name $PRIMARY_CLUSTER"
+    DR_OPTS="--load-module ../.libs/replication_exchange.so --load-module ../.libs/cluster.so --cluster-name $DR_CLUSTER"
+
+    rm -f repl*.tmp #cleanup any files left from previous run
+
+    #start first node of primary cluster and set up test queue
+    echo Starting primary cluster
+    PRIMARY1=$(with_ais_group ../qpidd $GENERAL_OPTS $PRIMARY_OPTS --log-to-file repl.primary.1.tmp) || fail "Could not start node"
+    $PYTHON_DIR/commands/qpid-config -a "localhost:$PRIMARY1" add queue test-queue --generate-queue-events 2
+    $PYTHON_DIR/commands/qpid-config -a "localhost:$PRIMARY1" add queue control-queue --generate-queue-events 1
+
+    #send 10 messages, consume 5 of them
+    for i in `seq 1 10`; do echo Message$i; done | ./sender --port $PRIMARY1
+    ./receiver --port $PRIMARY1 --messages 5 > /dev/null
+
+    #add new node to primary cluster, testing correct transfer of state:
+    echo Adding node to primary cluster
+    PRIMARY2=$(with_ais_group ../qpidd $GENERAL_OPTS $PRIMARY_OPTS --log-to-file repl.primary.2.tmp)
+
+    #start DR cluster, set up test queue there and establish replication bridge
+    echo Starting DR cluster
+    DR1=$(with_ais_group ../qpidd $GENERAL_OPTS $DR_OPTS --log-to-file repl.dr.1.tmp)
+    DR2=$(with_ais_group ../qpidd $GENERAL_OPTS $DR_OPTS --log-to-file repl.dr.2.tmp)
+
+    $PYTHON_DIR/commands/qpid-config -a "localhost:$DR1" add queue test-queue
+    $PYTHON_DIR/commands/qpid-config -a "localhost:$DR1" add queue control-queue
+    $PYTHON_DIR/commands/qpid-config -a "localhost:$DR1" add exchange replication REPLICATION_EXCHANGE
+    $PYTHON_DIR/commands/qpid-route queue add localhost:$DR2 localhost:$PRIMARY2 REPLICATION_EXCHANGE REPLICATION_QUEUE
+
+    #send more messages to primary
+    for i in `seq 11 20`; do echo Message$i; done | ./sender --port $PRIMARY1 --send-eos 1
+
+    #wait for replication events to all be processed:        
+    echo Waiting for replication to complete
+    echo Done | ./sender --port $PRIMARY1 --routing-key control-queue --send-eos 1
+    ./receiver --queue control-queue --port $DR1 > /dev/null
+
+    #verify contents of test queue on dr cluster:
+    echo Verifying...    
+    ./receiver --port $DR2 > repl.out.tmp
+    for i in `seq 6 20`; do echo Message$i; done | diff repl.out.tmp - || FAIL=1
+
+    if [[ $FAIL ]]; then
+        echo Clustered replication test failed: expectations not met!
+        exit 1
+    else 
+        echo Clustered replication test passed
+        rm -f repl*.tmp
+    fi
+
+fi

Propchange: qpid/trunk/qpid/cpp/src/tests/clustered_replication_test
------------------------------------------------------------------------------
    svn:executable = *



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org