You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2006/12/01 06:11:53 UTC
svn commit: r481159 [12/12] - in /incubator/qpid/trunk/qpid/cpp: ./
build-aux/ gen/ lib/ lib/broker/ lib/client/ lib/common/
lib/common/framing/ lib/common/sys/ lib/common/sys/apr/
lib/common/sys/posix/ m4/ src/ src/qpid/ src/qpid/apr/ src/qpid/broker/...
Added: incubator/qpid/trunk/qpid/cpp/tests/MessageTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/MessageTest.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/MessageTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/MessageTest.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <BrokerMessage.h>
+#include <qpid_test_plugin.h>
+#include <iostream>
+
+using namespace boost;
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+// Output handler stub: collects the frames it is asked to send so a test can inspect them.
+class DummyHandler : public OutputHandler {
+public:
+    // Received frames in call order. NOTE(review): never freed here - confirm who owns them.
+    std::vector<AMQFrame*> frames;
+    virtual void send(AMQFrame* f) { frames.insert(frames.end(), f); }
+};
+
+// Verifies that a broker Message survives an encode/decode round trip through a Buffer.
+class MessageTest : public CppUnit::TestCase {
+    CPPUNIT_TEST_SUITE(MessageTest);
+    CPPUNIT_TEST(testEncodeDecode);
+    CPPUNIT_TEST_SUITE_END();
+
+public:
+    // Builds a message with two content chunks, encodes it, rebuilds it from the buffer and delivers the copy.
+    void testEncodeDecode()
+    {
+        string exchange("MyExchange");
+        string routingKey("MyRoutingKey");
+        string messageId("MyMessage");
+        string data1 = "abcdefg";
+        string data2 = "hijklmn";
+
+        Message::shared_ptr msg(new Message(0, exchange, routingKey, false, false));
+        AMQHeaderBody::shared_ptr head(new AMQHeaderBody(BASIC));
+        head->setContentSize(14);   // both 7-character chunks together
+        AMQContentBody::shared_ptr chunk1(new AMQContentBody(data1));
+        AMQContentBody::shared_ptr chunk2(new AMQContentBody(data2));
+        msg->setHeader(head);
+        msg->addContent(chunk1);
+        msg->addContent(chunk2);
+
+        msg->getHeaderProperties()->setMessageId(messageId);
+        msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
+        msg->getHeaderProperties()->getHeaders().setString("abc", "xyz");
+
+        Buffer wire(msg->encodedSize());
+        msg->encode(wire);
+        wire.flip();
+
+        msg = Message::shared_ptr(new Message(wire));   // from here on msg is the decoded copy
+        CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchange());
+        CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey());
+        CPPUNIT_ASSERT_EQUAL(messageId, msg->getHeaderProperties()->getMessageId());
+        CPPUNIT_ASSERT_EQUAL(static_cast<u_int8_t>(PERSISTENT), msg->getHeaderProperties()->getDeliveryMode());
+        CPPUNIT_ASSERT_EQUAL(string("xyz"), msg->getHeaderProperties()->getHeaders().getString("abc"));
+        CPPUNIT_ASSERT_EQUAL(static_cast<u_int64_t>(14), msg->contentSize());
+
+        DummyHandler sink;
+        msg->deliver(&sink, 0, "ignore", 0, 100);
+        CPPUNIT_ASSERT_EQUAL(static_cast<size_t>(3), sink.frames.size());
+        AMQContentBody::shared_ptr body(dynamic_pointer_cast<AMQContentBody, AMQBody>(sink.frames[2]->getBody()));
+        CPPUNIT_ASSERT(body);   // the third frame must carry a content body
+        CPPUNIT_ASSERT_EQUAL(data1 + data2, body->getData());
+    }
+};
+
+// Build this file as a CppUnit test plug-in and register MessageTest with the default registry.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(MessageTest);
+
Propchange: incubator/qpid/trunk/qpid/cpp/tests/MessageTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/tests/MessageTest.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/tests/QueueRegistryTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/QueueRegistryTest.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/QueueRegistryTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/QueueRegistryTest.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <QueueRegistry.h>
+#include <qpid_test_plugin.h>
+#include <string>
+
+using namespace qpid::broker;
+
+// Exercises QueueRegistry: declaring, looking up and destroying queues by name.
+class QueueRegistryTest : public CppUnit::TestCase {
+    CPPUNIT_TEST_SUITE(QueueRegistryTest);
+    CPPUNIT_TEST(testDeclare);
+    CPPUNIT_TEST(testDeclareTmp);
+    CPPUNIT_TEST(testFind);
+    CPPUNIT_TEST(testDestroy);
+    CPPUNIT_TEST_SUITE_END();
+
+  private:
+    std::string foo, bar;
+    QueueRegistry reg;
+    std::pair<Queue::shared_ptr, bool> qc;   // last declare() result: (queue, newly created?)
+
+  public:
+    void setUp() {
+        foo = "foo";
+        bar = "bar";
+    }
+    // Declaring a name twice returns the same queue; only the first call reports it as new.
+    void testDeclare() {
+        qc = reg.declare(foo, false, 0, 0);
+        Queue::shared_ptr q = qc.first;
+        CPPUNIT_ASSERT(q);
+        CPPUNIT_ASSERT(qc.second); // New queue
+        CPPUNIT_ASSERT_EQUAL(foo, q->getName());
+
+        qc = reg.declare(foo, false, 0, 0);
+        CPPUNIT_ASSERT_EQUAL(q, qc.first);
+        CPPUNIT_ASSERT(!qc.second);
+
+        qc = reg.declare(bar, false, 0, 0);
+        q = qc.first;
+        CPPUNIT_ASSERT(q);
+        CPPUNIT_ASSERT(qc.second); // New queue
+        CPPUNIT_ASSERT_EQUAL(bar, q->getName());
+    }
+    // Declaring with an empty name yields a queue with a generated name.
+    void testDeclareTmp()
+    {
+        qc = reg.declare(std::string(), false, 0, 0);
+        CPPUNIT_ASSERT(qc.second);
+        CPPUNIT_ASSERT_EQUAL(std::string("tmp_1"), qc.first->getName());
+    }
+    // find() gives a null pointer for unknown names and the queue for declared ones.
+    void testFind() {
+        CPPUNIT_ASSERT(!reg.find(foo));   // boolean test instead of comparing a smart pointer with 0
+
+        reg.declare(foo, false, 0, 0);
+        reg.declare(bar, false, 0, 0);
+        Queue::shared_ptr q = reg.find(bar);
+        CPPUNIT_ASSERT(q);
+        CPPUNIT_ASSERT_EQUAL(bar, q->getName());
+    }
+    // destroy() only removes the registry entry; outstanding references keep the queue alive.
+    void testDestroy() {
+        qc = reg.declare(foo, false, 0, 0);
+        reg.destroy(foo);
+        // Queue is gone from the registry.
+        CPPUNIT_ASSERT(!reg.find(foo));
+        // Queue is not actually destroyed till we drop our reference.
+        CPPUNIT_ASSERT_EQUAL(foo, qc.first->getName());
+        // We should be the only reference.
+        CPPUNIT_ASSERT_EQUAL(1L, qc.first.use_count());
+    }
+};
+
+// Build this file as a CppUnit test plug-in and register QueueRegistryTest with the default registry.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(QueueRegistryTest);
Propchange: incubator/qpid/trunk/qpid/cpp/tests/QueueRegistryTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/tests/QueueRegistryTest.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/QueueTest.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/QueueTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/QueueTest.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,179 @@
+ /*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <BrokerQueue.h>
+#include <QueueRegistry.h>
+#include <qpid_test_plugin.h>
+#include <iostream>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+
+
+class TestBinding : public virtual Binding {   // Binding stub that records whether cancel() was called
+public:
+    TestBinding();
+    virtual void cancel();
+    bool isCancelled();
+private:
+    bool cancelled;   // flag reported by isCancelled()
+};
+
+// Consumer stub that keeps a reference to the most recent message it was given.
+struct TestConsumer : virtual Consumer {
+    Message::shared_ptr last;   // updated by deliver()
+
+    virtual bool deliver(Message::shared_ptr& msg);
+};
+
+
+// Tests Queue: consumer dispatch, binding cancellation, registry use and dequeueing.
+class QueueTest : public CppUnit::TestCase {
+    CPPUNIT_TEST_SUITE(QueueTest);
+    CPPUNIT_TEST(testConsumers);
+    CPPUNIT_TEST(testBinding);
+    CPPUNIT_TEST(testRegistry);
+    CPPUNIT_TEST(testDequeue);
+    CPPUNIT_TEST_SUITE_END();
+
+public:
+    void testConsumers(){
+        Queue::shared_ptr queue(new Queue("my_queue", true));
+
+        // Register two consumers.
+        TestConsumer first;
+        TestConsumer second;
+        queue->consume(&first);
+        queue->consume(&second);
+
+        CPPUNIT_ASSERT_EQUAL(static_cast<u_int32_t>(2), queue->getConsumerCount());
+
+        // Deliveries are expected to alternate between the consumers: first, second, first.
+        Message::shared_ptr msgA(new Message(0, "e", "A", true, true));
+        Message::shared_ptr msgB(new Message(0, "e", "B", true, true));
+        Message::shared_ptr msgC(new Message(0, "e", "C", true, true));
+
+        queue->deliver(msgA);
+        CPPUNIT_ASSERT_EQUAL(msgA.get(), first.last.get());
+
+        queue->deliver(msgB);
+        CPPUNIT_ASSERT_EQUAL(msgB.get(), second.last.get());
+
+        queue->deliver(msgC);
+        CPPUNIT_ASSERT_EQUAL(msgC.get(), first.last.get());
+
+        // Cancelling removes the consumers one at a time.
+        queue->cancel(&first);
+        CPPUNIT_ASSERT_EQUAL(static_cast<u_int32_t>(1), queue->getConsumerCount());
+        queue->cancel(&second);
+        CPPUNIT_ASSERT_EQUAL(static_cast<u_int32_t>(0), queue->getConsumerCount());
+    }
+    // Releasing the only reference held here must leave every attached binding cancelled.
+    void testBinding(){
+        Queue::shared_ptr queue(new Queue("my_queue", true));
+        // Attach two bindings.
+        TestBinding bindingA;
+        TestBinding bindingB;
+        queue->bound(&bindingA);
+        queue->bound(&bindingB);
+
+        queue.reset();
+
+        CPPUNIT_ASSERT(bindingA.isCancelled());
+        CPPUNIT_ASSERT(bindingB.isCancelled());
+    }
+    // Queues declared in a registry can be found until they are destroyed.
+    void testRegistry(){
+        QueueRegistry registry;
+        const char* const names[] = { "queue1", "queue2", "queue3" };
+        const size_t count = sizeof(names) / sizeof(names[0]);
+        size_t i;
+
+        for (i = 0; i < count; ++i) {
+            registry.declare(names[i], true, true);
+        }
+        for (i = 0; i < count; ++i) {
+            CPPUNIT_ASSERT(registry.find(names[i]));
+        }
+        for (i = 0; i < count; ++i) {
+            registry.destroy(names[i]);
+        }
+        for (i = 0; i < count; ++i) {
+            CPPUNIT_ASSERT(!registry.find(names[i]));
+        }
+    }
+    // dequeue() hands messages back in delivery order; dispatch() passes the next one to a consumer.
+    void testDequeue(){
+        Queue::shared_ptr queue(new Queue("my_queue", true));
+
+        Message::shared_ptr msgA(new Message(0, "e", "A", true, true));
+        Message::shared_ptr msgB(new Message(0, "e", "B", true, true));
+        Message::shared_ptr msgC(new Message(0, "e", "C", true, true));
+        Message::shared_ptr taken;
+
+        queue->deliver(msgA);
+        queue->deliver(msgB);
+        queue->deliver(msgC);
+
+        CPPUNIT_ASSERT_EQUAL(static_cast<u_int32_t>(3), queue->getMessageCount());
+
+        taken = queue->dequeue();
+        CPPUNIT_ASSERT_EQUAL(msgA.get(), taken.get());
+        CPPUNIT_ASSERT_EQUAL(static_cast<u_int32_t>(2), queue->getMessageCount());
+
+        taken = queue->dequeue();
+        CPPUNIT_ASSERT_EQUAL(msgB.get(), taken.get());
+        CPPUNIT_ASSERT_EQUAL(static_cast<u_int32_t>(1), queue->getMessageCount());
+
+        TestConsumer consumer;
+        queue->consume(&consumer);
+        queue->dispatch();
+        CPPUNIT_ASSERT_EQUAL(msgC.get(), consumer.last.get());
+        CPPUNIT_ASSERT_EQUAL(static_cast<u_int32_t>(0), queue->getMessageCount());
+
+        taken = queue->dequeue();   // nothing left: expect a null pointer
+        CPPUNIT_ASSERT(!taken);
+        CPPUNIT_ASSERT_EQUAL(static_cast<u_int32_t>(0), queue->getMessageCount());
+
+    }
+};
+
+// Build this file as a CppUnit test plug-in and register QueueTest with the default registry.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(QueueTest);
+
+// TestBinding: a freshly built binding is not cancelled.
+TestBinding::TestBinding() { cancelled = false; }
+
+// Flags the binding as cancelled; a second call trips the assertion.
+void TestBinding::cancel() {
+    CPPUNIT_ASSERT(!cancelled);
+    cancelled = true;
+}
+
+// Reports whether cancel() has been called.
+bool TestBinding::isCancelled() { return cancelled; }
+
+// TestConsumer: keeps the delivered message in 'last' and unconditionally returns true.
+bool TestConsumer::deliver(Message::shared_ptr& msg)
+{
+    last = msg; return true;
+}
+
Propchange: incubator/qpid/trunk/qpid/cpp/tests/QueueTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/tests/QueueTest.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/tests/TopicExchangeTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/TopicExchangeTest.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/TopicExchangeTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/TopicExchangeTest.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <TopicExchange.h>
+#include <qpid_test_plugin.h>
+
+using namespace qpid::broker;
+
+// Builds a Tokens from the C strings in [begin, end). The pointers are const-qualified so
+// arrays of string literals can be passed; existing char** callers still convert implicitly.
+Tokens makeTokens(const char* const* begin, const char* const* end)
+{
+    Tokens t;
+    t.insert(t.end(), begin, end);
+    return t;
+}
+
+// Number of elements in an array (must be a real array, not a pointer).
+#define LEN(a) (sizeof(a)/sizeof((a)[0]))
+
+// Convert array to token vector
+#define TOKENS(a) makeTokens((a), (a) + LEN(a))
+
+// Allow CPPUNIT_ASSERT_EQUAL to print a Tokens, e.g. [ "a", "b"]; an empty one prints "[ ]".
+CppUnit::OStringStream& operator <<(CppUnit::OStringStream& out, const Tokens& v)
+{
+    out << "[ ";
+    for (Tokens::const_iterator i = v.begin(); i != v.end(); ++i) {
+        // The separator precedes every element but the first, so "]" is emitted even when v is empty.
+        out << (i == v.begin() ? "" : ", ") << '"' << *i << '"';
+    }
+    out << "]";
+    return out;
+}
+
+// Checks how Tokens splits a dotted key into words, including empty words around the dots.
+class TokensTest : public CppUnit::TestCase {
+    CPPUNIT_TEST_SUITE(TokensTest);
+    CPPUNIT_TEST(testTokens);
+    CPPUNIT_TEST_SUITE_END();
+
+public:
+    void testTokens()
+    {
+        Tokens tokens("hello.world");
+        const char* expect[] = {"hello", "world"};   // const char*: string literals are not writable
+        CPPUNIT_ASSERT_EQUAL(TOKENS(expect), tokens);
+
+        tokens = "a.b.c";
+        const char* expect2[] = { "a", "b", "c" };
+        CPPUNIT_ASSERT_EQUAL(TOKENS(expect2), tokens);
+
+        tokens = "";
+        CPPUNIT_ASSERT(tokens.empty());
+
+        tokens = "x";
+        const char* expect3[] = { "x" };
+        CPPUNIT_ASSERT_EQUAL(TOKENS(expect3), tokens);
+
+        tokens = (".x");   // a leading dot yields an empty first word
+        const char* expect4[] = { "", "x" };
+        CPPUNIT_ASSERT_EQUAL(TOKENS(expect4), tokens);
+
+        tokens = ("x.");   // a trailing dot yields an empty last word
+        const char* expect5[] = { "x", "" };
+        CPPUNIT_ASSERT_EQUAL(TOKENS(expect5), tokens);
+
+        tokens = (".");
+        const char* expect6[] = { "", "" };
+        CPPUNIT_ASSERT_EQUAL(TOKENS(expect6), tokens);
+
+        tokens = ("..");
+        const char* expect7[] = { "", "", "" };
+        CPPUNIT_ASSERT_EQUAL(TOKENS(expect7), tokens);
+    }
+};
+
+#define ASSERT_NORMALIZED(expect, pattern) \
+ CPPUNIT_ASSERT_EQUAL(Tokens(expect), static_cast<Tokens>(TopicPattern(pattern))) // TopicPattern(pattern), viewed as Tokens, must equal Tokens(expect)
+class TopicPatternTest : public CppUnit::TestCase // Tests TopicPattern normalization and matching of dotted routing keys.
+{
+ CPPUNIT_TEST_SUITE(TopicPatternTest);
+ CPPUNIT_TEST(testNormalize);
+ CPPUNIT_TEST(testPlain);
+ CPPUNIT_TEST(testStar);
+ CPPUNIT_TEST(testHash);
+ CPPUNIT_TEST(testMixed);
+ CPPUNIT_TEST(testCombo);
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+ // Normalization: runs of '#' collapse into one and '*' words move ahead of the '#' they are mixed with.
+ void testNormalize()
+ {
+ CPPUNIT_ASSERT(TopicPattern("").empty());
+ ASSERT_NORMALIZED("a.b.c", "a.b.c");
+ ASSERT_NORMALIZED("a.*.c", "a.*.c");
+ ASSERT_NORMALIZED("#", "#");
+ ASSERT_NORMALIZED("#", "#.#.#.#");
+ ASSERT_NORMALIZED("*.*.*.#", "#.*.#.*.#.#.*");
+ ASSERT_NORMALIZED("a.*.*.*.#", "a.*.#.*.#.*.#");
+ ASSERT_NORMALIZED("a.*.*.*.#", "a.*.#.*.#.*");
+ }
+ // Without wildcards only the identical key matches; empty words from extra dots prevent a match.
+ void testPlain() {
+ TopicPattern p("ab.cd.e");
+ CPPUNIT_ASSERT(p.match("ab.cd.e"));
+ CPPUNIT_ASSERT(!p.match("abx.cd.e"));
+ CPPUNIT_ASSERT(!p.match("ab.cd"));
+ CPPUNIT_ASSERT(!p.match("ab.cd..e."));
+ CPPUNIT_ASSERT(!p.match("ab.cd.e."));
+ CPPUNIT_ASSERT(!p.match(".ab.cd.e"));
+
+ p = "";
+ CPPUNIT_ASSERT(p.match(""));
+
+ p = ".";
+ CPPUNIT_ASSERT(p.match("."));
+ }
+
+ // '*' stands for exactly one word: the word may be empty (".x" matches "*.x") but must be present.
+ void testStar()
+ {
+ TopicPattern p("a.*.b");
+ CPPUNIT_ASSERT(p.match("a.xx.b"));
+ CPPUNIT_ASSERT(!p.match("a.b"));
+
+ p = "*.x";
+ CPPUNIT_ASSERT(p.match("y.x"));
+ CPPUNIT_ASSERT(p.match(".x"));
+ CPPUNIT_ASSERT(!p.match("x"));
+
+ p = "x.x.*";
+ CPPUNIT_ASSERT(p.match("x.x.y"));
+ CPPUNIT_ASSERT(p.match("x.x."));
+ CPPUNIT_ASSERT(!p.match("x.x"));
+ CPPUNIT_ASSERT(!p.match("q.x.y"));
+ }
+ // '#' stands for zero or more words, at the start, middle or end of the pattern.
+ void testHash()
+ {
+ TopicPattern p("a.#.b");
+ CPPUNIT_ASSERT(p.match("a.b"));
+ CPPUNIT_ASSERT(p.match("a.x.b"));
+ CPPUNIT_ASSERT(p.match("a..x.y.zz.b"));
+ CPPUNIT_ASSERT(!p.match("a.b."));
+ CPPUNIT_ASSERT(!p.match("q.x.b"));
+
+ p = "a.#";
+ CPPUNIT_ASSERT(p.match("a"));
+ CPPUNIT_ASSERT(p.match("a.b"));
+ CPPUNIT_ASSERT(p.match("a.b.c"));
+
+ p = "#.a";
+ CPPUNIT_ASSERT(p.match("a"));
+ CPPUNIT_ASSERT(p.match("x.y.a"));
+ }
+ // '*' and '#' combined with literal words in a single pattern.
+ void testMixed()
+ {
+ TopicPattern p("*.x.#.y");
+ CPPUNIT_ASSERT(p.match("a.x.y"));
+ CPPUNIT_ASSERT(p.match("a.x.p.qq.y"));
+ CPPUNIT_ASSERT(!p.match("a.a.x.y"));
+ CPPUNIT_ASSERT(!p.match("aa.x.b.c"));
+
+ p = "a.#.b.*";
+ CPPUNIT_ASSERT(p.match("a.b.x"));
+ CPPUNIT_ASSERT(p.match("a.x.x.x.b.x"));
+ }
+ // Only wildcards: the three '*' require at least three words, the '#' absorb any extra ones.
+ void testCombo() {
+ TopicPattern p("*.#.#.*.*.#");
+ CPPUNIT_ASSERT(p.match("x.y.z"));
+ CPPUNIT_ASSERT(p.match("x.y.z.a.b.c"));
+ CPPUNIT_ASSERT(!p.match("x.y"));
+ CPPUNIT_ASSERT(!p.match("x"));
+ }
+};
+
+
+// Build this file as a CppUnit test plug-in and register both suites with the default registry.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(TopicPatternTest);
+CPPUNIT_TEST_SUITE_REGISTRATION(TokensTest);
Propchange: incubator/qpid/trunk/qpid/cpp/tests/TopicExchangeTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/tests/TopicExchangeTest.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/tests/TxAckTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/TxAckTest.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/TxAckTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/TxAckTest.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <NullMessageStore.h>
+#include <RecoveryManager.h>
+#include <TxAck.h>
+#include <qpid_test_plugin.h>
+#include <iostream>
+#include <list>
+#include <vector>
+
+using std::list;
+using std::vector;
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+// Tests TxAck against ten delivery records of which msgs 1-5, 7 and 9 are acknowledged.
+class TxAckTest : public CppUnit::TestCase {
+
+    // Store stub that records every message passed to dequeue().
+    class TestMessageStore : public NullMessageStore {
+    public:
+        vector<Message::shared_ptr> dequeued;
+
+        TestMessageStore() : NullMessageStore(false) {}
+        ~TestMessageStore(){}
+
+        void dequeue(TransactionContext*, Message::shared_ptr& msg, const Queue& /*queue*/, const string * const /*xid*/)
+        {
+            dequeued.push_back(msg);
+        }
+    };
+
+    CPPUNIT_TEST_SUITE(TxAckTest);
+    CPPUNIT_TEST(testPrepare);
+    CPPUNIT_TEST(testCommit);
+    CPPUNIT_TEST_SUITE_END();
+
+    // Fixture state. Declaration order matters: the constructor's initializers use store, acked and deliveries.
+    AccumulatedAck acked;
+    TestMessageStore store;
+    Queue::shared_ptr queue;
+    vector<Message::shared_ptr> messages;
+    list<DeliveryRecord> deliveries;
+    TxAck op;
+
+
+public:
+
+    TxAckTest() : queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries)
+    {
+        // Ten persistent messages, each with a delivery record numbered 1..10.
+        for (int tag = 1; tag <= 10; ++tag) {
+            Message::shared_ptr msg(new Message(0, "exchange", "routing_key", false, false));
+            msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)));
+            msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
+            messages.push_back(msg);
+            deliveries.push_back(DeliveryRecord(msg, queue, "xyz", tag));
+        }
+        //assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not)
+        acked.range = 5;
+        acked.individual.push_back(7);
+        acked.individual.push_back(9);
+    }
+    // prepare() must pass exactly the acknowledged messages to the store and keep all ten records.
+    void testPrepare()
+    {
+        //ensure acked messages are discarded, i.e. dequeued from store
+        op.prepare(0);
+        CPPUNIT_ASSERT_EQUAL(static_cast<size_t>(7), store.dequeued.size());
+        CPPUNIT_ASSERT_EQUAL(static_cast<size_t>(10), deliveries.size());
+
+        // Positions in 'messages' of msgs 1-5, 7 and 9, listed in the expected dequeue order.
+        const size_t ackedIndex[] = { 0, 1, 2, 3, 4, 6, 8 };
+        const size_t ackedCount = sizeof(ackedIndex) / sizeof(ackedIndex[0]);
+        for (size_t n = 0; n < ackedCount; ++n) {
+            CPPUNIT_ASSERT_EQUAL(messages[ackedIndex[n]], store.dequeued[n]);
+        }
+    }
+    // commit() must erase the acknowledged records, leaving those for msgs 6, 8 and 10.
+    void testCommit()
+    {
+        //ensure acked messages are removed from list
+        op.commit();
+        CPPUNIT_ASSERT_EQUAL(static_cast<size_t>(3), deliveries.size());
+        list<DeliveryRecord>::iterator rec = deliveries.begin();
+        CPPUNIT_ASSERT(rec->matches(6));//msg 6
+        CPPUNIT_ASSERT((++rec)->matches(8));//msg 8
+        CPPUNIT_ASSERT((++rec)->matches(10));//msg 10
+    }
+};
+
+// Build this file as a CppUnit test plug-in and register TxAckTest with the default registry.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(TxAckTest);
+
Propchange: incubator/qpid/trunk/qpid/cpp/tests/TxAckTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/tests/TxAckTest.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/tests/TxBufferTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/TxBufferTest.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/TxBufferTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/TxBufferTest.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,266 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <TxBuffer.h>
+#include <qpid_test_plugin.h>
+#include <iostream>
+#include <vector>
+
+using namespace qpid::broker;
+
+// Asserts that both vectors hold equal elements in the same order and have the same length.
+template <class T> void assertEqualVector(const std::vector<T>& expected, const std::vector<T>& actual){
+    typename std::vector<T>::size_type i = 0;
+    for (; i < expected.size() && i < actual.size(); ++i) {
+        CPPUNIT_ASSERT_EQUAL(expected[i], actual[i]);   // compare the common prefix first
+    }
+    CPPUNIT_ASSERT(i == expected.size());   // fails if 'expected' has elements left over
+    CPPUNIT_ASSERT(i == actual.size());     // fails if 'actual' has elements left over
+}
+
+// Tests TxBuffer with mock operations and a mock store that record the calls they receive.
+class TxBufferTest : public CppUnit::TestCase {
+    // TxOp mock: logs each prepare/commit/rollback call and compares the log with the expected sequence.
+    class MockTxOp : public TxOp {
+        enum op_codes {PREPARE=2, COMMIT=4, ROLLBACK=8};
+        std::vector<int> expected;
+        std::vector<int> actual;
+        bool failOnPrepare;   // when true, prepare() returns false
+    public:
+        MockTxOp(bool _failOnPrepare = false) : failOnPrepare(_failOnPrepare) {}
+        ~MockTxOp(){}
+
+        // --- calls made by the code under test ---
+        bool prepare(TransactionContext*) throw(){
+            actual.push_back(PREPARE);
+            return !failOnPrepare;
+        }
+        void commit() throw(){
+            actual.push_back(COMMIT);
+        }
+        void rollback() throw(){
+            actual.push_back(ROLLBACK);
+        }
+        // --- expectations set by the test; each returns *this so calls can be chained ---
+        MockTxOp& expectPrepare(){
+            expected.push_back(PREPARE);
+            return *this;
+        }
+        MockTxOp& expectCommit(){
+            expected.push_back(COMMIT);
+            return *this;
+        }
+        MockTxOp& expectRollback(){
+            expected.push_back(ROLLBACK);
+            return *this;
+        }
+        void check(){ assertEqualVector(expected, actual); }   // asserts the two logs are identical
+    };
+
+    // TransactionalStore mock: logs begin/commit/abort calls and tracks the state of a single transaction.
+    class MockTransactionalStore : public TransactionalStore {
+        enum op_codes {BEGIN=2, COMMIT=4, ABORT=8};
+        std::vector<int> expected;
+        std::vector<int> actual;
+
+        enum states {OPEN = 1, COMMITTED = 2, ABORTED = 3};
+        // One state for the whole store; every context handed out by begin() updates it.
+        int state;
+
+        // Context handed out by begin(); completing it moves the owning store out of the OPEN state.
+        class TestTransactionContext : public TransactionContext {
+            MockTransactionalStore* store;
+        public:
+            TestTransactionContext(MockTransactionalStore* _store) : store(_store) {}
+            ~TestTransactionContext(){}
+
+            // Both operations throw a string literal unless the store is still OPEN.
+            void commit(){
+                if(OPEN != store->state) throw "txn already completed";
+                store->state = COMMITTED;
+            }
+            void abort(){
+                if(OPEN != store->state) throw "txn already completed";
+                store->state = ABORTED;
+            }
+        };
+
+    public:
+        MockTransactionalStore() : state(OPEN){}
+        ~MockTransactionalStore(){}
+
+        // --- TransactionalStore calls, made by the code under test ---
+        std::auto_ptr<TransactionContext> begin(){
+            actual.push_back(BEGIN);
+            return std::auto_ptr<TransactionContext>(new TestTransactionContext(this));
+        }
+        // Logs the call, asserts the context is a TestTransactionContext, then completes it.
+        void commit(TransactionContext* ctxt){
+            actual.push_back(COMMIT);
+            TestTransactionContext* const txn = dynamic_cast<TestTransactionContext*>(ctxt);
+            CPPUNIT_ASSERT(txn);
+            txn->commit();
+        }
+        void abort(TransactionContext* ctxt){
+            actual.push_back(ABORT);
+            TestTransactionContext* const txn = dynamic_cast<TestTransactionContext*>(ctxt);
+            CPPUNIT_ASSERT(txn);
+            txn->abort();
+        }
+
+        // --- expectations set by the test; each returns *this so calls can be chained ---
+        MockTransactionalStore& expectBegin(){
+            expected.push_back(BEGIN);
+            return *this;
+        }
+        MockTransactionalStore& expectCommit(){
+            expected.push_back(COMMIT);
+            return *this;
+        }
+        MockTransactionalStore& expectAbort(){
+            expected.push_back(ABORT);
+            return *this;
+        }
+        // Asserts that the logged calls equal the expected ones.
+        void check(){
+            assertEqualVector(expected, actual);
+        }
+
+        // --- state queries ---
+        bool isCommitted(){ return state == COMMITTED; }
+        bool isAborted(){ return state == ABORTED; }
+        bool isOpen(){ return state == OPEN; }
+    };
+
+    CPPUNIT_TEST_SUITE(TxBufferTest);
+    CPPUNIT_TEST(testPrepareAndCommit);
+    CPPUNIT_TEST(testFailOnPrepare);
+    CPPUNIT_TEST(testRollback);
+    CPPUNIT_TEST(testBufferIsClearedAfterRollback);
+    CPPUNIT_TEST(testBufferIsClearedAfterCommit);
+    CPPUNIT_TEST_SUITE_END();
+
+public:
+    // Successful path: every enlisted op is prepared, then committed, and the store transaction commits.
+    void testPrepareAndCommit(){
+        MockTransactionalStore store;
+        store.expectBegin().expectCommit();
+
+        MockTxOp first, twice, last;
+        first.expectPrepare().expectCommit();
+        // 'twice' is enlisted two times to test relative order: both prepares precede both commits.
+        twice.expectPrepare().expectPrepare();
+        twice.expectCommit().expectCommit();
+        last.expectPrepare().expectCommit();
+
+        TxBuffer tx;
+        tx.enlist(&first);
+        tx.enlist(&twice);
+        tx.enlist(&twice);//enlisted twice
+        tx.enlist(&last);
+
+        CPPUNIT_ASSERT(tx.prepare(&store));
+        tx.commit();
+        store.check();
+        CPPUNIT_ASSERT(store.isCommitted());
+        first.check();
+        twice.check();
+        last.check();
+    }
+    // A failing prepare stops the sequence, makes prepare() return false and aborts the store transaction.
+    void testFailOnPrepare(){
+        MockTransactionalStore store;
+        store.expectBegin().expectAbort();
+
+        MockTxOp ok;
+        MockTxOp failing(true);
+        MockTxOp unreached;   // never prepared, because 'failing' stops the sequence
+        ok.expectPrepare();
+        failing.expectPrepare();
+
+        TxBuffer tx;
+        tx.enlist(&ok);
+        tx.enlist(&failing);
+        tx.enlist(&unreached);
+
+        CPPUNIT_ASSERT(!tx.prepare(&store));
+        store.check();
+        CPPUNIT_ASSERT(store.isAborted());
+        ok.check();
+        failing.check();
+        unreached.check();
+    }
+    // rollback() reaches every enlisted op exactly once.
+    void testRollback(){
+        MockTxOp first;
+        MockTxOp second(true);
+        MockTxOp third;
+        first.expectRollback();
+        second.expectRollback();
+        third.expectRollback();
+
+        TxBuffer tx;
+        tx.enlist(&first);
+        tx.enlist(&second);
+        tx.enlist(&third);
+
+        tx.rollback();
+        first.check();
+        second.check();
+        third.check();
+    }
+    // After rollback() the buffer holds no ops, so a later commit() reaches nothing.
+    void testBufferIsClearedAfterRollback(){
+        MockTxOp first, second;
+        // Only the rollback is expected; any commit reaching an op would make check() fail.
+        first.expectRollback();
+        second.expectRollback();
+
+        TxBuffer tx;
+        tx.enlist(&first);
+        tx.enlist(&second);
+
+        tx.rollback();
+        tx.commit();//second call should not reach ops
+        first.check();
+        second.check();
+    }
+    // After commit() the buffer holds no ops, so a later rollback() reaches nothing.
+    void testBufferIsClearedAfterCommit(){
+        MockTxOp first, second;
+        // Only the commit is expected; any rollback reaching an op would make check() fail.
+        first.expectCommit();
+        second.expectCommit();
+
+        TxBuffer tx;
+        tx.enlist(&first);
+        tx.enlist(&second);
+
+        tx.commit();
+        tx.rollback();//second call should not reach ops
+        first.check();
+        second.check();
+    }
+};
+
+// Build this file as a CppUnit test plug-in and register TxBufferTest with the default registry.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(TxBufferTest);
+
Propchange: incubator/qpid/trunk/qpid/cpp/tests/TxBufferTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/tests/TxBufferTest.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/tests/TxPublishTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/TxPublishTest.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/TxPublishTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/TxPublishTest.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <NullMessageStore.h>
+#include <RecoveryManager.h>
+#include <TxPublish.h>
+#include <qpid_test_plugin.h>
+#include <iostream>
+#include <list>
+#include <vector>
+
+using std::list;
+using std::pair;
+using std::vector;
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+/**
+ * Tests for TxPublish: prepare() is expected to enqueue the message in the
+ * store once per target queue, and commit() to deliver it to each queue.
+ */
+class TxPublishTest : public CppUnit::TestCase
+{
+
+    /**
+     * Message store that records every enqueue as a (queue name, message)
+     * pair; everything else is inherited from NullMessageStore.
+     */
+    class TestMessageStore : public NullMessageStore
+    {
+    public:
+        typedef pair<string, Message::shared_ptr> Enqueued;
+
+        vector<Enqueued> enqueued;
+
+        TestMessageStore() : NullMessageStore(false) {}
+        ~TestMessageStore(){}
+
+        void enqueue(TransactionContext*, Message::shared_ptr& msg, const Queue& queue, const string * const /*xid*/)
+        {
+            enqueued.push_back(Enqueued(queue.getName(), msg));
+        }
+    };
+
+    CPPUNIT_TEST_SUITE(TxPublishTest);
+    CPPUNIT_TEST(testPrepare);
+    CPPUNIT_TEST(testCommit);
+    CPPUNIT_TEST_SUITE_END();
+
+
+    //declaration order matters: the queues refer to store, and op to msg
+    TestMessageStore store;
+    Queue::shared_ptr queue1;
+    Queue::shared_ptr queue2;
+    Message::shared_ptr msg;
+    TxPublish op;
+
+
+public:
+
+    /**
+     * Builds a persistent message and a publish operation targeting both
+     * queues.
+     */
+    TxPublishTest()
+        : queue1(new Queue("queue1", false, &store, 0)),
+          queue2(new Queue("queue2", false, &store, 0)),
+          msg(new Message(0, "exchange", "routing_key", false, false)),
+          op(msg)
+    {
+        AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+        msg->setHeader(header);
+        msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
+        op.deliverTo(queue1);
+        op.deliverTo(queue2);
+    }
+
+    void testPrepare()
+    {
+        op.prepare(0);
+
+        //the store should have seen one enqueue per queue, in delivery order
+        const vector<TestMessageStore::Enqueued>& recorded = store.enqueued;
+        CPPUNIT_ASSERT_EQUAL(static_cast<size_t>(2), recorded.size());
+        CPPUNIT_ASSERT_EQUAL(string("queue1"), recorded[0].first);
+        CPPUNIT_ASSERT_EQUAL(msg, recorded[0].second);
+        CPPUNIT_ASSERT_EQUAL(string("queue2"), recorded[1].first);
+        CPPUNIT_ASSERT_EQUAL(msg, recorded[1].second);
+    }
+
+    void testCommit()
+    {
+        op.commit();
+
+        //each queue should now hold exactly the published message
+        const u_int32_t one = 1;
+        CPPUNIT_ASSERT_EQUAL(one, queue1->getMessageCount());
+        CPPUNIT_ASSERT_EQUAL(msg, queue1->dequeue());
+
+        CPPUNIT_ASSERT_EQUAL(one, queue2->getMessageCount());
+        CPPUNIT_ASSERT_EQUAL(msg, queue2->dequeue());
+    }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(TxPublishTest);
+
Propchange: incubator/qpid/trunk/qpid/cpp/tests/TxPublishTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/tests/TxPublishTest.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/tests/ValueTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/ValueTest.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/ValueTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/ValueTest.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <Value.h>
+#include <qpid_test_plugin.h>
+
+using namespace qpid::framing;
+
+
+/**
+ * Checks operator== and operator!= on the framing Value types, both between
+ * values of the same type and across different types.
+ */
+class ValueTest : public CppUnit::TestCase
+{
+    CPPUNIT_TEST_SUITE(ValueTest);
+    CPPUNIT_TEST(testStringValueEquals);
+    CPPUNIT_TEST(testIntegerValueEquals);
+    CPPUNIT_TEST(testDecimalValueEquals);
+    CPPUNIT_TEST(testFieldTableValueEquals);
+    CPPUNIT_TEST_SUITE_END();
+
+    //fixture values compared against in the tests below
+    StringValue s;
+    IntegerValue i;
+    DecimalValue d;
+    FieldTableValue ft;
+    EmptyValue e;
+
+  public:
+    /** ft and e are default-constructed; ft is then given two entries. */
+    ValueTest() : s("abc"), i(42), d(1234,2)
+    {
+        ft.getValue().setString("foo", "FOO");
+        ft.getValue().setInt("magic", 7);
+    }
+
+    /** String equality, plus inequality involving the empty value. */
+    void testStringValueEquals()
+    {
+        CPPUNIT_ASSERT(StringValue("abc") == s);
+        CPPUNIT_ASSERT(s != StringValue("foo"));
+        CPPUNIT_ASSERT(s != e);
+        CPPUNIT_ASSERT(e != d);
+        CPPUNIT_ASSERT(e != ft);
+    }
+
+    /** Integer equality and inequality against other value types. */
+    void testIntegerValueEquals()
+    {
+        CPPUNIT_ASSERT(IntegerValue(42) == i);
+        CPPUNIT_ASSERT(IntegerValue(5) != i);
+        CPPUNIT_ASSERT(i != e);
+        CPPUNIT_ASSERT(i != d);
+    }
+
+    /** Decimals differ when either constructor argument differs. */
+    void testDecimalValueEquals()
+    {
+        CPPUNIT_ASSERT(DecimalValue(1234, 2) == d);
+        CPPUNIT_ASSERT(DecimalValue(12345, 2) != d);
+        CPPUNIT_ASSERT(DecimalValue(1234, 3) != d);
+        CPPUNIT_ASSERT(d != s);
+    }
+
+    /**
+     * Builds a second table entry by entry and checks that it only compares
+     * equal to ft while it holds the same entries.
+     */
+    void testFieldTableValueEquals()
+    {
+        CPPUNIT_ASSERT_EQUAL(std::string("FOO"), ft.getValue().getString("foo"));
+        CPPUNIT_ASSERT_EQUAL(7, ft.getValue().getInt("magic"));
+
+        FieldTableValue f2;
+        CPPUNIT_ASSERT(ft != f2);
+
+        f2.getValue().setString("foo", "FOO");
+        CPPUNIT_ASSERT(ft != f2);
+
+        f2.getValue().setInt("magic", 7);
+        CPPUNIT_ASSERT_EQUAL(ft,f2);
+        CPPUNIT_ASSERT(ft == f2);
+
+        f2.getValue().setString("foo", "BAR");
+        CPPUNIT_ASSERT(ft != f2);
+        CPPUNIT_ASSERT(ft != i);
+    }
+
+};
+
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(ValueTest);
+
Propchange: incubator/qpid/trunk/qpid/cpp/tests/ValueTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/tests/ValueTest.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/tests/broker
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/broker?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/broker (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/broker Thu Nov 30 21:11:45 2006
@@ -0,0 +1,45 @@
+#!/bin/sh
+. `dirname $0`/env
+
+# Print the PID of whatever is listening on the amqp port, if anything.
+# Takes the lines of `netstat -tpl` output that mention "amqp" and prints
+# their 7th field (the PID/Program name column) with everything from the
+# first "/" onwards removed. Prints nothing when no such line exists.
+# Note: gensub() is a GNU awk extension, so awk must be gawk here.
+brokerpid() {
+ netstat -tpl 2> /dev/null | awk '/amqp/ {print gensub("/.*$","","g",$7) }'
+}
+
+killbroker () {
+ PID=`brokerpid`
+ if [ -n "$PID" ] ; then kill $PID ; fi
+ for ((i=5;i--;)) {
+ if [ -z "`brokerpid`" ] ; then exit 0 ; fi
+ sleep 1
+ }
+ echo "Broker `brokerpid` refuses to die."
+}
+
+# Block until brokerpid reports a running broker, polling once a second.
+waitbroker () {
+    # The substitution is quoted so the test stays well-formed whether
+    # brokerpid prints nothing, one PID or several.
+    while [ -z "`brokerpid`" ] ; do sleep 1 ; done
+}
+
+startbroker() {
+ case $1 in
+ j)
+ export AMQJ_LOGGING_LEVEL=fatal
+ export JDPA_OPTS=
+ export QPID_OPTS=-Xmx1024M
+ export debug=1
+ CMD="qpid-server"
+ qpid-run -run:print-command # Show the command line.
+ ;;
+ c) CMD=qpidd ;;
+ esac
+ nohup $CMD > /dev/null 2>&1 &
+ waitbroker
+ echo Broker started: $CMD
+}
+
+
+# Dispatch on the sub-command given as the first argument; anything else is
+# silently ignored.
+case "$1" in
+    j) startbroker j ;;
+    c) startbroker c ;;
+    stop) killbroker ;;
+    kill) killbroker ;;
+    wait) waitbroker ;;
+    pid) brokerpid ;;
+esac
Propchange: incubator/qpid/trunk/qpid/cpp/tests/broker
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/qpid/trunk/qpid/cpp/tests/client_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/client_test.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/client_test.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/client_test.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+
+#include <QpidError.h>
+#include <ClientChannel.h>
+#include <Connection.h>
+#include <ClientMessage.h>
+#include <MessageListener.h>
+#include <sys/Monitor.h>
+#include <FieldTable.h>
+
+using namespace qpid::client;
+using namespace qpid::sys;
+using std::string;
+
+/**
+ * Message listener that logs each delivery to stdout and then notifies the
+ * monitor it was given, so that a thread waiting on it can continue.
+ */
+class SimpleListener : public virtual MessageListener{
+    Monitor* monitor;
+
+public:
+    SimpleListener(Monitor* _monitor) : monitor(_monitor){}
+
+    virtual void received(Message& /*msg*/){
+        std::cout << "Received message " /**<< msg **/<< std::endl;
+        monitor->notify();
+    }
+};
+
+/**
+ * Smoke test for the client library: connects to a broker on localhost,
+ * declares and binds a topic exchange and a queue, publishes one message and
+ * waits for it to be delivered back to a listener.
+ *
+ * Passing any command line argument makes the Connection's constructor
+ * argument true.
+ *
+ * @return 0 on success, 1 if a QpidError was raised.
+ */
+int main(int argc, char**)
+{
+    try{
+        Connection con(argc > 1);
+        Channel channel;
+        Exchange exchange("MyExchange", Exchange::TOPIC_EXCHANGE);
+        Queue queue("MyQueue", true);
+
+        string host("localhost");
+
+        con.open(host);
+        std::cout << "Opened connection." << std::endl;
+        con.openChannel(&channel);
+        std::cout << "Opened channel." << std::endl;
+        channel.declareExchange(exchange);
+        std::cout << "Declared exchange." << std::endl;
+        channel.declareQueue(queue);
+        std::cout << "Declared queue." << std::endl;
+        qpid::framing::FieldTable args;
+        channel.bind(exchange, queue, "MyTopic", args);
+        std::cout << "Bound queue to exchange." << std::endl;
+
+        //set up a message listener
+        Monitor monitor;
+        SimpleListener listener(&monitor);
+        string tag("MyTag");
+        channel.consume(queue, tag, &listener);
+        channel.start();
+        std::cout << "Registered consumer." << std::endl;
+
+        Message msg;
+        string data("MyMessage");
+        msg.setData(data);
+        channel.publish(msg, exchange, "MyTopic");
+        std::cout << "Published message." << std::endl;
+
+        {
+            //NOTE(review): if the listener's notify() runs before wait() is
+            //entered the wake-up may be missed - confirm Monitor semantics.
+            Monitor::ScopedLock l(monitor);
+            monitor.wait();
+        }
+
+        con.closeChannel(&channel);
+        std::cout << "Closed channel." << std::endl;
+        con.close();
+        std::cout << "Closed connection." << std::endl;
+    }catch(const qpid::QpidError& error){
+        //caught by const reference to avoid copying the exception object
+        std::cout << "Error [" << error.code << "] " << error.msg << " ("
+                  << error.location.file << ":" << error.location.line
+                  << ")" << std::endl;
+        return 1;
+    }
+    return 0;
+}
Propchange: incubator/qpid/trunk/qpid/cpp/tests/client_test.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/tests/client_test.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/tests/echo_service.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/echo_service.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/echo_service.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/echo_service.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,198 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <QpidError.h>
+#include <ClientChannel.h>
+#include <Connection.h>
+#include <ClientExchange.h>
+#include <MessageListener.h>
+#include <ClientQueue.h>
+#include <sys/Time.h>
+#include <iostream>
+#include <sstream>
+
+using namespace qpid::client;
+using namespace qpid::sys;
+using std::string;
+
+/**
+ * Listener used in server mode: publishes each received message back on the
+ * channel it was constructed with.
+ */
+class EchoServer : public MessageListener{
+    Channel* const channel;
+
+public:
+    EchoServer(Channel* channel);
+
+    virtual void received(Message& msg);
+};
+
+/** Listener used in client mode: prints each echoed message to stdout. */
+class LoggingListener : public MessageListener{
+public:
+    virtual void received(Message& msg);
+};
+
+/**
+ * Command line options of the echo service. Defaults: host "localhost",
+ * port 5672, all flags off.
+ */
+class Args{
+    string host;
+    int port;
+    bool trace;
+    bool help;
+    bool client;
+
+public:
+    Args()
+        : host("localhost"), port(5672), trace(false), help(false), client(false)
+    {}
+
+    /** Fills the options in from the command line. */
+    void parse(int argc, char** argv);
+    /** Prints the list of supported options to stdout. */
+    void usage();
+
+    const string& getHost() const { return host; }
+    int getPort() const { return port; }
+    bool getTrace() const { return trace; }
+    bool getHelp() const { return help; }
+    bool getClient() const { return client; }
+};
+
+/**
+ * Entry point of the echo service.
+ *
+ * With -help the usage text is printed. With -client, lines read from stdin
+ * are published to the echo service and the echoed replies are logged.
+ * Otherwise the program acts as the server, echoing every request back to
+ * the queue named in its RESPONSE_QUEUE header.
+ *
+ * A QpidError raised in either mode is reported on stdout.
+ */
+int main(int argc, char** argv){
+    const std::string echo_service("echo_service");
+    Args args;
+    args.parse(argc, argv);
+    if (args.getHelp()) {
+        args.usage();
+    } else if (args.getClient()) {
+        try {
+            //Create connection & open a channel
+            Connection connection(args.getTrace());
+            connection.open(args.getHost(), args.getPort());
+            Channel channel;
+            connection.openChannel(&channel);
+
+            //Setup: declare the private 'response' queue and bind it
+            //to the direct exchange by its name which will be
+            //generated by the server
+            Queue response;
+            channel.declareQueue(response);
+            qpid::framing::FieldTable emptyArgs;
+            channel.bind(Exchange::DEFAULT_DIRECT_EXCHANGE, response, response.getName(), emptyArgs);
+
+            //Consume from the response queue, logging all echoed message to console:
+            LoggingListener listener;
+            std::string tag;
+            channel.consume(response, tag, &listener);
+
+            //Process incoming requests on a new thread
+            channel.start();
+
+            //get messages from console and send them:
+            std::string text;
+            std::cout << "Enter text to send:" << std::endl;
+            while (std::getline(std::cin, text)) {
+                std::cout << "Sending " << text << " to echo server." << std::endl;
+                Message msg;
+                msg.getHeaders().setString("RESPONSE_QUEUE", response.getName());
+                msg.setData(text);
+                channel.publish(msg, Exchange::DEFAULT_DIRECT_EXCHANGE, echo_service);
+
+                std::cout << "Enter text to send:" << std::endl;
+            }
+
+            connection.close();
+        } catch(const qpid::QpidError& error) {
+            //caught by const reference to avoid copying the exception object
+            std::cout << error.what() << std::endl;
+        }
+    } else {
+        try {
+            //Create connection & open a channel
+            Connection connection(args.getTrace());
+            connection.open(args.getHost(), args.getPort());
+            Channel channel;
+            connection.openChannel(&channel);
+
+            //Setup: declare the 'request' queue and bind it to the direct exchange with a 'well known' name
+            Queue request("request");
+            channel.declareQueue(request);
+            qpid::framing::FieldTable emptyArgs;
+            channel.bind(Exchange::DEFAULT_DIRECT_EXCHANGE, request, echo_service, emptyArgs);
+
+            //Consume from the request queue, echoing back all messages received to the client that sent them
+            EchoServer server(&channel);
+            std::string tag = "server_tag";
+            channel.consume(request, tag, &server);
+
+            //Process incoming requests on the main thread
+            channel.run();
+
+            connection.close();
+        } catch(const qpid::QpidError& error) {
+            //caught by const reference to avoid copying the exception object
+            std::cout << error.what() << std::endl;
+        }
+    }
+}
+
+/** Remembers the channel on which echoes will be published. */
+EchoServer::EchoServer(Channel* _channel)
+    : channel(_channel)
+{}
+
+/**
+ * Publishes the message back to the default direct exchange, using the value
+ * of its RESPONSE_QUEUE header as the routing key. Messages without that
+ * header are only logged.
+ */
+void EchoServer::received(Message& message)
+{
+    //the sender names the binding of its response queue in this header
+    const std::string replyTo = message.getHeaders().getString("RESPONSE_QUEUE");
+
+    if (replyTo.empty()) {
+        std::cout << "Cannot echo " << message.getData() << ", no response queue specified." << std::endl;
+        return;
+    }
+
+    std::cout << "Echoing " << message.getData() << " back to " << replyTo << std::endl;
+    channel->publish(message, Exchange::DEFAULT_DIRECT_EXCHANGE, replyTo);
+}
+
+/** Writes the body of the echoed message to stdout. */
+void LoggingListener::received(Message& echoed)
+{
+    std::cout << "Received echo: " << echoed.getData() << std::endl;
+}
+
+
+/**
+ * Returns the value that follows option argv[i], advancing i onto it.
+ * When the option is the last argument there is no value: a warning is
+ * printed, i is left untouched and 0 is returned.
+ */
+static const char* optionValue(int argc, char** argv, int& i){
+    if(i + 1 >= argc){
+        std::cout << "Warning: missing value for option " << argv[i] << std::endl;
+        return 0;
+    }
+    return argv[++i];
+}
+
+/**
+ * Parses the command line into this object. Unknown options are reported
+ * and skipped; parsing stops at -help. An option that needs a value but is
+ * given as the last argument is reported and leaves its setting unchanged
+ * (previously argv was read past its end in that case).
+ */
+void Args::parse(int argc, char** argv){
+    for(int i = 1; i < argc; i++){
+        string name(argv[i]);
+        if("-help" == name){
+            help = true;
+            break;
+        }else if("-host" == name){
+            if(const char* value = optionValue(argc, argv, i)) host = value;
+        }else if("-port" == name){
+            if(const char* value = optionValue(argc, argv, i)) port = atoi(value);
+        }else if("-trace" == name){
+            trace = true;
+        }else if("-client" == name){
+            client = true;
+        }else{
+            std::cout << "Warning: unrecognised option " << name << std::endl;
+        }
+    }
+}
+
+/**
+ * Prints the supported command line options to stdout. The default port
+ * shown matches the one set in the Args constructor (5672).
+ */
+void Args::usage(){
+    std::cout << "Options:" << std::endl;
+    std::cout << " -help" << std::endl;
+    std::cout << " Prints this usage message" << std::endl;
+    std::cout << " -host <host>" << std::endl;
+    std::cout << " Specifies host to connect to (default is localhost)" << std::endl;
+    std::cout << " -port <port>" << std::endl;
+    std::cout << " Specifies port to connect to (default is 5672)" << std::endl;
+    std::cout << " -trace" << std::endl;
+    std::cout << " Indicates that the frames sent and received should be logged" << std::endl;
+    std::cout << " -client" << std::endl;
+    std::cout << " Run as a client (else will run as a server)" << std::endl;
+}
Propchange: incubator/qpid/trunk/qpid/cpp/tests/echo_service.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/tests/echo_service.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/tests/env
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/env?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/env (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/env Thu Nov 30 21:11:45 2006
@@ -0,0 +1,23 @@
+#!/bin/bash
+# Set environment variables for test scripts.
+
+# Add directory $1 to PATH unless it is already one of its entries.
+# With "after" as $2 the directory is appended, otherwise it is prepended.
+pathmunge () {
+    if echo $PATH | /bin/egrep -q "(^|:)$1($|:)" ; then
+        return
+    fi
+    case "$2" in
+        after) PATH=$PATH:$1 ;;
+        *) PATH=$1:$PATH ;;
+    esac
+}
+
+# QPID_ROOT is used below to locate the cpp and java directories, so warn
+# when it is unset or empty. The variable must be expanded and quoted:
+# testing the bare word QPID_ROOT is never true.
+if [ -z "$QPID_ROOT" ] ; then echo "You must set QPID_ROOT" ; fi
+
+pathmunge $QPID_ROOT/cpp/test/bin
+pathmunge $QPID_ROOT/cpp/build/*/bin
+pathmunge $QPID_ROOT/cpp/build/*/test
+
+# Keep a QPID_HOME supplied by the caller, otherwise default to the java build.
+export QPID_HOME=${QPID_HOME:-$QPID_ROOT/java/build}
+pathmunge $QPID_HOME/bin
+
+
Propchange: incubator/qpid/trunk/qpid/cpp/tests/env
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/qpid/trunk/qpid/cpp/tests/gen.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/gen.mk?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/gen.mk (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/gen.mk Thu Nov 30 21:11:45 2006
@@ -0,0 +1,101 @@
+client_test_SOURCES = client_test.cpp
+client_test_LDADD = $(lib_client) $(lib_common) $(extra_libs)
+echo_service_SOURCES = echo_service.cpp
+echo_service_LDADD = $(lib_client) $(lib_common) $(extra_libs)
+topic_listener_SOURCES = topic_listener.cpp
+topic_listener_LDADD = $(lib_client) $(lib_common) $(extra_libs)
+topic_publisher_SOURCES = topic_publisher.cpp
+topic_publisher_LDADD = $(lib_client) $(lib_common) $(extra_libs)
+AccumulatedAckTest_la_SOURCES = AccumulatedAckTest.cpp
+AccumulatedAckTest_la_LIBADD = $(lib_common)
+AccumulatedAckTest_la_LIBADD += $(lib_broker) $(extra_libs)
+AccumulatedAckTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+ChannelTest_la_SOURCES = ChannelTest.cpp
+ChannelTest_la_LIBADD = $(lib_common)
+ChannelTest_la_LIBADD += $(lib_broker) $(extra_libs)
+ChannelTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+ConfigurationTest_la_SOURCES = ConfigurationTest.cpp
+ConfigurationTest_la_LIBADD = $(lib_common)
+ConfigurationTest_la_LIBADD += $(lib_broker) $(extra_libs)
+ConfigurationTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+ExchangeTest_la_SOURCES = ExchangeTest.cpp
+ExchangeTest_la_LIBADD = $(lib_common)
+ExchangeTest_la_LIBADD += $(lib_broker) $(extra_libs)
+ExchangeTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+HeadersExchangeTest_la_SOURCES = HeadersExchangeTest.cpp
+HeadersExchangeTest_la_LIBADD = $(lib_common)
+HeadersExchangeTest_la_LIBADD += $(lib_broker) $(extra_libs)
+HeadersExchangeTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+InMemoryContentTest_la_SOURCES = InMemoryContentTest.cpp
+InMemoryContentTest_la_LIBADD = $(lib_common)
+InMemoryContentTest_la_LIBADD += $(lib_broker) $(extra_libs)
+InMemoryContentTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+LazyLoadedContentTest_la_SOURCES = LazyLoadedContentTest.cpp
+LazyLoadedContentTest_la_LIBADD = $(lib_common)
+LazyLoadedContentTest_la_LIBADD += $(lib_broker) $(extra_libs)
+LazyLoadedContentTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+MessageBuilderTest_la_SOURCES = MessageBuilderTest.cpp
+MessageBuilderTest_la_LIBADD = $(lib_common)
+MessageBuilderTest_la_LIBADD += $(lib_broker) $(extra_libs)
+MessageBuilderTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+MessageTest_la_SOURCES = MessageTest.cpp
+MessageTest_la_LIBADD = $(lib_common)
+MessageTest_la_LIBADD += $(lib_broker) $(extra_libs)
+MessageTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+QueueRegistryTest_la_SOURCES = QueueRegistryTest.cpp
+QueueRegistryTest_la_LIBADD = $(lib_common)
+QueueRegistryTest_la_LIBADD += $(lib_broker) $(extra_libs)
+QueueRegistryTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+QueueTest_la_SOURCES = QueueTest.cpp
+QueueTest_la_LIBADD = $(lib_common)
+QueueTest_la_LIBADD += $(lib_broker) $(extra_libs)
+QueueTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+TopicExchangeTest_la_SOURCES = TopicExchangeTest.cpp
+TopicExchangeTest_la_LIBADD = $(lib_common)
+TopicExchangeTest_la_LIBADD += $(lib_broker) $(extra_libs)
+TopicExchangeTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+TxAckTest_la_SOURCES = TxAckTest.cpp
+TxAckTest_la_LIBADD = $(lib_common)
+TxAckTest_la_LIBADD += $(lib_broker) $(extra_libs)
+TxAckTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+TxBufferTest_la_SOURCES = TxBufferTest.cpp
+TxBufferTest_la_LIBADD = $(lib_common)
+TxBufferTest_la_LIBADD += $(lib_broker) $(extra_libs)
+TxBufferTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+TxPublishTest_la_SOURCES = TxPublishTest.cpp
+TxPublishTest_la_LIBADD = $(lib_common)
+TxPublishTest_la_LIBADD += $(lib_broker) $(extra_libs)
+TxPublishTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+ValueTest_la_SOURCES = ValueTest.cpp
+ValueTest_la_LIBADD = $(lib_common)
+ValueTest_la_LIBADD += $(lib_broker) $(extra_libs)
+ValueTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+BodyHandlerTest_la_SOURCES = BodyHandlerTest.cpp
+BodyHandlerTest_la_LIBADD = $(lib_common)
+BodyHandlerTest_la_LIBADD += $(lib_broker) $(extra_libs)
+BodyHandlerTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+FieldTableTest_la_SOURCES = FieldTableTest.cpp
+FieldTableTest_la_LIBADD = $(lib_common)
+FieldTableTest_la_LIBADD += $(lib_broker) $(extra_libs)
+FieldTableTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+FramingTest_la_SOURCES = FramingTest.cpp
+FramingTest_la_LIBADD = $(lib_common)
+FramingTest_la_LIBADD += $(lib_broker) $(extra_libs)
+FramingTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+HeaderTest_la_SOURCES = HeaderTest.cpp
+HeaderTest_la_LIBADD = $(lib_common)
+HeaderTest_la_LIBADD += $(lib_broker) $(extra_libs)
+HeaderTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+ExceptionTest_la_SOURCES = ExceptionTest.cpp
+ExceptionTest_la_LIBADD = $(lib_common)
+ExceptionTest_la_LIBADD += $(lib_broker) $(extra_libs)
+ExceptionTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+EventChannelTest_la_SOURCES = EventChannelTest.cpp
+EventChannelTest_la_LIBADD = $(lib_common)
+EventChannelTest_la_LIBADD += $(lib_broker) $(extra_libs)
+EventChannelTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+EventChannelThreadsTest_la_SOURCES = EventChannelThreadsTest.cpp
+EventChannelThreadsTest_la_LIBADD = $(lib_common)
+EventChannelThreadsTest_la_LIBADD += $(lib_broker) $(extra_libs)
+EventChannelThreadsTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+check_LTLIBRARIES = AccumulatedAckTest.la ChannelTest.la ConfigurationTest.la ExchangeTest.la HeadersExchangeTest.la InMemoryContentTest.la LazyLoadedContentTest.la MessageBuilderTest.la MessageTest.la QueueRegistryTest.la QueueTest.la TopicExchangeTest.la TxAckTest.la TxBufferTest.la TxPublishTest.la ValueTest.la BodyHandlerTest.la FieldTableTest.la FramingTest.la HeaderTest.la ExceptionTest.la EventChannelTest.la EventChannelThreadsTest.la
Added: incubator/qpid/trunk/qpid/cpp/tests/qpid_test_plugin.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/qpid_test_plugin.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/qpid_test_plugin.h (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/qpid_test_plugin.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,43 @@
+#ifndef _qpid_test_plugin_
+#define _qpid_test_plugin_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * Convenience to include cppunit headers needed by qpid test plugins and
+ * workaround for warning from superfluous main() declaration
+ * in cppunit/TestPlugIn.h
+ */
+
+#include <cppunit/TestCase.h>
+#include <cppunit/TextTestRunner.h>
+#include <cppunit/extensions/HelperMacros.h>
+#include <cppunit/plugin/TestPlugIn.h>
+
+// Redefine CPPUNIT_PLUGIN_IMPLEMENT_MAIN to a dummy typedef to avoid warnings
+// from the superfluous main() declaration in cppunit/plugin/TestPlugIn.h.
+// The redefinition is only applied when one of the two Unix loader macros
+// tested below is defined.
+//
+#if defined(CPPUNIT_HAVE_UNIX_DLL_LOADER) || defined(CPPUNIT_HAVE_UNIX_SHL_LOADER)
+#undef CPPUNIT_PLUGIN_IMPLEMENT_MAIN
+#define CPPUNIT_PLUGIN_IMPLEMENT_MAIN() typedef char __CppUnitPlugInImplementMainDummyTypeDef
+#endif
+
+#endif /*!_qpid_test_plugin_*/
Propchange: incubator/qpid/trunk/qpid/cpp/tests/qpid_test_plugin.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/tests/qpid_test_plugin.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/tests/topic_listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/topic_listener.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/topic_listener.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/topic_listener.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,186 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <QpidError.h>
+#include <ClientChannel.h>
+#include <Connection.h>
+#include <ClientExchange.h>
+#include <MessageListener.h>
+#include <ClientQueue.h>
+#include <sys/Time.h>
+#include <iostream>
+#include <sstream>
+
+using namespace qpid::client;
+using namespace qpid::sys;
+using std::string;
+
+/**
+ * Counts the messages it receives and reacts to two control messages,
+ * identified by their TYPE header: REPORT_REQUEST publishes the count and
+ * elapsed time to the response queue, TERMINATION_REQUEST closes the channel.
+ */
+class Listener : public MessageListener{
+    Channel* const channel;
+    const std::string responseQueue;
+    const bool transactional;
+    bool init;      //true while a timed run is in progress
+    int count;      //messages seen in the current run
+    Time start;     //when the current run began
+
+    void shutdown();
+    void report();
+
+public:
+    Listener(Channel* channel, const std::string& responseQueue, bool tx);
+
+    virtual void received(Message& msg);
+};
+
+/**
+ * Command line options of the topic listener. Defaults: host "localhost",
+ * port 5672, NO_ACK, non-transactional, prefetch 1000, all flags off.
+ */
+class Args{
+    string host;
+    int port;
+    int ackMode;
+    bool transactional;
+    int prefetch;
+    bool trace;
+    bool help;
+public:
+    inline Args() : host("localhost"), port(5672), ackMode(NO_ACK), transactional(false), prefetch(1000), trace(false), help(false){}
+    /** Fills the options in from the command line. */
+    void parse(int argc, char** argv);
+    /** Prints the list of supported options to stdout. */
+    void usage();
+
+    //all getters are const so they can be used through a const Args
+    inline const string& getHost() const { return host;}
+    inline int getPort() const { return port; }
+    inline int getAckMode() const { return ackMode; }
+    inline bool getTransactional() const { return transactional; }
+    inline int getPrefetch() const { return prefetch; }
+    inline bool getTrace() const { return trace; }
+    inline bool getHelp() const { return help; }
+};
+
+/**
+ * Entry point of the topic listener. Unless -help is given, it connects to
+ * the broker, declares the response and control queues, binds the control
+ * queue to the default topic exchange under "topic_control" and processes
+ * messages on the main thread until the channel's run() returns.
+ *
+ * A QpidError raised along the way is reported on stdout.
+ */
+int main(int argc, char** argv){
+    Args args;
+    args.parse(argc, argv);
+    if(args.getHelp()){
+        args.usage();
+    }else{
+        try{
+            Connection connection(args.getTrace());
+            connection.open(args.getHost(), args.getPort());
+            Channel channel(args.getTransactional(), args.getPrefetch());
+            connection.openChannel(&channel);
+
+            //declare exchange, queue and bind them:
+            Queue response("response");
+            channel.declareQueue(response);
+
+            Queue control;
+            channel.declareQueue(control);
+            qpid::framing::FieldTable bindArgs;
+            channel.bind(Exchange::DEFAULT_TOPIC_EXCHANGE, control, "topic_control", bindArgs);
+            //set up listener
+            Listener listener(&channel, response.getName(), args.getTransactional());
+            std::string tag;
+            channel.consume(control, tag, &listener, args.getAckMode());
+            channel.run();
+            connection.close();
+        }catch(const qpid::QpidError& error){
+            //caught by const reference to avoid copying the exception object
+            std::cout << error.what() << std::endl;
+        }
+    }
+}
+
+/** Starts with no run in progress and a message count of zero. */
+Listener::Listener(Channel* _channel, const std::string& _responseq, bool tx)
+    : channel(_channel),
+      responseQueue(_responseq),
+      transactional(tx),
+      init(false),
+      count(0)
+{}
+
+/**
+ * Handles one delivery. The first message of a run starts the clock and
+ * resets the counter. Control messages (by TYPE header) trigger a shutdown
+ * or a report; every other message is counted, with progress logged every
+ * 100 messages.
+ */
+void Listener::received(Message& message){
+    if(!init){
+        start = now();
+        count = 0;
+        init = true;
+    }
+
+    const std::string kind(message.getHeaders().getString("TYPE"));
+    if(kind == "TERMINATION_REQUEST"){
+        shutdown();
+        return;
+    }
+    if(kind == "REPORT_REQUEST"){
+        //send a report, then let the next message begin a new run
+        report();
+        init = false;
+        return;
+    }
+
+    ++count;
+    if(count % 100 == 0){
+        std::cout <<"Received " << count << " messages." << std::endl;
+    }
+}
+
+/** Closes the channel this listener was constructed with. */
+void Listener::shutdown()
+{
+    channel->close();
+}
+
+/**
+ * Publishes a message to the response queue stating how many messages were
+ * received and how long that took, committing afterwards when the listener
+ * is transactional.
+ */
+void Listener::report(){
+    const Time elapsed = now() - start;
+
+    std::stringstream summary;
+    summary << "Received " << count << " messages in "
+            << elapsed/TIME_MSEC << " ms.";
+
+    Message reply;
+    reply.setData(summary.str());
+    //an empty exchange name is passed, with the response queue as routing key
+    channel->publish(reply, string(), responseQueue);
+    if(transactional){
+        channel->commit();
+    }
+}
+
+
+/**
+ * Returns the value that follows option argv[i], advancing i onto it.
+ * When the option is the last argument there is no value: a warning is
+ * printed, i is left untouched and 0 is returned.
+ */
+static const char* optionValue(int argc, char** argv, int& i){
+    if(i + 1 >= argc){
+        std::cout << "Warning: missing value for option " << argv[i] << std::endl;
+        return 0;
+    }
+    return argv[++i];
+}
+
+/**
+ * Parses the command line into this object. Unknown options are reported
+ * and skipped; parsing stops at -help. An option that needs a value but is
+ * given as the last argument is reported and leaves its setting unchanged
+ * (previously argv was read past its end in that case).
+ */
+void Args::parse(int argc, char** argv){
+    for(int i = 1; i < argc; i++){
+        string name(argv[i]);
+        if("-help" == name){
+            help = true;
+            break;
+        }else if("-host" == name){
+            if(const char* value = optionValue(argc, argv, i)) host = value;
+        }else if("-port" == name){
+            if(const char* value = optionValue(argc, argv, i)) port = atoi(value);
+        }else if("-ack_mode" == name){
+            if(const char* value = optionValue(argc, argv, i)) ackMode = atoi(value);
+        }else if("-transactional" == name){
+            transactional = true;
+        }else if("-prefetch" == name){
+            if(const char* value = optionValue(argc, argv, i)) prefetch = atoi(value);
+        }else if("-trace" == name){
+            trace = true;
+        }else{
+            std::cout << "Warning: unrecognised option " << name << std::endl;
+        }
+    }
+}
+
+/**
+ * Prints the supported command line options to stdout. The default port
+ * shown matches the one set in the Args constructor (5672).
+ */
+void Args::usage(){
+    std::cout << "Options:" << std::endl;
+    std::cout << " -help" << std::endl;
+    std::cout << " Prints this usage message" << std::endl;
+    std::cout << " -host <host>" << std::endl;
+    std::cout << " Specifies host to connect to (default is localhost)" << std::endl;
+    std::cout << " -port <port>" << std::endl;
+    std::cout << " Specifies port to connect to (default is 5672)" << std::endl;
+    std::cout << " -ack_mode <mode>" << std::endl;
+    std::cout << " Sets the acknowledgement mode" << std::endl;
+    std::cout << " 0=NO_ACK (default), 1=AUTO_ACK, 2=LAZY_ACK" << std::endl;
+    std::cout << " -transactional" << std::endl;
+    std::cout << " Indicates the client should use transactions" << std::endl;
+    std::cout << " -prefetch <count>" << std::endl;
+    std::cout << " Specifies the prefetch count (default is 1000)" << std::endl;
+    std::cout << " -trace" << std::endl;
+    std::cout << " Indicates that the frames sent and received should be logged" << std::endl;
+}
Propchange: incubator/qpid/trunk/qpid/cpp/tests/topic_listener.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/tests/topic_listener.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/tests/topic_publisher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/topic_publisher.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/topic_publisher.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/topic_publisher.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,258 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <QpidError.h>
+#include <ClientChannel.h>
+#include <Connection.h>
+#include <ClientExchange.h>
+#include <MessageListener.h>
+#include <ClientQueue.h>
+#include <sys/Monitor.h>
+#include "unistd.h"
+#include <sys/Time.h>
+#include <cstdlib>
+#include <iostream>
+
+using namespace qpid::client;
+using namespace qpid::sys;
+using std::string;
+
+/**
+ * Publishes batches of messages to a control topic and, acting as a
+ * MessageListener, counts the responses that come back so that each
+ * batch can be timed from the first publish to the last response.
+ */
+class Publisher : public MessageListener{
+public:
+    Publisher(Channel* channel, const std::string& controlTopic, bool tx);
+
+    // MessageListener callback: counts down the outstanding responses.
+    virtual void received(Message& msg);
+
+    // Publishes one batch and returns the elapsed time as measured by
+    // now() before publishing and after the last expected response.
+    int64_t publish(int msgs, int listeners, int size);
+
+    // Publishes a termination request to the control topic.
+    void terminate();
+
+private:
+    void waitForCompletion(int msgs);
+    std::string generateData(int size);
+
+    // Channel used for all publishing; this class declares no destructor
+    // and does not delete it.
+    Channel* const channel;
+    // Routing key passed to every publish call made by this class.
+    const std::string controlTopic;
+    // When true, commit() is called on the channel after publishing.
+    const bool transactional;
+    // Lock/condition shared between publish() and received().
+    Monitor monitor;
+    // Number of responses still outstanding for the current batch.
+    int count;
+};
+
+/**
+ * Command line options for the topic publisher together with their
+ * default values. Call parse() to apply argv, then read the values
+ * through the accessors.
+ */
+class Args{
+    string host;        // broker host to connect to
+    int port;           // broker port to connect to
+    int messages;       // messages to send per batch
+    int subscribers;    // subscribers expected to report back per batch
+    int ackMode;        // acknowledgement mode handed to Channel::consume
+    bool transactional; // whether the channel should use transactions
+    int prefetch;       // prefetch count handed to the Channel constructor
+    int batches;        // number of batches to run
+    int delay;          // seconds to sleep between batches
+    int size;           // size of each published message body, in bytes
+    bool trace;         // whether sent/received frames should be logged
+    bool help;          // set when -help was given
+public:
+    inline Args() : host("localhost"), port(5672), messages(1000), subscribers(1),
+                    ackMode(NO_ACK), transactional(false), prefetch(1000), batches(1),
+                    delay(0), size(256), trace(false), help(false){}
+
+    void parse(int argc, char** argv);
+    void usage();
+
+    // All accessors are const so they can be used on a const Args.
+    inline const string& getHost() const { return host; }
+    inline int getPort() const { return port; }
+    inline int getMessages() const { return messages; }
+    inline int getSubscribers() const { return subscribers; }
+    inline int getAckMode() const { return ackMode; }
+    inline bool getTransactional() const { return transactional; }
+    inline int getPrefetch() const { return prefetch; }
+    inline int getBatches() const { return batches; }
+    inline int getDelay() const { return delay; }
+    inline int getSize() const { return size; }
+    inline bool getTrace() const { return trace; }
+    inline bool getHelp() const { return help; }
+};
+
+/**
+ * Entry point. Parses the command line and either prints the usage text
+ * or connects to the broker, runs the requested number of publish
+ * batches and reports the timings on standard output.
+ *
+ * A QpidError raised while talking to the broker is reported on standard
+ * output; the process exit status is not changed by it.
+ */
+int main(int argc, char** argv){
+    Args args;
+    args.parse(argc, argv);
+    if(args.getHelp()){
+        args.usage();
+    }else{
+        try{
+            Connection connection(args.getTrace());
+            connection.open(args.getHost(), args.getPort());
+            Channel channel(args.getTransactional(), args.getPrefetch());
+            connection.openChannel(&channel);
+
+            //declare queue (relying on default binding):
+            Queue response("response");
+            channel.declareQueue(response);
+
+            //set up listener
+            Publisher publisher(&channel, "topic_control", args.getTransactional());
+            std::string tag("mytag");
+            channel.consume(response, tag, &publisher, args.getAckMode());
+            channel.start();
+
+            int batchSize(args.getBatches());
+            // max/min start at 0, which doubles as "not yet set" below.
+            int64_t max(0);
+            int64_t min(0);
+            int64_t sum(0);
+            for(int i = 0; i < batchSize; i++){
+                if(i > 0 && args.getDelay()) sleep(args.getDelay());
+                Time time = publisher.publish(
+                    args.getMessages(), args.getSubscribers(), args.getSize());
+                if(!max || time > max) max = time;
+                if(!min || time < min) min = time;
+                sum += time;
+                std::cout << "Completed " << (i+1) << " of " << batchSize
+                          << " in " << time/TIME_MSEC << "ms" << std::endl;
+            }
+            publisher.terminate();
+            if(batchSize > 1){
+                // The average is computed only here, where batchSize is known
+                // to be non-zero; "-batches 0" would otherwise divide by zero.
+                // Note these figures are printed in raw Time units, unlike the
+                // per-batch line above which divides by TIME_MSEC.
+                int64_t avg = sum / batchSize;
+                std::cout << batchSize << " batches completed. avg=" << avg <<
+                    ", max=" << max << ", min=" << min << std::endl;
+            }
+            channel.close();
+            connection.close();
+        }catch(qpid::QpidError& error){
+            // Caught by reference to avoid copying the exception object.
+            std::cout << error.what() << std::endl;
+        }
+    }
+}
+
+/**
+ * Creates a publisher that sends on _channel using _controlTopic as the
+ * routing key.
+ *
+ * @param _channel      channel used for publishing; stored, not copied
+ * @param _controlTopic routing key for every published message
+ * @param tx            when true, publishes are followed by commit()
+ *
+ * The outstanding-response counter starts at zero so that received()
+ * never decrements an indeterminate value if a message arrives before
+ * the first batch has been published.
+ */
+Publisher::Publisher(Channel* _channel, const std::string& _controlTopic, bool tx) :
+    channel(_channel), controlTopic(_controlTopic), transactional(tx), count(0){}
+
+/**
+ * MessageListener callback. The message itself is ignored: each call
+ * simply counts as one response, and the waiter on the monitor is
+ * notified once the outstanding count reaches zero.
+ */
+void Publisher::received(Message& ){
+    Monitor::ScopedLock guard(monitor);
+    --count;
+    if(count != 0){
+        return;
+    }
+    // Last expected response: wake whoever is waiting on the monitor.
+    monitor.notify();
+}
+
+/**
+ * Blocks until msgs responses have been counted by received().
+ *
+ * Must be called with the monitor already locked; publish() does so via
+ * its ScopedLock.
+ *
+ * @param msgs number of responses to wait for; returns immediately when
+ *             this is zero or negative
+ */
+void Publisher::waitForCompletion(int msgs){
+    count = msgs;
+    // Re-check the counter after every wake-up: a single wait() would
+    // return early on a spurious wake-up and would block forever when no
+    // responses are expected at all.
+    while(count > 0){
+        monitor.wait();
+    }
+}
+
+/**
+ * Publishes one batch and waits for the listeners to report back.
+ *
+ * @param msgs      number of data messages to publish
+ * @param listeners number of responses to wait for afterwards
+ * @param size      size argument passed to generateData() for the body
+ * @return the difference between now() taken after the wait and now()
+ *         taken just before publishing started
+ */
+int64_t Publisher::publish(int msgs, int listeners, int size){
+    Message payload;
+    payload.setData(generateData(size));
+    Time begun = now();
+    {
+        // received() takes this same lock, so holding it across the whole
+        // publish phase keeps responses from being counted before
+        // waitForCompletion() has set the expected count.
+        Monitor::ScopedLock guard(monitor);
+        for(int remaining = msgs; remaining > 0; --remaining){
+            channel->publish(payload, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic);
+        }
+
+        // Ask the listeners to report back once they have seen the batch.
+        Message statsQuery;
+        statsQuery.getHeaders().setString("TYPE", "REPORT_REQUEST");
+        channel->publish(statsQuery, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic);
+        if(transactional){
+            channel->commit();
+        }
+
+        waitForCompletion(listeners);
+    }
+
+    return now() - begun;
+}
+
+/**
+ * Builds a filler message body of the requested length.
+ *
+ * The characters cycle through 'A'..'Z'. The previous expression,
+ * 'A' + (i / 26), advanced one letter every 26 characters instead, which
+ * runs past 'Z' for sizes above 676 and eventually overflows char.
+ *
+ * @param size number of characters to generate; zero or negative yields
+ *             an empty string
+ * @return a string of exactly size characters (empty if size <= 0)
+ */
+string Publisher::generateData(int size){
+    string data;
+    if(size > 0){
+        data.reserve(size);
+    }
+    for(int i = 0; i < size; i++){
+        data += static_cast<char>('A' + (i % 26));
+    }
+    return data;
+}
+
+/**
+ * Publishes a message whose "TYPE" header is "TERMINATION_REQUEST" to
+ * the control topic, committing afterwards when the publisher is
+ * transactional.
+ */
+void Publisher::terminate(){
+    Message shutdownNotice;
+    shutdownNotice.getHeaders().setString("TYPE", "TERMINATION_REQUEST");
+    channel->publish(shutdownNotice, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic);
+    if(!transactional){
+        return;
+    }
+    channel->commit();
+}
+
+/**
+ * Applies the command line to this object, overriding the defaults set
+ * by the constructor.
+ *
+ * Parsing stops at "-help". Unrecognised options produce a warning on
+ * standard output and are skipped. A value-taking option that appears
+ * last with no value also produces a warning and ends parsing, instead
+ * of reading past the end of the argument list.
+ *
+ * @param argc argument count as passed to main
+ * @param argv argument vector as passed to main; argv[0] is ignored
+ */
+void Args::parse(int argc, char** argv){
+    for(int i = 1; i < argc; i++){
+        string name(argv[i]);
+        if("-help" == name){
+            help = true;
+            break;
+        }
+        if("-transactional" == name){
+            transactional = true;
+            continue;
+        }
+        if("-trace" == name){
+            trace = true;
+            continue;
+        }
+
+        // Every other recognised option takes exactly one value: either
+        // the host name or an integer stored through 'number'.
+        const bool isHost = ("-host" == name);
+        int* number = 0;
+        if("-port" == name) number = &port;
+        else if("-messages" == name) number = &messages;
+        else if("-subscribers" == name) number = &subscribers;
+        else if("-ack_mode" == name) number = &ackMode;
+        else if("-prefetch" == name) number = &prefetch;
+        else if("-batches" == name) number = &batches;
+        else if("-delay" == name) number = &delay;
+        else if("-size" == name) number = &size;
+
+        if(!isHost && !number){
+            std::cout << "Warning: unrecognised option " << name << std::endl;
+            continue;
+        }
+        if(i + 1 >= argc){
+            // argv[argc] is a null pointer; handing it to atoi or to
+            // std::string would be undefined behaviour.
+            std::cout << "Warning: missing value for option " << name << std::endl;
+            break;
+        }
+
+        const char* value = argv[++i];
+        if(isHost){
+            host = value;
+        }else{
+            *number = atoi(value);
+        }
+    }
+}
+
+/**
+ * Prints a summary of the supported command line options to standard
+ * output, one option per pair of lines (syntax, then description).
+ * The defaults quoted here mirror those set in the Args constructor.
+ */
+void Args::usage(){
+    std::cout << "Options:" << std::endl;
+    std::cout << " -help" << std::endl;
+    std::cout << " Prints this usage message" << std::endl;
+    std::cout << " -host <host>" << std::endl;
+    std::cout << " Specifies host to connect to (default is localhost)" << std::endl;
+    std::cout << " -port <port>" << std::endl;
+    std::cout << " Specifies port to connect to (default is 5672)" << std::endl;
+    std::cout << " -messages <count>" << std::endl;
+    std::cout << " Specifies how many messages to send" << std::endl;
+    std::cout << " -subscribers <count>" << std::endl;
+    std::cout << " Specifies how many subscribers to expect reports from" << std::endl;
+    std::cout << " -ack_mode <mode>" << std::endl;
+    std::cout << " Sets the acknowledgement mode" << std::endl;
+    std::cout << " 0=NO_ACK (default), 1=AUTO_ACK, 2=LAZY_ACK" << std::endl;
+    std::cout << " -transactional" << std::endl;
+    std::cout << " Indicates the client should use transactions" << std::endl;
+    std::cout << " -prefetch <count>" << std::endl;
+    std::cout << " Specifies the prefetch count (default is 1000)" << std::endl;
+    std::cout << " -batches <count>" << std::endl;
+    std::cout << " Specifies how many batches to run" << std::endl;
+    std::cout << " -delay <seconds>" << std::endl;
+    std::cout << " Causes a delay between each batch" << std::endl;
+    std::cout << " -size <bytes>" << std::endl;
+    std::cout << " Sets the size of the published messages (default is 256 bytes)" << std::endl;
+    std::cout << " -trace" << std::endl;
+    std::cout << " Indicates that the frames sent and received should be logged" << std::endl;
+}
Propchange: incubator/qpid/trunk/qpid/cpp/tests/topic_publisher.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/tests/topic_publisher.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/tests/topicall
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/topicall?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/topicall (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/topicall Thu Nov 30 21:11:45 2006
@@ -0,0 +1,25 @@
+#!/bin/sh
+# Do 3 runs of topictests for C++ and Java brokers with reduced output.
+
+. `dirname $0`/env
+
+# Run a short topictest to warm up the broker and iron out startup effects.
+flush() {
+ topic_listener >/dev/null 2>&1 &
+ topic_publisher >/dev/null 2>&1
+}
+
+echo Java broker
+broker j ; flush
+topictest c | tail -n1
+topictest c | tail -n1
+topictest c | tail -n1
+
+echo C++ broker
+broker c ; flush
+topictest c | tail -n1
+topictest c | tail -n1
+topictest c | tail -n1
+
+# Don't bother with java clients we know they're slower.
+
Propchange: incubator/qpid/trunk/qpid/cpp/tests/topicall
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/qpid/trunk/qpid/cpp/tests/topictest
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/topictest?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/topictest (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/topictest Thu Nov 30 21:11:45 2006
@@ -0,0 +1,42 @@
+#!/bin/bash
+# Run the c++ or java topic test
+
+. `dirname $0`/env
+
+# Edit parameters here:
+
+# Big test:
+# LISTENERS=10
+# MESSAGES=10000
+# BATCHES=20
+
+LISTENERS=10
+MESSAGES=2000
+BATCHES=10
+
+cppcmds() {
+ LISTEN_CMD=topic_listener
+ PUBLISH_CMD="topic_publisher -messages $MESSAGES -batches $BATCHES -subscribers $LISTENERS"
+}
+
+javacmds() {
+ DEF=-Damqj.logging.level="error"
+ LISTEN_CMD="qpid-run $DEF org.apache.qpid.topic.Listener"
+ PUBLISH_CMD="qpid-run $DEF org.apache.qpid.topic.Publisher -messages $MESSAGES -batch $BATCHES -clients $LISTENERS"
+}
+
+case $1 in
+ c) cppcmds ;;
+ j) javacmds ;;
+ *) cppcmds ;;
+esac
+
+for ((i=$LISTENERS ; i--; )); do
+ $LISTEN_CMD > /dev/null 2>&1 &
+done
+sleep 1
+echo $PUBLISH_CMD $OPTIONS
+
+STATS=~/bin/topictest.times
+echo "---- topictest `date`" >> $STATS
+$PUBLISH_CMD $OPTIONS | tee -a $STATS
Propchange: incubator/qpid/trunk/qpid/cpp/tests/topictest
------------------------------------------------------------------------------
svn:executable = *