You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2006/12/01 06:11:53 UTC

svn commit: r481159 [12/12] - in /incubator/qpid/trunk/qpid/cpp: ./ build-aux/ gen/ lib/ lib/broker/ lib/client/ lib/common/ lib/common/framing/ lib/common/sys/ lib/common/sys/apr/ lib/common/sys/posix/ m4/ src/ src/qpid/ src/qpid/apr/ src/qpid/broker/...

Added: incubator/qpid/trunk/qpid/cpp/tests/MessageTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/MessageTest.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/MessageTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/MessageTest.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <BrokerMessage.h>
+#include <qpid_test_plugin.h>
+#include <iostream>
+
+using namespace boost;
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+// Test double for OutputHandler: records each frame passed to send() for later inspection.
+class DummyHandler : public OutputHandler{
+  public:
+    std::vector<AMQFrame*> frames; // NOTE(review): never deleted here; ownership unclear - confirm
+
+    virtual void send(AMQFrame* outgoing){ frames.push_back(outgoing); }
+};
+
+class MessageTest : public CppUnit::TestCase  
+{
+    CPPUNIT_TEST_SUITE(MessageTest);
+    CPPUNIT_TEST(testEncodeDecode);
+    CPPUNIT_TEST_SUITE_END();
+    // Single test: a Message must survive an encode/decode round trip.
+  public:
+    // Encodes a persistent two-part message, decodes it, then checks fields and delivery.
+    void testEncodeDecode()
+    {
+        string exchange = "MyExchange";
+        string routingKey = "MyRoutingKey";
+        string messageId = "MyMessage";
+        string data1("abcdefg");
+        string data2("hijklmn");
+        // Build a message with a BASIC header and two 7-byte content parts (14 bytes in total).
+        Message::shared_ptr msg = Message::shared_ptr(new Message(0, exchange, routingKey, false, false));
+        AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+        header->setContentSize(14);        
+        AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
+        AMQContentBody::shared_ptr part2(new AMQContentBody(data2));        
+        msg->setHeader(header);
+        msg->addContent(part1);
+        msg->addContent(part2);
+        // Header properties that are checked again after decoding.
+        msg->getHeaderProperties()->setMessageId(messageId);
+        msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
+        msg->getHeaderProperties()->getHeaders().setString("abc", "xyz");
+        // Encode into a buffer sized by encodedSize() and flip it.
+        Buffer buffer(msg->encodedSize());
+        msg->encode(buffer);
+        buffer.flip();
+        // Rebuild the message from the buffer and compare with the values set above.
+        msg = Message::shared_ptr(new Message(buffer));
+        CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchange());
+        CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey());
+        CPPUNIT_ASSERT_EQUAL(messageId, msg->getHeaderProperties()->getMessageId());
+        CPPUNIT_ASSERT_EQUAL((u_int8_t) PERSISTENT, msg->getHeaderProperties()->getDeliveryMode());
+        CPPUNIT_ASSERT_EQUAL(string("xyz"), msg->getHeaderProperties()->getHeaders().getString("abc"));
+        CPPUNIT_ASSERT_EQUAL((u_int64_t) 14, msg->contentSize());
+        // Deliver to the recording handler: 3 frames expected, the third carrying data1 + data2.
+        DummyHandler handler;
+        msg->deliver(&handler, 0, "ignore", 0, 100); 
+        CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size());
+        AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody()));
+        CPPUNIT_ASSERT(contentBody); // the third frame's body must be an AMQContentBody
+        CPPUNIT_ASSERT_EQUAL(data1 + data2, contentBody->getData());
+    }
+};
+
+// Make this file a CppUnit plug-in and register MessageTest with the default registry.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(MessageTest);
+

Propchange: incubator/qpid/trunk/qpid/cpp/tests/MessageTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/tests/MessageTest.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/tests/QueueRegistryTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/QueueRegistryTest.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/QueueRegistryTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/QueueRegistryTest.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <QueueRegistry.h>
+#include <qpid_test_plugin.h>
+#include <string>
+
+using namespace qpid::broker;
+
+class QueueRegistryTest : public CppUnit::TestCase 
+{
+    CPPUNIT_TEST_SUITE(QueueRegistryTest);
+    CPPUNIT_TEST(testDeclare);
+    CPPUNIT_TEST(testDeclareTmp);
+    CPPUNIT_TEST(testFind);
+    CPPUNIT_TEST(testDestroy);
+    CPPUNIT_TEST_SUITE_END();
+    // Members shared by the tests: two queue names, the registry, and a declare() result.
+  private:
+    std::string foo, bar;
+    QueueRegistry reg;
+    std::pair<Queue::shared_ptr,  bool> qc;
+    
+  public:
+    void setUp() {
+        foo = "foo";
+        bar = "bar";
+    }
+    // declare() returns (queue, created); created is expected to be true only for a new name.
+    void testDeclare() {
+        qc = reg.declare(foo, false, 0, 0);
+        Queue::shared_ptr q = qc.first;
+        CPPUNIT_ASSERT(q);
+        CPPUNIT_ASSERT(qc.second); // New queue
+        CPPUNIT_ASSERT_EQUAL(foo, q->getName());
+        // Same name again: same queue, not newly created.
+        qc = reg.declare(foo, false, 0, 0);
+        CPPUNIT_ASSERT_EQUAL(q, qc.first);
+        CPPUNIT_ASSERT(!qc.second);
+        // A different name yields a new queue.
+        qc = reg.declare(bar, false, 0, 0);
+        q = qc.first;
+        CPPUNIT_ASSERT(q);
+        CPPUNIT_ASSERT_EQUAL(true, qc.second);
+        CPPUNIT_ASSERT_EQUAL(bar, q->getName());
+    }
+    // An empty name is expected to produce a generated queue name ("tmp_1").
+    void testDeclareTmp() 
+    {
+        qc = reg.declare(std::string(), false, 0, 0);
+        CPPUNIT_ASSERT(qc.second);
+        CPPUNIT_ASSERT_EQUAL(std::string("tmp_1"), qc.first->getName());
+    }
+    // find() is expected to give a null pointer for unknown names and the queue for declared ones.
+    void testFind() {
+        CPPUNIT_ASSERT(reg.find(foo) == 0);
+
+        reg.declare(foo, false, 0, 0);
+        reg.declare(bar, false, 0, 0);
+        Queue::shared_ptr q = reg.find(bar);
+        CPPUNIT_ASSERT(q);
+        CPPUNIT_ASSERT_EQUAL(bar, q->getName());
+    }
+    // destroy() is expected to drop the registry's reference while the caller's stays valid.
+    void testDestroy() {
+        qc = reg.declare(foo, false, 0, 0);
+        reg.destroy(foo);
+        // Queue is gone from the registry.
+        CPPUNIT_ASSERT(reg.find(foo) == 0);
+        // Queue is not actually destroyed till we drop our reference.
+        CPPUNIT_ASSERT_EQUAL(foo, qc.first->getName());
+        // We should be the only reference.
+        CPPUNIT_ASSERT_EQUAL(1L, qc.first.use_count());
+    }
+};
+
+// Make this file a CppUnit plug-in and register QueueRegistryTest with the default registry.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(QueueRegistryTest);

