You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/11/22 17:15:07 UTC

svn commit: r1037763 - in /qpid/trunk/qpid: cpp/src/qpid/cluster/ExpiryPolicy.cpp cpp/src/qpid/cluster/UpdateClient.cpp cpp/src/tests/cluster_tests.py python/qpid/brokertest.py

Author: aconway
Date: Mon Nov 22 16:15:06 2010
New Revision: 1037763

URL: http://svn.apache.org/viewvc?rev=1037763&view=rev
Log:
QPID-2956: cluster broker exits with "error deliveryRecord no update message."

The following sequence of events was causing a broker joining the cluster to shut down:
- a client acquires or browses a message with TTL set.
- the message expires.
- a new broker joins before the client has acknowledged the message.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
    qpid/trunk/qpid/python/qpid/brokertest.py

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp?rev=1037763&r1=1037762&r2=1037763&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp Mon Nov 22 16:15:06 2010
@@ -31,7 +31,7 @@ namespace qpid {
 namespace cluster {
 
 ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, sys::Timer& t)
-    : expiryId(0), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {}
+    : expiryId(1), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {}
 
 struct ExpiryTask : public sys::TimerTask {
     ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when)
@@ -61,12 +61,17 @@ void ExpiryPolicy::willExpire(broker::Me
         // them as independenty messages so we can have multiple messages
         // with the same expiry ID.
         //
-        // TODO: fix update to avoid duplicating messages.
         sys::Mutex::ScopedLock l(lock);
-        id = expiryId++;        // if this is an update, this expiryId may already exist
-        assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end());
-        unexpiredById.insert(IdMessageMap::value_type(id, &m));
-        unexpiredByMessage[&m] = id;
+        id = expiryId++;
+        if (!id) {              // This is an update of an already-expired message.
+            m.setExpiryPolicy(expiredPolicy);
+        }
+        else {
+            assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end());
+            // If this is an update, the id may already exist
+            unexpiredById.insert(IdMessageMap::value_type(id, &m));
+            unexpiredByMessage[&m] = id;
+        }
     }
     timer.add(new ExpiryTask(this, id, m.getExpiration()));
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=1037763&r1=1037762&r2=1037763&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Mon Nov 22 16:15:06 2010
@@ -272,8 +272,7 @@ class MessageUpdater {
         // Send the expiry ID if necessary.
         if (message.payload->getProperties<DeliveryProperties>()->getTtl()) {
             boost::optional<uint64_t> expiryId = expiry.getId(*message.payload);
-            if (!expiryId) return; // Message already expired, don't replicate.
-            ClusterConnectionProxy(session).expiryId(*expiryId);
+            ClusterConnectionProxy(session).expiryId(expiryId?*expiryId:0);
         }
 
         // We can't send a broker::Message via the normal client API,
@@ -408,7 +407,8 @@ void UpdateClient::updateSession(broker:
 
     QPID_LOG(debug, updaterId << " updating unacknowledged messages.");
     broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
-    std::for_each(drs.begin(), drs.end(),  boost::bind(&UpdateClient::updateUnacked, this, _1));
+    std::for_each(drs.begin(), drs.end(),
+                  boost::bind(&UpdateClient::updateUnacked, this, _1));
 
     updateTxState(ss->getSemanticState());           // Tx transaction state.
 

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1037763&r1=1037762&r2=1037763&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Mon Nov 22 16:15:06 2010
@@ -245,6 +245,25 @@ acl allow all all
         session1 = cluster[1].connect().session()
         for q in queues: self.assert_browse(session1, "q1", ["foo"])
 
+    def test_dr_no_message(self):
+        """Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=655141
+        Joining broker crashes with 'error deliveryRecord no update message'
+        """
+
+        cluster = self.cluster(1)
+        session0 = cluster[0].connect().session()
+        s = session0.sender("q1;{create:always}")
+        s.send(Message("a", ttl=0.05), sync=False)
+        s.send(Message("b", ttl=0.05), sync=False)
+        r1 = session0.receiver("q1")
+        self.assertEqual("a", r1.fetch(timeout=0).content)
+        r2 = session0.receiver("q1;{mode:browse}")
+        self.assertEqual("b", r2.fetch(timeout=0).content)
+        # Leave messages un-acknowledged, let them expire, then start new broker.
+        time.sleep(.1)
+        cluster.start()
+        self.assertRaises(Empty, cluster[1].connect().session().receiver("q1").fetch,0)
+
 class LongTests(BrokerTest):
     """Tests that can run for a long time if -DDURATION=<minutes> is set"""
     def duration(self):
@@ -274,7 +293,7 @@ class LongTests(BrokerTest):
             i += 1
             b = cluster.start(expect=EXPECT_EXIT_FAIL)
             ErrorGenerator(b)
-            time.sleep(min(5,self.duration()/2))
+            time.sleep(5)
         sender.stop()
         receiver.stop()
         for i in range(i, len(cluster)): cluster[i].kill()
@@ -363,7 +382,7 @@ class LongTests(BrokerTest):
         start_mclients(cluster[alive])
 
         while time.time() < endtime:
-            time.sleep(max(5,self.duration()/4))
+            time.sleep(5)
             for b in cluster[alive:]: b.ready() # Check if a broker crashed.
             # Kill the first broker, expect the clients to fail. 
             b = cluster[alive]

Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=1037763&r1=1037762&r2=1037763&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Mon Nov 22 16:15:06 2010
@@ -335,7 +335,7 @@ class Broker(Popen):
             try: self._port = int(self.stdout.readline())
             except ValueError:
                 raise Exception("Can't get port for broker %s (%s)%s" %
-                                (self.name, self.pname, error_line(self.log,4)))
+                                (self.name, self.pname, error_line(self.log,5)))
         return self._port
 
     def unexpected(self,msg):
@@ -409,7 +409,7 @@ class Broker(Popen):
         # First make sure the broker is listening by checking the log.
         if not retry(self.log_ready, timeout=30):
             raise Exception(
-                "Timed out waiting for broker %s%s"%(self.name, error_line(self.log,4)))
+                "Timed out waiting for broker %s%s"%(self.name, error_line(self.log,5)))
         # Create a connection and a session. For a cluster broker this will
         # return after cluster init has finished.
         try:
@@ -417,7 +417,7 @@ class Broker(Popen):
             try: c.session()
             finally: c.close()
         except: raise RethrownException(
-            "Broker %s failed ready test%s"%(self.name,error_line(self.log,4)))
+            "Broker %s failed ready test%s"%(self.name,error_line(self.log, 5)))
 
     def store_state(self):
         uuids = open(os.path.join(self.datadir, "cluster", "store.status")).readlines()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org