You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2006/10/31 16:38:37 UTC
svn commit: r469530 - in /incubator/qpid/trunk/qpid/cpp:
src/qpid/broker/AccumulatedAck.cpp src/qpid/broker/TxPublish.h
test/unit/qpid/broker/AccumulatedAckTest.cpp
test/unit/qpid/broker/TxPublishTest.cpp
Author: gsim
Date: Tue Oct 31 07:38:36 2006
New Revision: 469530
URL: http://svn.apache.org/viewvc?view=rev&rev=469530
Log:
Added doc & unit tests.
Added:
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/AccumulatedAckTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp?view=diff&rev=469530&r1=469529&r2=469530
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/AccumulatedAck.cpp Tue Oct 31 07:38:36 2006
@@ -25,7 +25,7 @@
if(multiple){
if(tag > range) range = tag;
//else don't care, it is already counted
- }else if(tag < range){
+ }else if(tag > range){
individual.push_back(tag);
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h?view=diff&rev=469530&r1=469529&r2=469530
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h Tue Oct 31 07:38:36 2006
@@ -29,6 +29,16 @@
namespace qpid {
namespace broker {
+ /**
+ * Defines the behaviour for publish operations on a
+ * transactional channel. Messages are routed through
+ * exchanges when received but are not at that stage delivered
+ * to the matching queues; rather, the queues are held in an
+ * instance of this class. On prepare() the message is marked
+ * enqueued to the relevant queues in the MessageStore. On
+ * commit() the message will be passed to each queue for
+ * dispatch or to be added to the in-memory queue.
+ */
class TxPublish : public TxOp, public Deliverable{
class Prepare{
Message::shared_ptr& msg;
Modified: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/AccumulatedAckTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/AccumulatedAckTest.cpp?view=diff&rev=469530&r1=469529&r2=469530
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/AccumulatedAckTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/AccumulatedAckTest.cpp Tue Oct 31 07:38:36 2006
@@ -27,6 +27,7 @@
{
CPPUNIT_TEST_SUITE(AccumulatedAckTest);
CPPUNIT_TEST(testCovers);
+ CPPUNIT_TEST(testUpdateAndConsolidate);
CPPUNIT_TEST_SUITE_END();
public:
@@ -48,6 +49,30 @@
CPPUNIT_ASSERT(!ack.covers(6));
CPPUNIT_ASSERT(!ack.covers(8));
CPPUNIT_ASSERT(!ack.covers(10));
+ }
+
+ void testUpdateAndConsolidate() // checks the state left by a mix of individual and multiple acks followed by consolidate()
+ {
+ AccumulatedAck ack;
+ ack.clear();
+ ack.update(1, false); // individual (multiple == false) acks, deliberately issued out of order
+ ack.update(3, false);
+ ack.update(10, false);
+ ack.update(8, false);
+ ack.update(6, false);
+ ack.update(3, true); // multiple acks; note the second tag (2) is lower than the first (3)
+ ack.update(2, true);
+ ack.update(5, true);
+ ack.consolidate();
+ CPPUNIT_ASSERT_EQUAL((u_int64_t) 5, ack.range); // expected range is the highest tag passed with multiple == true
+ CPPUNIT_ASSERT_EQUAL((size_t) 3, ack.individual.size()); // only the individual tags above the range (6, 8, 10) are expected to remain
+ list<u_int64_t>::iterator i = ack.individual.begin();
+ CPPUNIT_ASSERT_EQUAL((u_int64_t) 6, *i); // remaining tags are expected in ascending order, not insertion order
+ i++;
+ CPPUNIT_ASSERT_EQUAL((u_int64_t) 8, *i);
+ i++;
+ CPPUNIT_ASSERT_EQUAL((u_int64_t) 10, *i);
+
}
};
Added: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp?view=auto&rev=469530
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp Tue Oct 31 07:38:36 2006
@@ -0,0 +1,103 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/TxPublish.h"
+#include <qpid_test_plugin.h>
+#include <iostream>
+#include <list>
+#include <vector>
+
+using std::list;
+using std::pair;
+using std::vector;
+using namespace qpid::broker;
+
+class TxPublishTest : public CppUnit::TestCase // unit tests for TxPublish::prepare() and TxPublish::commit()
+{
+
+ class TestMessageStore : public MessageStore // stub store that records every enqueue() call
+ {
+ public:
+ vector< pair<string, Message::shared_ptr> > enqueued; // (queue name, message) pairs, in the order enqueue() was called
+
+ void enqueue(Message::shared_ptr& msg, const string& queue, const string * const /*xid*/)
+ {
+ enqueued.push_back(pair<string, Message::shared_ptr>(queue,msg));
+ }
+
+ //don't care about any of the other methods, so they are implemented as no-ops:
+ void dequeue(Message::shared_ptr&, const string&, const string * const){}
+ void committed(const string * const){}
+ void aborted(const string * const){}
+ void begin(){}
+ void commit(){}
+ void abort(){}
+ ~TestMessageStore(){}
+ };
+
+ CPPUNIT_TEST_SUITE(TxPublishTest);
+ CPPUNIT_TEST(testPrepare);
+ CPPUNIT_TEST(testCommit);
+ CPPUNIT_TEST_SUITE_END();
+
+
+ TestMessageStore store; // declared before the queues, which are constructed with &store
+ Queue::shared_ptr queue1;
+ Queue::shared_ptr queue2;
+ Message::shared_ptr msg;
+ TxPublish op; // the operation under test, constructed from msg
+
+
+ public:
+
+ TxPublishTest() : queue1(new Queue("queue1", true, false, &store, 0)),
+ queue2(new Queue("queue2", true, false, &store, 0)),
+ msg(new Message(0, "exchange", "routing_key", false, false)),
+ op(msg)
+ {
+ op.deliverTo(queue1); // hand both queues to the op; the tests below expect results in this order
+ op.deliverTo(queue2);
+ }
+
+ void testPrepare()
+ {
+ //ensure prepare() enqueues the message in the store once per queue, in deliverTo() order
+ op.prepare();
+ CPPUNIT_ASSERT_EQUAL((size_t) 2, store.enqueued.size());
+ CPPUNIT_ASSERT_EQUAL(string("queue1"), store.enqueued[0].first);
+ CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[0].second);
+ CPPUNIT_ASSERT_EQUAL(string("queue2"), store.enqueued[1].first);
+ CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[1].second);
+ }
+
+ void testCommit()
+ {
+ //ensure commit() leaves exactly one message, msg, on each of the two queues
+ op.commit();
+ CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue1->getMessageCount());
+ CPPUNIT_ASSERT_EQUAL(msg, queue1->dequeue());
+
+ CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue2->getMessageCount());
+ CPPUNIT_ASSERT_EQUAL(msg, queue2->dequeue());
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(TxPublishTest);
+
Propchange: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native