Propchange: incubator/qpid/trunk/qpid/cpp/tests/QueueRegistryTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/tests/QueueRegistryTest.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/QueueTest.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/QueueTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/QueueTest.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,179 @@
+ /*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <BrokerQueue.h>
+#include <QueueRegistry.h>
+#include <qpid_test_plugin.h>
+#include <iostream>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+
+
+class TestBinding : public virtual Binding{ // Binding stub that records whether cancel() ran
+public:
+    TestBinding();
+    virtual void cancel();
+    bool isCancelled();
+private:
+    bool cancelled; // false on construction, set by cancel()
+};
+
+// Consumer stub that remembers the most recent message handed to deliver().
+struct TestConsumer : public virtual Consumer{
+    Message::shared_ptr last;
+
+    virtual bool deliver(Message::shared_ptr& msg);
+};
+
+
+class QueueTest : public CppUnit::TestCase  
+{
+    CPPUNIT_TEST_SUITE(QueueTest);
+    CPPUNIT_TEST(testConsumers);
+    CPPUNIT_TEST(testBinding);
+    CPPUNIT_TEST(testRegistry);
+    CPPUNIT_TEST(testDequeue);
+    CPPUNIT_TEST_SUITE_END();
+    // Exercises Queue consumer handling, bindings, registry use and dequeue/dispatch.
+  public:
+    void testConsumers(){
+        Queue::shared_ptr queue(new Queue("my_queue", true));
+    
+        //Test adding consumers:
+        TestConsumer c1; 
+        TestConsumer c2; 
+        queue->consume(&c1);
+        queue->consume(&c2);
+
+        CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getConsumerCount());
+        
+        //Test basic delivery:
+        Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true));
+        Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true));
+        Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", "C", true, true));
+        // The assertions below expect deliveries to alternate c1, c2, c1.
+        queue->deliver(msg1);
+        CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get());
+
+        queue->deliver(msg2);
+        CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get());
+        
+        queue->deliver(msg3);
+        CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get());        
+    
+        //Test cancellation:
+        queue->cancel(&c1);
+        CPPUNIT_ASSERT_EQUAL(u_int32_t(1), queue->getConsumerCount());
+        queue->cancel(&c2);
+        CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getConsumerCount());
+    }
+
+    void testBinding(){
+        Queue::shared_ptr queue(new Queue("my_queue", true));
+        //Test bindings:
+        TestBinding a;
+        TestBinding b;
+        queue->bound(&a);
+        queue->bound(&b);    
+        // Releasing the queue is expected to cancel both bindings.
+        queue.reset();
+
+        CPPUNIT_ASSERT(a.isCancelled());
+        CPPUNIT_ASSERT(b.isCancelled());
+    }
+
+    void testRegistry(){
+        //Test use of queues in registry:
+        QueueRegistry registry;
+        registry.declare("queue1", true, true);
+        registry.declare("queue2", true, true);
+        registry.declare("queue3", true, true);
+
+        CPPUNIT_ASSERT(registry.find("queue1"));
+        CPPUNIT_ASSERT(registry.find("queue2"));
+        CPPUNIT_ASSERT(registry.find("queue3"));
+        
+        registry.destroy("queue1");
+        registry.destroy("queue2");
+        registry.destroy("queue3");
+        // After destroy() none of the names may be found any more.
+        CPPUNIT_ASSERT(!registry.find("queue1"));
+        CPPUNIT_ASSERT(!registry.find("queue2"));
+        CPPUNIT_ASSERT(!registry.find("queue3"));
+    }
+
+    void testDequeue(){
+        Queue::shared_ptr queue(new Queue("my_queue", true));
+
+        Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true));
+        Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true));
+        Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", "C", true, true));
+        Message::shared_ptr received;
+        // With no consumer attached, the three messages are expected to stay on the queue.
+        queue->deliver(msg1);
+        queue->deliver(msg2);
+        queue->deliver(msg3);
+
+        CPPUNIT_ASSERT_EQUAL(u_int32_t(3), queue->getMessageCount());
+        
+        received = queue->dequeue();
+        CPPUNIT_ASSERT_EQUAL(msg1.get(), received.get());
+        CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getMessageCount());
+
+        received = queue->dequeue();
+        CPPUNIT_ASSERT_EQUAL(msg2.get(), received.get());
+        CPPUNIT_ASSERT_EQUAL(u_int32_t(1), queue->getMessageCount());
+        // A newly added consumer should receive the remaining message on dispatch().
+        TestConsumer consumer; 
+        queue->consume(&consumer);
+        queue->dispatch();
+        CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get());
+        CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getMessageCount());
+        // The queue is now empty: dequeue() is expected to return a null pointer.
+        received = queue->dequeue();
+        CPPUNIT_ASSERT(!received);
+        CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getMessageCount());
+        
+    }
+};
+
+// Make this file a CppUnit plug-in and register QueueTest with the default registry.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(QueueTest);
+
+// TestBinding member definitions: a binding starts out not cancelled.
+TestBinding::TestBinding() : cancelled(false) {}
+
+void TestBinding::cancel(){
+    CPPUNIT_ASSERT(!cancelled); // fails the test if cancel() is called a second time
+    this->cancelled = true;
+}
+
+bool TestBinding::isCancelled(){
+    return this->cancelled;
+}
+
+// TestConsumer: keep the delivered message in `last` and always report it as accepted.
+bool TestConsumer::deliver(Message::shared_ptr& msg){
+    this->last = msg;
+    return true;
+}
+

Propchange: incubator/qpid/trunk/qpid/cpp/tests/QueueTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/tests/QueueTest.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/tests/TopicExchangeTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/TopicExchangeTest.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/TopicExchangeTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/TopicExchangeTest.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <TopicExchange.h>
+#include <qpid_test_plugin.h>
+
+using namespace qpid::broker;
+
+// Build a Tokens from the C strings in [begin, end); const and non-const arrays both convert.
+Tokens makeTokens(const char* const* begin, const char* const* end) {
+    Tokens result;
+    result.insert(result.end(), begin, end);
+    return result;
+}
+
+// Element count of a true array (not a pointer): total size divided by the size of one element.
+#define LEN(a) (sizeof(a)/sizeof(a[0]))
+
+// Convert a whole array of C strings to a Tokens via makeTokens(first, one-past-last).
+#define TOKENS(a) makeTokens(a, a + LEN(a))
+
+// Let CPPUNIT_ASSERT_EQUAL print a Tokens as [ "a", "b"]; an empty Tokens prints as "[ ]".
+CppUnit::OStringStream& operator <<(CppUnit::OStringStream& out, const Tokens& v)
+{
+    out << "[ ";
+    for (Tokens::const_iterator i = v.begin(); i != v.end(); ++i) {
+        if (i != v.begin()) out << ", "; // separator goes between elements only
+        out << '"' << *i << '"';
+    }
+    out << "]"; // always close the bracket, even when v is empty
+    return out;
+}
+
+
+class TokensTest : public CppUnit::TestCase
+{
+    CPPUNIT_TEST_SUITE(TokensTest);
+    CPPUNIT_TEST(testTokens);
+    CPPUNIT_TEST_SUITE_END();
+    // Expects Tokens to split on '.', keeping empty tokens; "" yields no tokens at all.
+  public:
+    void testTokens() 
+    {
+        Tokens tokens("hello.world");
+        char* expect[] = {"hello", "world"};
+        CPPUNIT_ASSERT_EQUAL(TOKENS(expect), tokens);
+        // NOTE(review): literals bound to char* are deprecated; const char* needs makeTokens to accept it.
+        tokens = "a.b.c";
+        char* expect2[] = { "a", "b", "c" };
+        CPPUNIT_ASSERT_EQUAL(TOKENS(expect2), tokens);
+
+        tokens = "";
+        CPPUNIT_ASSERT(tokens.empty());
+
+        tokens = "x";
+        char* expect3[] = { "x" };
+        CPPUNIT_ASSERT_EQUAL(TOKENS(expect3), tokens);
+        // Leading, trailing and lone separators each produce empty tokens.
+        tokens = (".x");
+        char* expect4[] = { "", "x" };
+        CPPUNIT_ASSERT_EQUAL(TOKENS(expect4), tokens);
+
+        tokens = ("x.");
+        char* expect5[] = { "x", "" };
+        CPPUNIT_ASSERT_EQUAL(TOKENS(expect5), tokens);
+
+        tokens = (".");
+        char* expect6[] = { "", "" };
+        CPPUNIT_ASSERT_EQUAL(TOKENS(expect6), tokens);        
+
+        tokens = ("..");
+        char* expect7[] = { "", "", "" };
+        CPPUNIT_ASSERT_EQUAL(TOKENS(expect7), tokens);        
+    }
+    
+};
+
+#define ASSERT_NORMALIZED(expect, pattern) \
+    CPPUNIT_ASSERT_EQUAL(Tokens(expect), static_cast<Tokens>(TopicPattern(pattern)))
+class TopicPatternTest : public CppUnit::TestCase 
+{
+    CPPUNIT_TEST_SUITE(TopicPatternTest);
+    CPPUNIT_TEST(testNormalize);
+    CPPUNIT_TEST(testPlain);
+    CPPUNIT_TEST(testStar);
+    CPPUNIT_TEST(testHash);
+    CPPUNIT_TEST(testMixed);
+    CPPUNIT_TEST(testCombo);
+    CPPUNIT_TEST_SUITE_END();
+    // Per the cases below: '*' matches exactly one token, '#' matches zero or more tokens.
+  public:
+    // Normalization is expected to collapse repeated '#' and to put '*' before '#' within a run.
+    void testNormalize() 
+    {
+        CPPUNIT_ASSERT(TopicPattern("").empty());
+        ASSERT_NORMALIZED("a.b.c", "a.b.c");
+        ASSERT_NORMALIZED("a.*.c", "a.*.c");
+        ASSERT_NORMALIZED("#", "#");
+        ASSERT_NORMALIZED("#", "#.#.#.#");
+        ASSERT_NORMALIZED("*.*.*.#", "#.*.#.*.#.#.*");
+        ASSERT_NORMALIZED("a.*.*.*.#", "a.*.#.*.#.*.#");
+        ASSERT_NORMALIZED("a.*.*.*.#", "a.*.#.*.#.*");
+    }
+    // A pattern without wildcards must match only the identical token sequence.
+    void testPlain() {
+        TopicPattern p("ab.cd.e");
+        CPPUNIT_ASSERT(p.match("ab.cd.e"));
+        CPPUNIT_ASSERT(!p.match("abx.cd.e"));
+        CPPUNIT_ASSERT(!p.match("ab.cd"));
+        CPPUNIT_ASSERT(!p.match("ab.cd..e."));
+        CPPUNIT_ASSERT(!p.match("ab.cd.e."));
+        CPPUNIT_ASSERT(!p.match(".ab.cd.e"));
+
+        p = "";
+        CPPUNIT_ASSERT(p.match(""));
+
+        p = ".";
+        CPPUNIT_ASSERT(p.match("."));
+    }
+
+    // '*' stands for exactly one token, which may be empty.
+    void testStar() 
+    {
+        TopicPattern p("a.*.b");
+        CPPUNIT_ASSERT(p.match("a.xx.b"));
+        CPPUNIT_ASSERT(!p.match("a.b"));
+
+        p = "*.x";
+        CPPUNIT_ASSERT(p.match("y.x"));
+        CPPUNIT_ASSERT(p.match(".x"));
+        CPPUNIT_ASSERT(!p.match("x"));
+
+        p = "x.x.*";
+        CPPUNIT_ASSERT(p.match("x.x.y"));
+        CPPUNIT_ASSERT(p.match("x.x."));
+        CPPUNIT_ASSERT(!p.match("x.x"));
+        CPPUNIT_ASSERT(!p.match("q.x.y"));
+    }
+    // '#' stands for any number of tokens, including none.
+    void testHash() 
+    {
+        TopicPattern p("a.#.b");
+        CPPUNIT_ASSERT(p.match("a.b"));
+        CPPUNIT_ASSERT(p.match("a.x.b"));
+        CPPUNIT_ASSERT(p.match("a..x.y.zz.b"));
+        CPPUNIT_ASSERT(!p.match("a.b."));
+        CPPUNIT_ASSERT(!p.match("q.x.b"));
+
+        p = "a.#";
+        CPPUNIT_ASSERT(p.match("a"));
+        CPPUNIT_ASSERT(p.match("a.b"));
+        CPPUNIT_ASSERT(p.match("a.b.c"));
+
+        p = "#.a";
+        CPPUNIT_ASSERT(p.match("a"));
+        CPPUNIT_ASSERT(p.match("x.y.a"));
+    }
+    // '*' and '#' combined in a single pattern.
+    void testMixed() 
+    {
+        TopicPattern p("*.x.#.y");
+        CPPUNIT_ASSERT(p.match("a.x.y"));
+        CPPUNIT_ASSERT(p.match("a.x.p.qq.y"));
+        CPPUNIT_ASSERT(!p.match("a.a.x.y"));
+        CPPUNIT_ASSERT(!p.match("aa.x.b.c"));
+
+        p = "a.#.b.*";
+        CPPUNIT_ASSERT(p.match("a.b.x"));
+        CPPUNIT_ASSERT(p.match("a.x.x.x.b.x"));
+    }
+    // Three '*' in the pattern: at least three tokens are needed for a match.
+    void testCombo() {
+        TopicPattern p("*.#.#.*.*.#");
+        CPPUNIT_ASSERT(p.match("x.y.z"));
+        CPPUNIT_ASSERT(p.match("x.y.z.a.b.c"));
+        CPPUNIT_ASSERT(!p.match("x.y"));
+        CPPUNIT_ASSERT(!p.match("x"));
+    }
+};
+
+    
+// Make this file a CppUnit plug-in and register both suites with the default registry.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(TopicPatternTest);
+CPPUNIT_TEST_SUITE_REGISTRATION(TokensTest);

Propchange: incubator/qpid/trunk/qpid/cpp/tests/TopicExchangeTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/tests/TopicExchangeTest.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/tests/TxAckTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/TxAckTest.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/TxAckTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/TxAckTest.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <NullMessageStore.h>
+#include <RecoveryManager.h>
+#include <TxAck.h>
+#include <qpid_test_plugin.h>
+#include <iostream>
+#include <list>
+#include <vector>
+
+using std::list;
+using std::vector;
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+class TxAckTest : public CppUnit::TestCase  
+{
+    // Store stub: remembers each message passed to dequeue(), in call order.
+    class TestMessageStore : public NullMessageStore
+    {
+    public:
+        vector<Message::shared_ptr> dequeued;
+
+        void dequeue(TransactionContext*, Message::shared_ptr& msg, const Queue& /*queue*/, const string * const /*xid*/)
+        {
+            dequeued.push_back(msg);
+        }
+
+        TestMessageStore() : NullMessageStore(false) {}
+        ~TestMessageStore(){}
+    };
+
+    CPPUNIT_TEST_SUITE(TxAckTest);
+    CPPUNIT_TEST(testPrepare);
+    CPPUNIT_TEST(testCommit);
+    CPPUNIT_TEST_SUITE_END();
+
+    // Fixture: ten deliveries tagged 1..10 on one queue, the ack set, and the TxAck under test.
+    AccumulatedAck acked;
+    TestMessageStore store;
+    Queue::shared_ptr queue;
+    vector<Message::shared_ptr> messages;
+    list<DeliveryRecord> deliveries;
+    TxAck op;
+
+
+public:
+    // op holds acked and deliveries, which are filled in by the constructor body below.
+    TxAckTest() : queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries)
+    {
+        for(int i = 0; i < 10; i++){
+            Message::shared_ptr msg(new Message(0, "exchange", "routing_key", false, false));
+            msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC)));
+            msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
+            messages.push_back(msg);
+            deliveries.push_back(DeliveryRecord(msg, queue, "xyz", (i+1)));
+        }
+
+        //assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not)
+        acked.range = 5;
+        acked.individual.push_back(7);
+        acked.individual.push_back(9);
+    }      
+    // prepare() should dequeue the seven acked messages and leave the delivery list untouched.
+    void testPrepare()
+    {
+        //ensure acked messages are discarded, i.e. dequeued from store
+        op.prepare(0);
+        CPPUNIT_ASSERT_EQUAL((size_t) 7, store.dequeued.size());
+        CPPUNIT_ASSERT_EQUAL((size_t) 10, deliveries.size());
+        CPPUNIT_ASSERT_EQUAL(messages[0], store.dequeued[0]);//msg 1
+        CPPUNIT_ASSERT_EQUAL(messages[1], store.dequeued[1]);//msg 2
+        CPPUNIT_ASSERT_EQUAL(messages[2], store.dequeued[2]);//msg 3
+        CPPUNIT_ASSERT_EQUAL(messages[3], store.dequeued[3]);//msg 4
+        CPPUNIT_ASSERT_EQUAL(messages[4], store.dequeued[4]);//msg 5
+        CPPUNIT_ASSERT_EQUAL(messages[6], store.dequeued[5]);//msg 7
+        CPPUNIT_ASSERT_EQUAL(messages[8], store.dequeued[6]);//msg 9
+    }
+    // commit() should drop the acked records, leaving the unacked 6, 8 and 10.
+    void testCommit()
+    {
+        //ensure acked messages are removed from list
+        op.commit();
+        CPPUNIT_ASSERT_EQUAL((size_t) 3, deliveries.size());
+        list<DeliveryRecord>::iterator i = deliveries.begin();
+        CPPUNIT_ASSERT(i->matches(6));//msg 6
+        CPPUNIT_ASSERT((++i)->matches(8));//msg 8
+        CPPUNIT_ASSERT((++i)->matches(10));//msg 10
+    }
+};
+
+// Make this file a CppUnit plug-in and register TxAckTest with the default registry.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(TxAckTest);
+

