You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2006/10/31 16:38:37 UTC

svn commit: r469530 - in /incubator/qpid/trunk/qpid/cpp: src/qpid/broker/AccumulatedAck.cpp src/qpid/broker/TxPublish.h test/unit/qpid/broker/AccumulatedAckTest.cpp test/unit/qpid/broker/TxPublishTest.cpp

Author: gsim
Date: Tue Oct 31 07:38:36 2006
New Revision: 469530

URL: http://svn.apache.org/viewvc?view=rev&rev=469530
Log:
Added doc & unit tests.


Added:
    incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
    incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/AccumulatedAckTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp?view=diff&rev=469530&r1=469529&r2=469530
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp Tue Oct 31 07:38:36 2006
@@ -25,7 +25,7 @@
     if(multiple){
         if(tag > range) range = tag;
         //else don't care, it is already counted
-    }else if(tag < range){
+    }else if(tag > range){
         individual.push_back(tag);
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h?view=diff&rev=469530&r1=469529&r2=469530
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h Tue Oct 31 07:38:36 2006
@@ -29,6 +29,16 @@
 
 namespace qpid {
     namespace broker {
+        /**
+         * Defines the behaviour for publish operations on a
+         * transactional channel. Messages are routed through
+         * exchanges when received but are not at that stage delivered
+         * to the matching queues, rather the queues are held in an
+         * instance of this class. On prepare() the message is marked
+         * enqueued to the relevant queues in the MessagesStore. On
+         * commit() the messages will be passed to the queue for
+         * dispatch or to be added to the in-memory queue.
+         */
         class TxPublish : public TxOp, public Deliverable{
             class Prepare{
                 Message::shared_ptr& msg;

Modified: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/AccumulatedAckTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/AccumulatedAckTest.cpp?view=diff&rev=469530&r1=469529&r2=469530
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/AccumulatedAckTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/AccumulatedAckTest.cpp Tue Oct 31 07:38:36 2006
@@ -27,6 +27,7 @@
 {
         CPPUNIT_TEST_SUITE(AccumulatedAckTest);
         CPPUNIT_TEST(testCovers);
+        CPPUNIT_TEST(testUpdateAndConsolidate);
         CPPUNIT_TEST_SUITE_END();
 
     public:
@@ -48,6 +49,30 @@
             CPPUNIT_ASSERT(!ack.covers(6));
             CPPUNIT_ASSERT(!ack.covers(8));
             CPPUNIT_ASSERT(!ack.covers(10));
+        }
+
+        void testUpdateAndConsolidate()
+        {
+            AccumulatedAck ack;
+            ack.clear();
+            ack.update(1, false);
+            ack.update(3, false);
+            ack.update(10, false);
+            ack.update(8, false);
+            ack.update(6, false);
+            ack.update(3, true);
+            ack.update(2, true);
+            ack.update(5, true);
+            ack.consolidate();
+            CPPUNIT_ASSERT_EQUAL((u_int64_t) 5, ack.range);
+            CPPUNIT_ASSERT_EQUAL((size_t) 3, ack.individual.size());
+            list<u_int64_t>::iterator i = ack.individual.begin();
+            CPPUNIT_ASSERT_EQUAL((u_int64_t) 6, *i);
+            i++;
+            CPPUNIT_ASSERT_EQUAL((u_int64_t) 8, *i);
+            i++;
+            CPPUNIT_ASSERT_EQUAL((u_int64_t) 10, *i);
+
         }
 };
 

Added: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp?view=auto&rev=469530
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp Tue Oct 31 07:38:36 2006
@@ -0,0 +1,103 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/TxPublish.h"
+#include <qpid_test_plugin.h>
+#include <iostream>
+#include <list>
+#include <vector>
+
+using std::list;
+using std::pair;
+using std::vector;
+using namespace qpid::broker;
+
+class TxPublishTest : public CppUnit::TestCase  
+{
+
+        class TestMessageStore : public MessageStore
+        {
+            public:
+                vector< pair<string, Message::shared_ptr> > enqueued;
+
+                void enqueue(Message::shared_ptr& msg, const string& queue, const string * const /*xid*/)
+                {
+                    enqueued.push_back(pair<string, Message::shared_ptr>(queue,msg));
+                }
+
+                //dont care about any of the other methods:
+                void dequeue(Message::shared_ptr&, const string&, const string * const){}
+                void committed(const string * const){}
+                void aborted(const string * const){}
+                void begin(){}
+                void commit(){}
+                void abort(){}        
+                ~TestMessageStore(){}
+        };
+
+        CPPUNIT_TEST_SUITE(TxPublishTest);
+        CPPUNIT_TEST(testPrepare);
+        CPPUNIT_TEST(testCommit);
+        CPPUNIT_TEST_SUITE_END();
+
+
+        TestMessageStore store;
+        Queue::shared_ptr queue1;
+        Queue::shared_ptr queue2;
+        Message::shared_ptr msg;
+        TxPublish op;
+
+
+    public:
+
+        TxPublishTest() : queue1(new Queue("queue1", true, false, &store, 0)), 
+                          queue2(new Queue("queue2", true, false, &store, 0)), 
+                          msg(new Message(0, "exchange", "routing_key", false, false)),
+                          op(msg)
+        {
+            op.deliverTo(queue1);
+            op.deliverTo(queue2);
+        }      
+
+        void testPrepare()
+        {
+            //ensure messages are enqueued in store
+            op.prepare();
+            CPPUNIT_ASSERT_EQUAL((size_t) 2, store.enqueued.size());
+            CPPUNIT_ASSERT_EQUAL(string("queue1"), store.enqueued[0].first);
+            CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[0].second);
+            CPPUNIT_ASSERT_EQUAL(string("queue2"), store.enqueued[1].first);
+            CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[1].second);
+        }
+
+        void testCommit()
+        {
+            //ensure messages are delivered to queue
+            op.commit();
+            CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue1->getMessageCount());
+            CPPUNIT_ASSERT_EQUAL(msg, queue1->dequeue());
+
+            CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue2->getMessageCount());
+            CPPUNIT_ASSERT_EQUAL(msg, queue2->dequeue());            
+        }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(TxPublishTest);
+

Propchange: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native