Propchange: incubator/qpid/trunk/qpid/cpp/tests/TxAckTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/tests/TxAckTest.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/tests/TxBufferTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/TxBufferTest.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/TxBufferTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/TxBufferTest.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,266 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <TxBuffer.h>
+#include <qpid_test_plugin.h>
+#include <iostream>
+#include <vector>
+
+using namespace qpid::broker;
+
+// Assert element-wise equality over the common prefix, then that both vectors end exactly there.
+template <class T> void assertEqualVector(std::vector<T>& expected, std::vector<T>& actual){
+    unsigned int i = 0;
+    for(; i < expected.size() && i < actual.size(); i++){
+        CPPUNIT_ASSERT_EQUAL(expected[i], actual[i]);
+    }
+    CPPUNIT_ASSERT(i == expected.size()); // fails if expected has elements left over
+    CPPUNIT_ASSERT(i == actual.size()); // fails if actual has elements left over
+}
+
+class TxBufferTest : public CppUnit::TestCase  
+{
+    class MockTxOp : public TxOp{ // records the calls it receives and compares them with the expected ones
+        enum op_codes {PREPARE=2, COMMIT=4, ROLLBACK=8};
+        std::vector<int> expected;
+        std::vector<int> actual;
+        bool failOnPrepare;
+    public:
+        MockTxOp() : failOnPrepare(false) {}
+        MockTxOp(bool _failOnPrepare) : failOnPrepare(_failOnPrepare) {}
+        // The TxOp callbacks log their op code; prepare() reports failure when failOnPrepare is set.
+        bool prepare(TransactionContext*) throw(){
+            actual.push_back(PREPARE);
+            return !failOnPrepare;
+        }
+        void commit()  throw(){
+            actual.push_back(COMMIT);
+        }
+        void rollback()  throw(){
+            actual.push_back(ROLLBACK);
+        }
+        MockTxOp& expectPrepare(){
+            expected.push_back(PREPARE);
+            return *this;
+        }
+        MockTxOp& expectCommit(){
+            expected.push_back(COMMIT);
+            return *this;
+        }
+        MockTxOp& expectRollback(){
+            expected.push_back(ROLLBACK);
+            return *this;
+        }
+        void check(){
+            assertEqualVector(expected, actual);
+        }
+        ~MockTxOp(){}        
+    };
+    // Store mock: records begin/commit/abort calls and tracks one transaction state.
+    class MockTransactionalStore : public TransactionalStore{
+        enum op_codes {BEGIN=2, COMMIT=4, ABORT=8};
+        std::vector<int> expected;
+        std::vector<int> actual;
+
+        enum states {OPEN = 1, COMMITTED = 2, ABORTED = 3};
+        int state;
+        // Context that updates the owning store's state; throws if the state is no longer OPEN.
+        class TestTransactionContext : public TransactionContext{
+            MockTransactionalStore* store;
+        public:
+            TestTransactionContext(MockTransactionalStore* _store) : store(_store) {}
+            void commit(){
+                if(store->state != OPEN) throw "txn already completed";
+                store->state = COMMITTED;
+            }
+
+            void abort(){
+                if(store->state != OPEN) throw "txn already completed";
+                store->state = ABORTED;
+            }
+            ~TestTransactionContext(){}
+        };
+
+
+    public:
+        MockTransactionalStore() : state(OPEN){}
+        // begin() hands out a new context bound to this store; the state itself is only set in the constructor.
+        std::auto_ptr<TransactionContext> begin(){ 
+            actual.push_back(BEGIN);
+            std::auto_ptr<TransactionContext> txn(new TestTransactionContext(this));
+            return txn;
+        }
+        void commit(TransactionContext* ctxt){
+            actual.push_back(COMMIT);
+            TestTransactionContext* txn(dynamic_cast<TestTransactionContext*>(ctxt));
+            CPPUNIT_ASSERT(txn);
+            txn->commit();
+        }
+        void abort(TransactionContext* ctxt){
+            actual.push_back(ABORT);
+            TestTransactionContext* txn(dynamic_cast<TestTransactionContext*>(ctxt));
+            CPPUNIT_ASSERT(txn);
+            txn->abort();
+        }        
+        MockTransactionalStore& expectBegin(){
+            expected.push_back(BEGIN);
+            return *this;
+        }
+        MockTransactionalStore& expectCommit(){
+            expected.push_back(COMMIT);
+            return *this;
+        }
+        MockTransactionalStore& expectAbort(){
+            expected.push_back(ABORT);
+            return *this;
+        }
+        void check(){
+            assertEqualVector(expected, actual);
+        }
+
+        bool isCommitted(){
+            return state == COMMITTED;
+        }
+        
+        bool isAborted(){
+            return state == ABORTED;
+        }
+        
+        bool isOpen(){
+            return state == OPEN;
+        }
+        ~MockTransactionalStore(){}
+    };
+
+    CPPUNIT_TEST_SUITE(TxBufferTest);
+    CPPUNIT_TEST(testPrepareAndCommit);
+    CPPUNIT_TEST(testFailOnPrepare);
+    CPPUNIT_TEST(testRollback);
+    CPPUNIT_TEST(testBufferIsClearedAfterRollback);
+    CPPUNIT_TEST(testBufferIsClearedAfterCommit);
+    CPPUNIT_TEST_SUITE_END();
+
+  public:
+    // Every enlisted op is prepared and committed; the store sees begin followed by commit.
+    void testPrepareAndCommit(){
+        MockTransactionalStore store;
+        store.expectBegin().expectCommit();
+
+        MockTxOp opA;
+        opA.expectPrepare().expectCommit();
+        MockTxOp opB;
+        opB.expectPrepare().expectPrepare().expectCommit().expectCommit();//opB enlisted twice to test relative order
+        MockTxOp opC;
+        opC.expectPrepare().expectCommit();
+
+        TxBuffer buffer;
+        buffer.enlist(&opA);
+        buffer.enlist(&opB);
+        buffer.enlist(&opB);//opB enlisted twice
+        buffer.enlist(&opC);
+
+        CPPUNIT_ASSERT(buffer.prepare(&store));
+        buffer.commit();
+        store.check();
+        CPPUNIT_ASSERT(store.isCommitted());
+        opA.check();
+        opB.check();
+        opC.check();
+    }
+    // A failing prepare must abort the store transaction and stop before later ops are prepared.
+    void testFailOnPrepare(){
+        MockTransactionalStore store;
+        store.expectBegin().expectAbort();
+
+        MockTxOp opA;
+        opA.expectPrepare();
+        MockTxOp opB(true);
+        opB.expectPrepare();
+        MockTxOp opC;//will never get prepare as b will fail
+
+        TxBuffer buffer;
+        buffer.enlist(&opA);
+        buffer.enlist(&opB);
+        buffer.enlist(&opC);
+
+        CPPUNIT_ASSERT(!buffer.prepare(&store));
+        store.check();
+        CPPUNIT_ASSERT(store.isAborted());
+        opA.check();
+        opB.check();
+        opC.check();
+    }
+    // rollback() must reach each enlisted op once, whatever its failOnPrepare setting.
+    void testRollback(){
+        MockTxOp opA;
+        opA.expectRollback();
+        MockTxOp opB(true);
+        opB.expectRollback();
+        MockTxOp opC;
+        opC.expectRollback();
+
+        TxBuffer buffer;
+        buffer.enlist(&opA);
+        buffer.enlist(&opB);
+        buffer.enlist(&opC);
+
+        buffer.rollback();
+        opA.check();
+        opB.check();
+        opC.check();
+    }
+    // After rollback() a later commit() must not reach the previously enlisted ops.
+    void testBufferIsClearedAfterRollback(){
+        MockTxOp opA;
+        opA.expectRollback();
+        MockTxOp opB;
+        opB.expectRollback();
+
+        TxBuffer buffer;
+        buffer.enlist(&opA);
+        buffer.enlist(&opB);
+
+        buffer.rollback();
+        buffer.commit();//second call should not reach ops
+        opA.check();
+        opB.check();
+    }
+    // After commit() a later rollback() must not reach the previously enlisted ops.
+    void testBufferIsClearedAfterCommit(){
+        MockTxOp opA;
+        opA.expectCommit();
+        MockTxOp opB;
+        opB.expectCommit();
+
+        TxBuffer buffer;
+        buffer.enlist(&opA);
+        buffer.enlist(&opB);
+
+        buffer.commit();
+        buffer.rollback();//second call should not reach ops
+        opA.check();
+        opB.check();
+    }
+};
+
+// Make this file a CppUnit plug-in and register TxBufferTest with the default registry.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(TxBufferTest);
+

Propchange: incubator/qpid/trunk/qpid/cpp/tests/TxBufferTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/tests/TxBufferTest.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/tests/TxPublishTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/TxPublishTest.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/TxPublishTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/TxPublishTest.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <NullMessageStore.h>
+#include <RecoveryManager.h>
+#include <TxPublish.h>
+#include <qpid_test_plugin.h>
+#include <iostream>
+#include <list>
+#include <vector>
+
+using std::list;
+using std::pair;
+using std::vector;
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+/** Checks what TxPublish::prepare() and commit() do for two target queues. */
+class TxPublishTest : public CppUnit::TestCase
+{
+    /** Message store that only records which message was enqueued where. */
+    class TestMessageStore : public NullMessageStore
+    {
+    public:
+        typedef pair<string, Message::shared_ptr> Enqueued;
+        vector<Enqueued> enqueued; // (queue name, message), in call order
+        TestMessageStore() : NullMessageStore(false) {}
+        ~TestMessageStore() {}
+        // The one store operation this test records.
+        void enqueue(TransactionContext*, Message::shared_ptr& msg, const Queue& queue, const string * const /*xid*/)
+        {
+            enqueued.push_back(Enqueued(queue.getName(), msg));
+        }
+    };
+
+    CPPUNIT_TEST_SUITE(TxPublishTest);
+    CPPUNIT_TEST(testPrepare);
+    CPPUNIT_TEST(testCommit);
+    CPPUNIT_TEST_SUITE_END();
+
+    // Fixture state; members are initialised in this declaration order.
+    TestMessageStore store;
+    Queue::shared_ptr queue1;
+    Queue::shared_ptr queue2;
+    Message::shared_ptr msg;
+    TxPublish op;
+public:
+    /** Sets up a PERSISTENT message and a TxPublish op targeting both queues. */
+    TxPublishTest() : queue1(new Queue("queue1", false, &store, 0)),
+                      queue2(new Queue("queue2", false, &store, 0)),
+                      msg(new Message(0, "exchange", "routing_key", false, false)),
+                      op(msg)
+    {
+        AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+        msg->setHeader(header);
+        msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
+        op.deliverTo(queue1);
+        op.deliverTo(queue2);
+    }
+
+    /** prepare() should enqueue the message in the store, once per queue. */
+    void testPrepare()
+    {
+        op.prepare(0);
+        CPPUNIT_ASSERT_EQUAL(static_cast<size_t>(2), store.enqueued.size());
+        const TestMessageStore::Enqueued& first = store.enqueued[0];
+        CPPUNIT_ASSERT_EQUAL(string("queue1"), first.first);
+        CPPUNIT_ASSERT_EQUAL(msg, first.second);
+        const TestMessageStore::Enqueued& second = store.enqueued[1];
+        CPPUNIT_ASSERT_EQUAL(string("queue2"), second.first);
+        CPPUNIT_ASSERT_EQUAL(msg, second.second);
+    }
+
+    /** commit() should leave the message, and only it, on each queue. */
+    void testCommit()
+    {
+        op.commit();
+        CPPUNIT_ASSERT_EQUAL(static_cast<u_int32_t>(1), queue1->getMessageCount());
+        CPPUNIT_ASSERT_EQUAL(msg, queue1->dequeue());
+        CPPUNIT_ASSERT_EQUAL(static_cast<u_int32_t>(1), queue2->getMessageCount());
+        CPPUNIT_ASSERT_EQUAL(msg, queue2->dequeue());
+    }
+};
+
+// Build this file as a CppUnit plug-in and register the TxPublishTest suite.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(TxPublishTest);
+

Propchange: incubator/qpid/trunk/qpid/cpp/tests/TxPublishTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/tests/TxPublishTest.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/tests/ValueTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/ValueTest.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/ValueTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/ValueTest.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <Value.h>
+#include <qpid_test_plugin.h>
+
+using namespace qpid::framing;
+
+
+/** Exercises operator== and operator!= across the framing Value classes. */
+class ValueTest : public CppUnit::TestCase
+{
+    CPPUNIT_TEST_SUITE(ValueTest);
+    CPPUNIT_TEST(testStringValueEquals);
+    CPPUNIT_TEST(testIntegerValueEquals);
+    CPPUNIT_TEST(testDecimalValueEquals);
+    CPPUNIT_TEST(testFieldTableValueEquals);
+    CPPUNIT_TEST_SUITE_END();
+
+    // Reference values shared by every test.
+    StringValue s;      // "abc"
+    IntegerValue i;     // 42
+    DecimalValue d;     // built from (1234, 2)
+    FieldTableValue ft; // holds foo="FOO" and magic=7
+    EmptyValue e;       // default-constructed
+
+  public:
+    /** Builds the reference values; the entries of ft are added in the body. */
+    ValueTest() : s("abc"), i(42), d(1234, 2)
+    {
+        ft.getValue().setString("foo", "FOO");
+        ft.getValue().setInt("magic", 7);
+    }
+
+    /** StringValue equality by text, plus EmptyValue against the other kinds. */
+    void testStringValueEquals()
+    {
+        CPPUNIT_ASSERT(StringValue("abc") == s);
+        CPPUNIT_ASSERT(s != StringValue("foo"));
+        CPPUNIT_ASSERT(s != e);
+        CPPUNIT_ASSERT(e != d);
+        CPPUNIT_ASSERT(e != ft);
+    }
+
+    /** IntegerValue equality by number, and inequality with other kinds. */
+    void testIntegerValueEquals()
+    {
+        CPPUNIT_ASSERT(IntegerValue(42) == i);
+        CPPUNIT_ASSERT(IntegerValue(5) != i);
+        CPPUNIT_ASSERT(i != e);
+        CPPUNIT_ASSERT(i != d);
+    }
+
+    /** DecimalValue equality is expected to need both constructor arguments to match. */
+    void testDecimalValueEquals()
+    {
+        CPPUNIT_ASSERT(DecimalValue(1234, 2) == d);
+        CPPUNIT_ASSERT(DecimalValue(12345, 2) != d);
+        CPPUNIT_ASSERT(DecimalValue(1234, 3) != d);
+        CPPUNIT_ASSERT(d != s);
+    }
+
+    /** FieldTableValues should compare equal only while their entries agree. */
+    void testFieldTableValueEquals()
+    {
+        CPPUNIT_ASSERT_EQUAL(std::string("FOO"), ft.getValue().getString("foo"));
+        CPPUNIT_ASSERT_EQUAL(7, ft.getValue().getInt("magic"));
+
+        // Grow a second table entry by entry until it matches ft, then diverge.
+        FieldTableValue other;
+        CPPUNIT_ASSERT(ft != other);
+        other.getValue().setString("foo", "FOO");
+        CPPUNIT_ASSERT(ft != other);
+        other.getValue().setInt("magic", 7);
+        CPPUNIT_ASSERT_EQUAL(ft, other);
+        CPPUNIT_ASSERT(ft == other);
+        other.getValue().setString("foo", "BAR");
+        CPPUNIT_ASSERT(ft != other);
+        CPPUNIT_ASSERT(ft != i);
+    }
+};
+
+    
+// Build this file as a CppUnit plug-in and register the ValueTest suite.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(ValueTest);
+

Propchange: incubator/qpid/trunk/qpid/cpp/tests/ValueTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/tests/ValueTest.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/tests/broker
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/broker?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/broker (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/broker Thu Nov 30 21:11:45 2006
@@ -0,0 +1,45 @@
+#!/bin/sh
+. "$(dirname "$0")/env" # load the settings from ./env; quoted so paths with spaces work
+
+brokerpid() { # print the PID from each listening-TCP netstat line that mentions amqp
+    netstat -tpl 2> /dev/null | awk '/amqp/ { sub("/.*$", "", $7); print $7 }'
+}
+
+killbroker () { # kill the broker, then allow it 5 seconds to disappear
+    PID=`brokerpid`
+    if [ -n "$PID" ] ; then kill $PID ; fi
+    for i in 1 2 3 4 5 ; do # POSIX form: "for ((...))" is bash-only, the script runs as /bin/sh
+	if [ -z "`brokerpid`" ] ; then return 0 ; fi
+	sleep 1
+    done
+    echo "Broker `brokerpid` refuses to die." ; return 1 # non-zero so callers notice
+}
+
+waitbroker () { # block, polling once a second, until brokerpid prints something
+    while [ -z "`brokerpid`" ] ; do sleep 1 ; done
+}
+
+startbroker() { # $1: j starts qpid-server (JVM options set below), c starts qpidd
+    if [ "$1" = j ] ; then
+	export AMQJ_LOGGING_LEVEL=fatal
+	# NOTE(review): JDPA_OPTS may be a typo for JPDA_OPTS - confirm against qpid-run.
+	export JDPA_OPTS=
+	export QPID_OPTS=-Xmx1024M
+	export debug=1
+	CMD="qpid-server"
+	qpid-run -run:print-command # Show the command line.
+    elif [ "$1" = c ] ; then
+	CMD=qpidd
+    fi
+    nohup $CMD  > /dev/null 2>&1 &
+    waitbroker # returns once brokerpid reports a listener
+    echo Broker started: $CMD
+}
+
+
+case "$1" in # sub-command dispatch; anything unrecognised is silently ignored
+    j|c)       startbroker "$1" ;;
+    stop|kill) killbroker ;;
+    wait)      waitbroker ;;
+    pid)       brokerpid ;;
+esac

Propchange: incubator/qpid/trunk/qpid/cpp/tests/broker
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/qpid/trunk/qpid/cpp/tests/client_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/client_test.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/client_test.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/client_test.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+
+#include <QpidError.h>
+#include <ClientChannel.h>
+#include <Connection.h>
+#include <ClientMessage.h>
+#include <MessageListener.h>
+#include <sys/Monitor.h>
+#include <FieldTable.h>
+
+using namespace qpid::client;
+using namespace qpid::sys;
+using std::string;
+
+/** Records that a message arrived and wakes whoever waits on the monitor. */
+class SimpleListener : public virtual MessageListener{
+    Monitor* monitor;
+    bool done; // read and written only with *monitor locked
+public:
+    inline SimpleListener(Monitor* _monitor) : monitor(_monitor), done(false){}
+    inline bool isDone() const { return done; } // caller must hold *monitor
+    inline virtual void received(Message& /*msg*/){
+	std::cout << "Received message " /**<< msg **/<< std::endl;
+	Monitor::ScopedLock l(*monitor); // set the flag and notify under the lock
+	done = true;
+	monitor->notify();
+    }
+};
+
+/** Publishes one message to a topic exchange and waits to get it back. */
+int main(int argc, char**)
+{
+    try{
+	Connection con(argc > 1);
+	Channel channel;
+	Exchange exchange("MyExchange", Exchange::TOPIC_EXCHANGE);
+	Queue queue("MyQueue", true);
+	string host("localhost");
+	con.open(host);
+	std::cout << "Opened connection." << std::endl;
+	con.openChannel(&channel);
+	std::cout << "Opened channel." << std::endl;
+	channel.declareExchange(exchange);
+	std::cout << "Declared exchange." << std::endl;
+	channel.declareQueue(queue);
+	std::cout << "Declared queue." << std::endl;
+	qpid::framing::FieldTable args;
+	channel.bind(exchange, queue, "MyTopic", args);
+	std::cout << "Bound queue to exchange." << std::endl;
+
+	//set up a message listener
+	Monitor monitor;
+	SimpleListener listener(&monitor);
+	string tag("MyTag");
+	channel.consume(queue, tag, &listener);
+	channel.start();
+	std::cout << "Registered consumer." << std::endl;
+	Message msg;
+	string data("MyMessage");
+	msg.setData(data);
+	channel.publish(msg, exchange, "MyTopic");
+	std::cout << "Published message." << std::endl;
+	{
+            // Wait on a predicate: the message may already have arrived, and a
+            // bare wait() would then miss the notify and block forever.
+            Monitor::ScopedLock l(monitor);
+            while (!listener.isDone()) monitor.wait();
+        }
+	con.closeChannel(&channel);
+	std::cout << "Closed channel." << std::endl;
+	con.close();
+	std::cout << "Closed connection." << std::endl;
+    }catch(const qpid::QpidError& error){
+	std::cout << "Error [" << error.code << "] " << error.msg << " ("
+                  << error.location.file << ":" << error.location.line << ")" << std::endl;
+	return 1;
+    }
+    return 0;
+}

Propchange: incubator/qpid/trunk/qpid/cpp/tests/client_test.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/tests/client_test.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/tests/echo_service.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/echo_service.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/echo_service.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/echo_service.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,198 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <QpidError.h>
+#include <ClientChannel.h>
+#include <Connection.h>
+#include <ClientExchange.h>
+#include <MessageListener.h>
+#include <ClientQueue.h>
+#include <sys/Time.h>
+#include <iostream>
+#include <sstream>
+
+using namespace qpid::client;
+using namespace qpid::sys;
+using std::string;
+
+class EchoServer : public MessageListener{ // listener that echoes requests back
+    Channel* const channel; // channel the echoes are published on
+public:
+    EchoServer(Channel* channel);
+    virtual void received(Message& msg); // defined below, outside the class
+};
+
+class LoggingListener : public MessageListener{ // prints each message it receives
+public:
+    virtual void received(Message& msg);
+};
+
+/** Command-line settings for the echo test; the defaults are localhost:5672, server mode. */
+class Args{
+    string host;
+    int port;
+    bool trace;
+    bool help;
+    bool client;
+public:
+    Args() : host("localhost"), port(5672), trace(false), help(false), client(false) {}
+    void parse(int argc, char** argv);
+    void usage();
+    const string& getHost() const { return host; }
+    int getPort() const { return port; }
+    bool getTrace() const { return trace; }
+    bool getHelp() const { return help; }
+    bool getClient() const { return client; }
+};
+
+/** Runs the echo test as a client (-client) or, by default, as the server; returns 1 on QpidError. */
+int main(int argc, char** argv){
+    const std::string echo_service("echo_service");
+    Args args;
+    args.parse(argc, argv);
+    if (args.getHelp()) {
+        args.usage();
+    } else if (args.getClient()) {
+        try {
+            //Create connection & open a channel
+            Connection connection(args.getTrace());
+            connection.open(args.getHost(), args.getPort());
+            Channel channel;
+            connection.openChannel(&channel);
+
+            //Setup: declare the private 'response' queue and bind it
+            //to the direct exchange by its name which will be
+            //generated by the server
+            Queue response;
+            channel.declareQueue(response);
+            qpid::framing::FieldTable emptyArgs;
+            channel.bind(Exchange::DEFAULT_DIRECT_EXCHANGE, response, response.getName(), emptyArgs);
+
+            //Consume from the response queue, logging all echoed message to console:
+            LoggingListener listener;
+            std::string tag;
+            channel.consume(response, tag, &listener);
+
+            //Process incoming requests on a new thread
+            channel.start();
+            //get messages from console and send them:
+            std::string text;
+            std::cout << "Enter text to send:" << std::endl;
+            while (std::getline(std::cin, text)) {
+                std::cout << "Sending " << text << " to echo server." << std::endl;
+                Message msg;
+                msg.getHeaders().setString("RESPONSE_QUEUE", response.getName());
+                msg.setData(text);
+                channel.publish(msg, Exchange::DEFAULT_DIRECT_EXCHANGE, echo_service);
+                std::cout << "Enter text to send:" << std::endl;
+            }
+            connection.close();
+        } catch(const qpid::QpidError& error) {
+            std::cout << error.what() << std::endl;
+            return 1; // report the failure through the exit status
+        }
+    } else {
+        try {
+            //Create connection & open a channel
+            Connection connection(args.getTrace());
+            connection.open(args.getHost(), args.getPort());
+            Channel channel;
+            connection.openChannel(&channel);
+
+            //Setup: declare the 'request' queue and bind it to the direct exchange with a 'well known' name
+            Queue request("request");
+            channel.declareQueue(request);
+            qpid::framing::FieldTable emptyArgs;
+            channel.bind(Exchange::DEFAULT_DIRECT_EXCHANGE, request, echo_service, emptyArgs);
+
+            //Consume from the request queue, echoing back all messages received to the client that sent them
+            EchoServer server(&channel);
+            std::string tag = "server_tag";
+            channel.consume(request, tag, &server);
+
+            //Process incoming requests on the main thread
+            channel.run();
+            connection.close();
+        } catch(const qpid::QpidError& error) {
+            std::cout << error.what() << std::endl;
+            return 1;
+        }
+    }
+    return 0;
+}
+
+EchoServer::EchoServer(Channel* out) : channel(out) {} // out: channel used to publish the echoes
+
+/** Sends the message back to the queue named in its RESPONSE_QUEUE header. */
+void EchoServer::received(Message& message)
+{
+    const std::string replyTo = message.getHeaders().getString("RESPONSE_QUEUE");
+    if (replyTo.empty()) {
+        // Nowhere to send the echo: say so and drop the message.
+        std::cout << "Cannot echo " << message.getData() << ", no response queue specified." << std::endl;
+        return;
+    }
+
+    // The client bound its queue to the default direct exchange under the
+    // queue's own name, so that name is the routing key for the reply.
+    std::cout << "Echoing " << message.getData() << " back to " << replyTo << std::endl;
+    channel->publish(message, Exchange::DEFAULT_DIRECT_EXCHANGE, replyTo);
+}
+
+void LoggingListener::received(Message& message)
+{
+    // Print the data of the received (echoed) message to the console.
+    std::cout << "Received echo: " << message.getData() << std::endl;
+}
+
+
+void Args::parse(int argc, char** argv){ // fills the fields from argv[1..argc-1]
+    for(int i = 1; i < argc; i++){
+        string name(argv[i]);
+        if("-help" == name){
+            help = true;
+            break;
+        }else if("-host" == name && i + 1 < argc){ // value required: argv[argc] is null
+            host = argv[++i];
+        }else if("-port" == name && i + 1 < argc){
+            port = atoi(argv[++i]);
+        }else if("-trace" == name){
+            trace = true;
+        }else if("-client" == name){
+            client = true;
+        }else{ // also taken by a trailing -host/-port with no value after it
+            std::cout << "Warning: unrecognised option " << name << std::endl;
+        }
+    }
+}
+
+void Args::usage(){ // prints the option summary to stdout
+    std::cout << "Options:" << std::endl;
+    std::cout << "    -help" << std::endl;
+    std::cout << "            Prints this usage message" << std::endl;
+    std::cout << "    -host <host>" << std::endl;
+    std::cout << "            Specifies host to connect to (default is localhost)" << std::endl;
+    std::cout << "    -port <port>" << std::endl;
+    std::cout << "            Specifies port to connect to (default is 5672)" << std::endl;
+    std::cout << "    -trace" << std::endl;
+    std::cout << "            Indicates that the frames sent and received should be logged" << std::endl;
+    std::cout << "    -client" << std::endl;
+    std::cout << "            Run as a client (else will run as a server)" << std::endl;
+}

Propchange: incubator/qpid/trunk/qpid/cpp/tests/echo_service.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/tests/echo_service.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/tests/env
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/env?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/env (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/env Thu Nov 30 21:11:45 2006
@@ -0,0 +1,23 @@
+#!/bin/bash
+# Set environment variables for test scripts.
+
+# Put directory $1 on PATH unless already listed: appended if $2 = after, else prepended.
+pathmunge () {
+    case ":$PATH:" in *":$1:"*) return 0 ;; esac # literal match, no egrep needed
+    if [ "$2" = "after" ] ; then
+	PATH=$PATH:$1
+    else
+	PATH=$1:$PATH
+    fi
+}
+
+if [ -z "$QPID_ROOT" ] ; then echo "You must set QPID_ROOT" ; fi
+# NOTE(review): if a glob below matches several directories, only the first is used as $1.
+pathmunge $QPID_ROOT/cpp/test/bin
+pathmunge $QPID_ROOT/cpp/build/*/bin
+pathmunge $QPID_ROOT/cpp/build/*/test
+# Default QPID_HOME to $QPID_ROOT/java/build unless it is already set and non-empty.
+export QPID_HOME=${QPID_HOME:-$QPID_ROOT/java/build}
+pathmunge $QPID_HOME/bin
+
+

Propchange: incubator/qpid/trunk/qpid/cpp/tests/env
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/qpid/trunk/qpid/cpp/tests/gen.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/gen.mk?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/gen.mk (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/gen.mk Thu Nov 30 21:11:45 2006
@@ -0,0 +1,101 @@
+client_test_SOURCES = client_test.cpp
+client_test_LDADD = $(lib_client) $(lib_common) $(extra_libs)
+echo_service_SOURCES = echo_service.cpp
+echo_service_LDADD = $(lib_client) $(lib_common) $(extra_libs)
+topic_listener_SOURCES = topic_listener.cpp
+topic_listener_LDADD = $(lib_client) $(lib_common) $(extra_libs)
+topic_publisher_SOURCES = topic_publisher.cpp
+topic_publisher_LDADD = $(lib_client) $(lib_common) $(extra_libs)
+# AccumulatedAckTest: libtool module (-module) linked with the common and broker libraries.
+AccumulatedAckTest_la_SOURCES = AccumulatedAckTest.cpp
+AccumulatedAckTest_la_LIBADD = $(lib_common) $(lib_broker) $(extra_libs)
+AccumulatedAckTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+# ChannelTest module.
+ChannelTest_la_SOURCES = ChannelTest.cpp
+ChannelTest_la_LIBADD = $(lib_common) $(lib_broker) $(extra_libs)
+ChannelTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+# ConfigurationTest module.
+ConfigurationTest_la_SOURCES = ConfigurationTest.cpp
+ConfigurationTest_la_LIBADD = $(lib_common) $(lib_broker) $(extra_libs)
+ConfigurationTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+# ExchangeTest module.
+ExchangeTest_la_SOURCES = ExchangeTest.cpp
+ExchangeTest_la_LIBADD = $(lib_common) $(lib_broker) $(extra_libs)
+ExchangeTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+# HeadersExchangeTest module.
+HeadersExchangeTest_la_SOURCES = HeadersExchangeTest.cpp
+HeadersExchangeTest_la_LIBADD = $(lib_common) $(lib_broker) $(extra_libs)
+HeadersExchangeTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+# InMemoryContentTest module.
+InMemoryContentTest_la_SOURCES = InMemoryContentTest.cpp
+InMemoryContentTest_la_LIBADD = $(lib_common) $(lib_broker) $(extra_libs)
+InMemoryContentTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+# LazyLoadedContentTest module.
+LazyLoadedContentTest_la_SOURCES = LazyLoadedContentTest.cpp
+LazyLoadedContentTest_la_LIBADD = $(lib_common) $(lib_broker) $(extra_libs)
+LazyLoadedContentTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+# MessageBuilderTest module.
+MessageBuilderTest_la_SOURCES = MessageBuilderTest.cpp
+MessageBuilderTest_la_LIBADD = $(lib_common) $(lib_broker) $(extra_libs)
+MessageBuilderTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+# MessageTest module.
+MessageTest_la_SOURCES = MessageTest.cpp
+MessageTest_la_LIBADD = $(lib_common) $(lib_broker) $(extra_libs)
+MessageTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+# QueueRegistryTest module.
+QueueRegistryTest_la_SOURCES = QueueRegistryTest.cpp
+QueueRegistryTest_la_LIBADD = $(lib_common) $(lib_broker) $(extra_libs)
+QueueRegistryTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+# QueueTest module.
+QueueTest_la_SOURCES = QueueTest.cpp
+QueueTest_la_LIBADD = $(lib_common) $(lib_broker) $(extra_libs)
+QueueTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+# TopicExchangeTest module.
+TopicExchangeTest_la_SOURCES = TopicExchangeTest.cpp
+TopicExchangeTest_la_LIBADD = $(lib_common) $(lib_broker) $(extra_libs)
+TopicExchangeTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+# TxAckTest module.
+TxAckTest_la_SOURCES = TxAckTest.cpp
+TxAckTest_la_LIBADD = $(lib_common) $(lib_broker) $(extra_libs)
+TxAckTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+# TxBufferTest module.
+TxBufferTest_la_SOURCES = TxBufferTest.cpp
+TxBufferTest_la_LIBADD = $(lib_common) $(lib_broker) $(extra_libs)
+TxBufferTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+# TxPublishTest module.
+TxPublishTest_la_SOURCES = TxPublishTest.cpp
+TxPublishTest_la_LIBADD = $(lib_common) $(lib_broker) $(extra_libs)
+TxPublishTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+# ValueTest module.
+ValueTest_la_SOURCES = ValueTest.cpp
+ValueTest_la_LIBADD = $(lib_common) $(lib_broker) $(extra_libs)
+ValueTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+# BodyHandlerTest module.
+BodyHandlerTest_la_SOURCES = BodyHandlerTest.cpp
+BodyHandlerTest_la_LIBADD = $(lib_common) $(lib_broker) $(extra_libs)
+BodyHandlerTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+# FieldTableTest module.
+FieldTableTest_la_SOURCES = FieldTableTest.cpp
+FieldTableTest_la_LIBADD = $(lib_common) $(lib_broker) $(extra_libs)
+FieldTableTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+# FramingTest module.
+FramingTest_la_SOURCES = FramingTest.cpp
+FramingTest_la_LIBADD = $(lib_common) $(lib_broker) $(extra_libs)
+FramingTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+# HeaderTest module.
+HeaderTest_la_SOURCES = HeaderTest.cpp
+HeaderTest_la_LIBADD = $(lib_common) $(lib_broker) $(extra_libs)
+HeaderTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+# ExceptionTest module.
+ExceptionTest_la_SOURCES = ExceptionTest.cpp
+ExceptionTest_la_LIBADD = $(lib_common) $(lib_broker) $(extra_libs)
+ExceptionTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+# EventChannelTest module.
+EventChannelTest_la_SOURCES = EventChannelTest.cpp
+EventChannelTest_la_LIBADD = $(lib_common) $(lib_broker) $(extra_libs)
+EventChannelTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+# EventChannelThreadsTest module.
+EventChannelThreadsTest_la_SOURCES = EventChannelThreadsTest.cpp
+EventChannelThreadsTest_la_LIBADD = $(lib_common) $(lib_broker) $(extra_libs)
+EventChannelThreadsTest_la_LDFLAGS = -module -rpath $(abs_builddir)
+check_LTLIBRARIES = AccumulatedAckTest.la ChannelTest.la ConfigurationTest.la ExchangeTest.la HeadersExchangeTest.la InMemoryContentTest.la LazyLoadedContentTest.la MessageBuilderTest.la MessageTest.la QueueRegistryTest.la QueueTest.la TopicExchangeTest.la TxAckTest.la TxBufferTest.la TxPublishTest.la ValueTest.la BodyHandlerTest.la FieldTableTest.la FramingTest.la HeaderTest.la ExceptionTest.la EventChannelTest.la EventChannelThreadsTest.la

Added: incubator/qpid/trunk/qpid/cpp/tests/qpid_test_plugin.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/qpid_test_plugin.h?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/qpid_test_plugin.h (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/qpid_test_plugin.h Thu Nov 30 21:11:45 2006
@@ -0,0 +1,43 @@
+#ifndef QPID_TEST_PLUGIN_H
+#define QPID_TEST_PLUGIN_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ * Pulls in the CppUnit headers needed by qpid test plug-ins and works
+ * around the warning caused by the superfluous main() declaration
+ * in cppunit/plugin/TestPlugIn.h (CPPUNIT_PLUGIN_IMPLEMENT_MAIN).
+ */
+
+#include <cppunit/TestCase.h>
+#include <cppunit/TextTestRunner.h>
+#include <cppunit/extensions/HelperMacros.h>
+#include <cppunit/plugin/TestPlugIn.h>
+
+// Redefine CPPUNIT_PLUGIN_IMPLEMENT_MAIN to a dummy typedef to avoid warnings.
+// The guard macro and the typedef deliberately avoid reserved identifiers.
+#if defined(CPPUNIT_HAVE_UNIX_DLL_LOADER) || defined(CPPUNIT_HAVE_UNIX_SHL_LOADER)
+#undef CPPUNIT_PLUGIN_IMPLEMENT_MAIN
+#define CPPUNIT_PLUGIN_IMPLEMENT_MAIN() typedef char QpidCppUnitPlugInImplementMainDummyTypeDef
+#endif
+
+#endif  /*!QPID_TEST_PLUGIN_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/tests/qpid_test_plugin.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/tests/qpid_test_plugin.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/tests/topic_listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/topic_listener.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/topic_listener.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/topic_listener.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,186 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <QpidError.h>
+#include <ClientChannel.h>
+#include <Connection.h>
+#include <ClientExchange.h>
+#include <MessageListener.h>
+#include <ClientQueue.h>
+#include <sys/Time.h>
+#include <iostream>
+#include <sstream>
+
+using namespace qpid::client;
+using namespace qpid::sys;
+using std::string;
+
+/**
+ * Message listener for the topic test: counts the messages it receives
+ * and reacts to control messages identified by their "TYPE" header
+ * (see received()).
+ */
+class Listener : public MessageListener{
+private:
+    Channel* const channel;           // channel used to publish reports and to shut down
+    const std::string responseQueue;  // name of the queue that reports are published to
+    const bool transactional;         // when true, each report is followed by a commit
+    bool init;                        // true once the first message of the current batch was seen
+    int count;                        // messages counted in the current batch
+    Time start;                       // time at which the current batch started
+
+    void shutdown();
+    void report();
+
+public:
+    Listener(Channel* channel, const std::string& reponseQueue, bool tx);
+
+    virtual void received(Message& msg);
+};
+
+/**
+ * Command line options of the topic listener, with their defaults.
+ * All accessors are const so they can be used on a const Args.
+ */
+class Args{
+    string host;         // broker host (default "localhost")
+    int port;            // broker port (default 5672)
+    int ackMode;         // acknowledgement mode passed to consume (default NO_ACK)
+    bool transactional;  // whether the channel uses transactions (default false)
+    int prefetch;        // prefetch count for the channel (default 1000)
+    bool trace;          // value handed to the Connection constructor (default false)
+    bool help;           // set when -help was given
+public:
+    Args() : host("localhost"), port(5672), ackMode(NO_ACK), transactional(false),
+             prefetch(1000), trace(false), help(false){}
+
+    void parse(int argc, char** argv);
+    void usage();
+
+    const string& getHost() const { return host; }
+    int getPort() const { return port; }
+    int getAckMode() const { return ackMode; }
+    bool getTransactional() const { return transactional; }
+    int getPrefetch() const { return prefetch; }
+    bool getTrace() const { return trace; }
+    bool getHelp() const { return help; }
+};
+
+/**
+ * Entry point of the topic listener.
+ *
+ * Parses the command line, connects to the broker, declares the "response"
+ * queue and a control queue bound to the default topic exchange with the
+ * key "topic_control", then consumes from the control queue until
+ * channel.run() returns.
+ *
+ * Returns 0 on success (or after printing usage) and 1 if a QpidError was
+ * caught, so that callers can detect the failure.
+ */
+int main(int argc, char** argv){
+    Args args;
+    args.parse(argc, argv);
+    if(args.getHelp()){
+        args.usage();
+        return 0;
+    }
+
+    try{
+        Connection connection(args.getTrace());
+        connection.open(args.getHost(), args.getPort());
+        Channel channel(args.getTransactional(), args.getPrefetch());
+        connection.openChannel(&channel);
+
+        //declare exchange, queue and bind them:
+        Queue response("response");
+        channel.declareQueue(response);
+
+        Queue control;
+        channel.declareQueue(control);
+        qpid::framing::FieldTable bindArgs;
+        channel.bind(Exchange::DEFAULT_TOPIC_EXCHANGE, control, "topic_control", bindArgs);
+
+        //set up listener
+        Listener listener(&channel, response.getName(), args.getTransactional());
+        std::string tag;
+        channel.consume(control, tag, &listener, args.getAckMode());
+        channel.run();
+        connection.close();
+    }catch(qpid::QpidError& error){
+        //caught by reference: no copy is made and a derived error is not sliced
+        std::cout << error.what() << std::endl;
+        return 1;
+    }
+    return 0;
+}
+
+/**
+ * Creates a listener that reports through the given channel to the named
+ * response queue; counting starts with the first received message.
+ */
+Listener::Listener(Channel* _channel, const std::string& _responseq, bool tx)
+    : channel(_channel),
+      responseQueue(_responseq),
+      transactional(tx),
+      init(false),
+      count(0)
+{
+}
+
+/**
+ * Handles one incoming message according to its "TYPE" header:
+ *  - "TERMINATION_REQUEST" shuts the listener down,
+ *  - "REPORT_REQUEST" publishes a report and ends the current batch,
+ *  - anything else is counted, with progress printed every 100 messages.
+ */
+void Listener::received(Message& message){
+    //the first message after start-up or after a report opens a new batch
+    if(!init){
+        start = now();
+        count = 0;
+        init = true;
+    }
+
+    std::string kind(message.getHeaders().getString("TYPE"));
+
+    if(kind == "TERMINATION_REQUEST"){
+        shutdown();
+        return;
+    }
+    if(kind == "REPORT_REQUEST"){
+        //send a report:
+        report();
+        init = false;
+        return;
+    }
+
+    ++count;
+    if(count % 100 == 0){
+        std::cout <<"Received " << count << " messages." << std::endl;
+    }
+}
+
+/**
+ * Closes the channel in response to a termination request.
+ */
+void Listener::shutdown()
+{
+    channel->close();
+}
+
+/**
+ * Publishes a text report with the number of messages counted and the
+ * time elapsed since the batch started, committing when transactional.
+ */
+void Listener::report(){
+    Time elapsed = now() - start;
+
+    std::stringstream summary;
+    summary << "Received " << count << " messages in "
+            << elapsed/TIME_MSEC << " ms.";
+
+    Message reply;
+    reply.setData(summary.str());
+    channel->publish(reply, string(), responseQueue);
+
+    if(!transactional){
+        return;
+    }
+    channel->commit();
+}
+
+
+/**
+ * Parses the command line into this Args object.
+ *
+ * Flags (-help, -transactional, -trace) take no value; -host, -port,
+ * -ack_mode and -prefetch consume the following argument. Parsing stops at
+ * -help. A value option given as the last argument no longer reads past
+ * the end of argv: a warning is printed and the default is kept.
+ * Unrecognised options only produce a warning.
+ *
+ * @param argc number of entries in argv
+ * @param argv argument vector; argv[0] is skipped
+ */
+void Args::parse(int argc, char** argv){
+    for(int i = 1; i < argc; i++){
+        string name(argv[i]);
+        if("-help" == name){
+            help = true;
+            break;
+        }else if("-transactional" == name){
+            transactional = true;
+        }else if("-trace" == name){
+            trace = true;
+        }else if("-host" == name || "-port" == name
+                 || "-ack_mode" == name || "-prefetch" == name){
+            //these options need a value; make sure there is one to consume
+            if(i + 1 >= argc){
+                std::cout << "Warning: missing value for option " << name << std::endl;
+                break;
+            }
+            const char* value = argv[++i];
+            if("-host" == name){
+                host = value;
+            }else if("-port" == name){
+                port = atoi(value);
+            }else if("-ack_mode" == name){
+                ackMode = atoi(value);
+            }else{
+                prefetch = atoi(value);
+            }
+        }else{
+            std::cout << "Warning: unrecognised option " << name << std::endl;
+        }
+    }
+}
+
+/**
+ * Prints the supported command line options to standard output.
+ * The defaults quoted here match those set in the Args constructor.
+ */
+void Args::usage(){
+    std::cout << "Options:" << std::endl;
+    std::cout << "    -help" << std::endl;
+    std::cout << "            Prints this usage message" << std::endl;
+    std::cout << "    -host <host>" << std::endl;
+    std::cout << "            Specifies host to connect to (default is localhost)" << std::endl;
+    std::cout << "    -port <port>" << std::endl;
+    std::cout << "            Specifies port to connect to (default is 5672)" << std::endl;
+    std::cout << "    -ack_mode <mode>" << std::endl;
+    std::cout << "            Sets the acknowledgement mode" << std::endl;
+    std::cout << "            0=NO_ACK (default), 1=AUTO_ACK, 2=LAZY_ACK" << std::endl;
+    std::cout << "    -transactional" << std::endl;
+    std::cout << "            Indicates the client should use transactions" << std::endl;
+    std::cout << "    -prefetch <count>" << std::endl;
+    std::cout << "            Specifies the prefetch count (default is 1000)" << std::endl;
+    std::cout << "    -trace" << std::endl;
+    std::cout << "            Indicates that the frames sent and received should be logged" << std::endl;
+}

Propchange: incubator/qpid/trunk/qpid/cpp/tests/topic_listener.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/tests/topic_listener.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/tests/topic_publisher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/topic_publisher.cpp?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/topic_publisher.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/topic_publisher.cpp Thu Nov 30 21:11:45 2006
@@ -0,0 +1,258 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <QpidError.h>
+#include <ClientChannel.h>
+#include <Connection.h>
+#include <ClientExchange.h>
+#include <MessageListener.h>
+#include <ClientQueue.h>
+#include <sys/Monitor.h>
+#include "unistd.h"
+#include <sys/Time.h>
+#include <cstdlib>
+#include <iostream>
+
+using namespace qpid::client;
+using namespace qpid::sys;
+using std::string;
+
+/**
+ * Publishes batches of messages to the control topic and, as a
+ * MessageListener, counts the responses that come back so that a batch
+ * can be timed from first publish to last response.
+ */
+class Publisher : public MessageListener{
+private:
+    Channel* const channel;          // channel used for all publishing
+    const std::string controlTopic;  // routing key used for every published message
+    const bool transactional;        // when true, publishes are followed by a commit
+    Monitor monitor;                 // guards count and signals batch completion
+    int count;                       // responses still outstanding for the current batch
+
+    void waitForCompletion(int msgs);
+    string generateData(int size);
+
+public:
+    Publisher(Channel* channel, const std::string& controlTopic, bool tx);
+
+    virtual void received(Message& msg);
+    int64_t publish(int msgs, int listeners, int size);
+    void terminate();
+};
+
+/**
+ * Command line options of the topic publisher, with their defaults.
+ * All accessors are const so they can be used on a const Args.
+ */
+class Args{
+    string host;         // broker host (default "localhost")
+    int port;            // broker port (default 5672)
+    int messages;        // messages published per batch (default 1000)
+    int subscribers;     // number of reports expected per batch (default 1)
+    int ackMode;         // acknowledgement mode passed to consume (default NO_ACK)
+    bool transactional;  // whether the channel uses transactions (default false)
+    int prefetch;        // prefetch count for the channel (default 1000)
+    int batches;         // number of batches to run (default 1)
+    int delay;           // seconds to sleep between batches (default 0)
+    int size;            // size of each published message in bytes (default 256)
+    bool trace;          // value handed to the Connection constructor (default false)
+    bool help;           // set when -help was given
+public:
+    Args() : host("localhost"), port(5672), messages(1000), subscribers(1),
+             ackMode(NO_ACK), transactional(false), prefetch(1000), batches(1),
+             delay(0), size(256), trace(false), help(false){}
+
+    void parse(int argc, char** argv);
+    void usage();
+
+    const string& getHost() const { return host; }
+    int getPort() const { return port; }
+    int getMessages() const { return messages; }
+    int getSubscribers() const { return subscribers; }
+    int getAckMode() const { return ackMode; }
+    bool getTransactional() const { return transactional; }
+    int getPrefetch() const { return prefetch; }
+    int getBatches() const { return batches; }
+    int getDelay() const { return delay; }
+    int getSize() const { return size; }
+    bool getTrace() const { return trace; }
+    bool getHelp() const { return help; }
+};
+
+/**
+ * Entry point of the topic publisher.
+ *
+ * Parses the command line, connects to the broker, consumes reports from
+ * the "response" queue and runs the requested number of batches, printing
+ * the time taken by each. When more than one batch was run a summary line
+ * with average, maximum and minimum batch time is printed.
+ *
+ * Returns 0 on success (or after printing usage) and 1 if a QpidError was
+ * caught, so that callers can detect the failure.
+ */
+int main(int argc, char** argv){
+    Args args;
+    args.parse(argc, argv);
+    if(args.getHelp()){
+        args.usage();
+        return 0;
+    }
+
+    try{
+        Connection connection(args.getTrace());
+        connection.open(args.getHost(), args.getPort());
+        Channel channel(args.getTransactional(), args.getPrefetch());
+        connection.openChannel(&channel);
+
+        //declare queue (relying on default binding):
+        Queue response("response");
+        channel.declareQueue(response);
+
+        //set up listener
+        Publisher publisher(&channel, "topic_control", args.getTransactional());
+        std::string tag("mytag");
+        channel.consume(response, tag, &publisher, args.getAckMode());
+        channel.start();
+
+        int batchSize(args.getBatches());
+        int64_t max(0);
+        int64_t min(0);
+        int64_t sum(0);
+        for(int i = 0; i < batchSize; i++){
+            if(i > 0 && args.getDelay()) sleep(args.getDelay());
+            Time time = publisher.publish(
+                args.getMessages(), args.getSubscribers(), args.getSize());
+            //zero doubles as "not set yet" for both extremes
+            if(!max || time > max) max = time;
+            if(!min || time < min) min = time;
+            sum += time;
+            std::cout << "Completed " << (i+1) << " of " << batchSize
+                      << " in " << time/TIME_MSEC << "ms" << std::endl;
+        }
+        publisher.terminate();
+        if(batchSize > 1){
+            //the average is computed only here, so "-batches 0" can no
+            //longer trigger a division by zero
+            int64_t avg = sum / batchSize;
+            std::cout << batchSize << " batches completed. avg=" << avg <<
+                ", max=" << max << ", min=" << min << std::endl;
+        }
+        channel.close();
+        connection.close();
+    }catch(qpid::QpidError& error){
+        //caught by reference: no copy is made and a derived error is not sliced
+        std::cout << error.what() << std::endl;
+        return 1;
+    }
+    return 0;
+}
+
+/**
+ * Creates a publisher that sends to the given control topic over the
+ * given channel.
+ *
+ * count is initialised to zero so that a response arriving before the
+ * first batch has set it does not decrement an indeterminate value.
+ */
+Publisher::Publisher(Channel* _channel, const std::string& _controlTopic, bool tx) : 
+    channel(_channel), controlTopic(_controlTopic), transactional(tx), count(0){}
+
+/**
+ * Called for every response message: decrements the number of
+ * outstanding responses and wakes the waiting publisher once it
+ * reaches zero. The message content itself is ignored.
+ */
+void Publisher::received(Message& ){
+    Monitor::ScopedLock guard(monitor);
+    --count;
+    if(count != 0){
+        return;
+    }
+    //all responses are in: end the current batch
+    monitor.notify();
+}
+
+/**
+ * Blocks until the given number of responses has been received.
+ *
+ * publish() calls this while holding a ScopedLock on the monitor, and
+ * received() decrements count under the same lock and notifies when it
+ * reaches zero. The wait is wrapped in a loop on the count so that a
+ * wakeup without the count having reached zero does not end the batch
+ * early, and so that msgs <= 0 returns at once instead of waiting for a
+ * notification that would never come.
+ *
+ * @param msgs number of responses to wait for
+ */
+void Publisher::waitForCompletion(int msgs){
+    count = msgs;
+    while(count > 0){
+        monitor.wait();
+    }
+}
+
+/**
+ * Runs one batch: publishes msgs messages of the given size followed by
+ * a report request, then waits until `listeners` responses have arrived.
+ *
+ * @param msgs      number of data messages to publish
+ * @param listeners number of responses to wait for
+ * @param size      payload size of each data message
+ * @return the difference between now() taken after the batch completed
+ *         and now() taken just before publishing started
+ */
+int64_t Publisher::publish(int msgs, int listeners, int size){
+    Message payload;
+    payload.setData(generateData(size));
+
+    Time begun = now();
+    {
+        //held for the whole batch; received() takes the same lock
+        Monitor::ScopedLock guard(monitor);
+
+        int sent = 0;
+        while(sent < msgs){
+            channel->publish(payload, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic);
+            ++sent;
+        }
+
+        //ask the listeners for their reports
+        Message request;
+        request.getHeaders().setString("TYPE", "REPORT_REQUEST");
+        channel->publish(request, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic);
+        if(transactional){
+            channel->commit();
+        }
+
+        waitForCompletion(listeners);
+    }
+    Time ended = now();
+
+    return ended - begun;
+}
+
+/**
+ * Builds a payload of the requested length made of upper-case letters:
+ * 26 'A's, then 26 'B's and so on, wrapping back to 'A' after 'Z'.
+ *
+ * The wrap keeps every character within 'A'..'Z' for any size; without
+ * it sizes above 676 ran past 'Z' and larger ones overflowed char.
+ * Output for sizes up to 676 is unchanged.
+ *
+ * @param size number of characters to generate; values <= 0 give ""
+ */
+string Publisher::generateData(int size){
+    string data;
+    if(size > 0){
+        data.reserve(size);
+    }
+    for(int i = 0; i < size; i++){
+        data += static_cast<char>('A' + (i / 26) % 26);
+    }
+    return data;
+}
+
+/**
+ * Publishes a message whose "TYPE" header is "TERMINATION_REQUEST" to
+ * the control topic, committing when transactional.
+ */
+void Publisher::terminate(){
+    Message request;
+    request.getHeaders().setString("TYPE", "TERMINATION_REQUEST");
+    channel->publish(request, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic);
+
+    if(!transactional){
+        return;
+    }
+    channel->commit();
+}
+
+/**
+ * Parses the command line into this Args object.
+ *
+ * Flags (-help, -transactional, -trace) take no value; every other
+ * recognised option consumes the following argument. Parsing stops at
+ * -help. A value option given as the last argument no longer reads past
+ * the end of argv: a warning is printed and the default is kept.
+ * Unrecognised options only produce a warning.
+ *
+ * @param argc number of entries in argv
+ * @param argv argument vector; argv[0] is skipped
+ */
+void Args::parse(int argc, char** argv){
+    for(int i = 1; i < argc; i++){
+        string name(argv[i]);
+        if("-help" == name){
+            help = true;
+            break;
+        }else if("-transactional" == name){
+            transactional = true;
+        }else if("-trace" == name){
+            trace = true;
+        }else if("-host" == name || "-port" == name || "-messages" == name
+                 || "-subscribers" == name || "-ack_mode" == name
+                 || "-prefetch" == name || "-batches" == name
+                 || "-delay" == name || "-size" == name){
+            //these options need a value; make sure there is one to consume
+            if(i + 1 >= argc){
+                std::cout << "Warning: missing value for option " << name << std::endl;
+                break;
+            }
+            const char* value = argv[++i];
+            if("-host" == name){
+                host = value;
+            }else if("-port" == name){
+                port = atoi(value);
+            }else if("-messages" == name){
+                messages = atoi(value);
+            }else if("-subscribers" == name){
+                subscribers = atoi(value);
+            }else if("-ack_mode" == name){
+                ackMode = atoi(value);
+            }else if("-prefetch" == name){
+                prefetch = atoi(value);
+            }else if("-batches" == name){
+                batches = atoi(value);
+            }else if("-delay" == name){
+                delay = atoi(value);
+            }else{
+                size = atoi(value);
+            }
+        }else{
+            std::cout << "Warning: unrecognised option " << name << std::endl;
+        }
+    }
+}
+
+/**
+ * Prints the supported command line options to standard output.
+ * The defaults quoted here match those set in the Args constructor.
+ */
+void Args::usage(){
+    std::cout << "Options:" << std::endl;
+    std::cout << "    -help" << std::endl;
+    std::cout << "            Prints this usage message" << std::endl;
+    std::cout << "    -host <host>" << std::endl;
+    std::cout << "            Specifies host to connect to (default is localhost)" << std::endl;
+    std::cout << "    -port <port>" << std::endl;
+    std::cout << "            Specifies port to connect to (default is 5672)" << std::endl;
+    std::cout << "    -messages <count>" << std::endl;
+    std::cout << "            Specifies how many messages to send" << std::endl;
+    std::cout << "    -subscribers <count>" << std::endl;
+    std::cout << "            Specifies how many subscribers to expect reports from" << std::endl;
+    std::cout << "    -ack_mode <mode>" << std::endl;
+    std::cout << "            Sets the acknowledgement mode" << std::endl;
+    std::cout << "            0=NO_ACK (default), 1=AUTO_ACK, 2=LAZY_ACK" << std::endl;
+    std::cout << "    -transactional" << std::endl;
+    std::cout << "            Indicates the client should use transactions" << std::endl;
+    std::cout << "    -prefetch <count>" << std::endl;
+    std::cout << "            Specifies the prefetch count (default is 1000)" << std::endl;
+    std::cout << "    -batches <count>" << std::endl;
+    std::cout << "            Specifies how many batches to run" << std::endl;
+    std::cout << "    -delay <seconds>" << std::endl;
+    std::cout << "            Causes a delay between each batch" << std::endl;
+    std::cout << "    -size <bytes>" << std::endl;
+    std::cout << "            Sets the size of the published messages (default is 256 bytes)" << std::endl;
+    std::cout << "    -trace" << std::endl;
+    std::cout << "            Indicates that the frames sent and received should be logged" << std::endl;
+}

Propchange: incubator/qpid/trunk/qpid/cpp/tests/topic_publisher.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/tests/topic_publisher.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/tests/topicall
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/topicall?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/topicall (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/topicall Thu Nov 30 21:11:45 2006
@@ -0,0 +1,25 @@
+#!/bin/sh
+# Do 3 runs of topictests for C++ and Java brokers with reduced output.
+
+. `dirname $0`/env
+
+# Run a short topictest to warm up the broker and iron out startup effects.
+flush() {
+    topic_listener >/dev/null 2>&1 &
+    topic_publisher >/dev/null 2>&1 
+}
+
+echo Java broker
+broker j ; flush
+topictest c | tail -n1
+topictest c | tail -n1
+topictest c | tail -n1
+
+echo C++ broker
+broker c ; flush
+topictest c | tail -n1
+topictest c | tail -n1
+topictest c | tail -n1
+
+# Don't bother with java clients we know they're slower.
+

Propchange: incubator/qpid/trunk/qpid/cpp/tests/topicall
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/qpid/trunk/qpid/cpp/tests/topictest
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/topictest?view=auto&rev=481159
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/topictest (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/topictest Thu Nov 30 21:11:45 2006
@@ -0,0 +1,42 @@
+#!/bin/bash
+# Run the c++ or java topic test
+#
+# Usage: topictest [c|j]
+#   j runs the java clients; c, or anything else, runs the C++ clients.
+
+# Source the env file that sits next to this script. The path is quoted so
+# that a script location containing spaces still works.
+. "$(dirname "$0")/env"
+
+# Edit parameters here:
+
+# Big test:
+# LISTENERS=10
+# MESSAGES=10000
+# BATCHES=20
+
+LISTENERS=10
+MESSAGES=2000
+BATCHES=10
+
+# Set LISTEN_CMD and PUBLISH_CMD for the C++ clients.
+cppcmds() {
+    LISTEN_CMD=topic_listener
+    PUBLISH_CMD="topic_publisher -messages $MESSAGES -batches $BATCHES -subscribers $LISTENERS"
+}
+
+# Set LISTEN_CMD and PUBLISH_CMD for the java clients.
+javacmds() {
+    DEF=-Damqj.logging.level="error"
+    LISTEN_CMD="qpid-run $DEF org.apache.qpid.topic.Listener"
+    PUBLISH_CMD="qpid-run $DEF org.apache.qpid.topic.Publisher -messages $MESSAGES -batch $BATCHES -clients $LISTENERS"
+}
+
+case $1 in
+    j) javacmds ;;
+    *) cppcmds ;;
+esac
+
+# Start the listeners in the background, then give them a moment to start.
+for ((i=$LISTENERS ; i--; )); do
+    $LISTEN_CMD  > /dev/null 2>&1 &
+done
+sleep 1
+
+# OPTIONS is not set in this script; presumably it comes from the caller's
+# environment or from the env file - TODO confirm.
+echo $PUBLISH_CMD $OPTIONS
+
+# Append a dated header and the publisher output to the stats file.
+STATS=~/bin/topictest.times
+echo "---- topictest `date`" >> "$STATS"
+$PUBLISH_CMD $OPTIONS | tee -a "$STATS"

Propchange: incubator/qpid/trunk/qpid/cpp/tests/topictest
------------------------------------------------------------------------------
    svn:executable = *