You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/06/06 18:39:09 UTC

svn commit: r544879 - in /incubator/qpid/trunk/qpid: cpp/src/qpid/broker/ cpp/src/tests/ python/tests_0-9/

Author: gsim
Date: Wed Jun  6 09:39:03 2007
New Revision: 544879

URL: http://svn.apache.org/viewvc?view=rev&rev=544879
Log:
Merged in channel.flow implementation and interoperability tests.


Added:
    incubator/qpid/trunk/qpid/cpp/src/tests/BasicP2PTest.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/tests/BasicP2PTest.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/tests/BasicPubSubTest.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/tests/BasicPubSubTest.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/tests/SimpleTestCaseBase.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/tests/SimpleTestCaseBase.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/tests/TestCase.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/tests/TestOptions.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/tests/interop_runner.cpp   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/   (props changed)
    incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
    incubator/qpid/trunk/qpid/python/tests_0-9/broker.py

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?view=diff&rev=544879&r1=544878&r2=544879
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Wed Jun  6 09:39:03 2007
@@ -100,7 +100,11 @@
         std::string()/* ID */, context.getRequestId());
 } 
         
-void BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){}         
+void BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext& context, bool active){
+    channel.flow(active);
+    client.flowOk(active, context.getRequestId());
+}         
+
 void BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){} 
         
 void BrokerAdapter::ChannelHandlerImpl::close(

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp?view=diff&rev=544879&r1=544878&r2=544879
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp Wed Jun  6 09:39:03 2007
@@ -66,6 +66,7 @@
     store(_store),
     messageBuilder(this, _store, _stagingThreshold),
     opened(id == 0),//channel 0 is automatically open, other must be explicitly opened
+    flowActive(true),
     adapter(new BrokerAdapter(*this, con, con.broker))
 {
     outstanding.reset();
@@ -221,7 +222,7 @@
 
 bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
     if(!connection || connection != msg->getPublisher()){//check for no_local
-        if(ackExpected && !parent->checkPrefetch(msg)){
+        if(!parent->flowActive || (ackExpected && !parent->checkPrefetch(msg))){
             blocked = true;
         }else{
             blocked = false;
@@ -394,5 +395,16 @@
         connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
     }catch(std::exception& e){
         connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
+    }
+}
+
+void Channel::flow(bool active)
+{
+    Mutex::ScopedLock locker(deliveryLock);
+    bool requestDelivery(!flowActive && active);
+    flowActive = active;
+    if (requestDelivery) {
+        //there may be messages that can now be delivered
+        std::for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1));
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h?view=diff&rev=544879&r1=544878&r2=544879
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h Wed Jun  6 09:39:03 2007
@@ -97,6 +97,7 @@
     MessageStore* const store;
     MessageBuilder messageBuilder;//builder for in-progress message
     bool opened;
+    bool flowActive;
     boost::scoped_ptr<BrokerAdapter> adapter;
 
 	// completion handler for MessageBuilder
@@ -147,6 +148,7 @@
     void ack(uint64_t deliveryTag, bool multiple);
     void ack(uint64_t deliveryTag, uint64_t endTag);
     void recover(bool requeue);
+    void flow(bool active);
     void deliver(Message::shared_ptr& msg, const string& consumerTag, uint64_t deliveryTag);            
     void handlePublish(Message* msg);
     void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?view=diff&rev=544879&r1=544878&r2=544879
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Wed Jun  6 09:39:03 2007
@@ -78,10 +78,7 @@
     if(consumers.empty()){
         return false;
     }else if(exclusive){
-        if(!exclusive->deliver(msg)){
-            QPID_LOG(warning, "Dropping undeliverable message from queue with exclusive consumer.");
-        }
-        return true;
+        return exclusive->deliver(msg);
     }else{
         //deliver to next consumer
         next = next % consumers.size();

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Wed Jun  6 09:39:03 2007
@@ -13,3 +13,4 @@
 .valgrindrc
 logging
 .valgrind.supp
+interop_runner

Added: incubator/qpid/trunk/qpid/cpp/src/tests/BasicP2PTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BasicP2PTest.cpp?view=auto&rev=544879
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/BasicP2PTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/BasicP2PTest.cpp Wed Jun  6 09:39:03 2007
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "BasicP2PTest.h"
+
+using namespace qpid;
+using namespace qpid::client;
+
+class BasicP2PTest::Receiver : public Worker, public MessageListener
+{
+    const std::string queue;
+    std::string tag;
+public:
+    Receiver(TestOptions& options, const std::string& _queue, const int _messages) 
+        : Worker(options, _messages), queue(_queue){}
+    void init()
+    {
+        Queue q(queue, true);
+        channel.declareQueue(q);
+        framing::FieldTable args;
+        channel.bind(Exchange::STANDARD_DIRECT_EXCHANGE, q, queue, args);
+        channel.consume(q, tag, this);
+        channel.start();
+    }
+
+    void start()
+    {
+    }
+        
+    void received(Message&)
+    {
+        count++;
+    }
+};
+
+void BasicP2PTest::assign(const std::string& role, framing::FieldTable& params, TestOptions& options)
+{
+    std::string queue = params.getString("P2P_QUEUE_AND_KEY_NAME");
+    int messages = params.getInt("P2P_NUM_MESSAGES");
+    if (role == "SENDER") {
+        worker = std::auto_ptr<Worker>(new Sender(options, Exchange::STANDARD_DIRECT_EXCHANGE, queue, messages));
+    } else if(role == "RECEIVER"){
+        worker = std::auto_ptr<Worker>(new Receiver(options, queue, messages));
+    } else {
+        throw Exception("unrecognised role");
+    }
+    worker->init();
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/BasicP2PTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/BasicP2PTest.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/tests/BasicP2PTest.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BasicP2PTest.h?view=auto&rev=544879
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/BasicP2PTest.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/BasicP2PTest.h Wed Jun  6 09:39:03 2007
@@ -0,0 +1,46 @@
+#ifndef _BasicP2PTest_
+#define _BasicP2PTest_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <memory>
+#include <sstream>
+
+#include "qpid/Exception.h"
+#include "qpid/client/ClientChannel.h"
+#include "qpid/client/ClientMessage.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/MessageListener.h"
+#include "SimpleTestCaseBase.h"
+
+
+namespace qpid {
+
+class BasicP2PTest : public SimpleTestCaseBase
+{
+    class Receiver;
+public:
+    void assign(const std::string& role, framing::FieldTable& params, TestOptions& options);
+};
+
+}
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/BasicP2PTest.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/BasicP2PTest.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/tests/BasicPubSubTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BasicPubSubTest.cpp?view=auto&rev=544879
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/BasicPubSubTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/BasicPubSubTest.cpp Wed Jun  6 09:39:03 2007
@@ -0,0 +1,121 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "BasicPubSubTest.h"
+
+using namespace qpid;
+
+class BasicPubSubTest::Receiver : public Worker, public MessageListener
+{
+    const Exchange& exchange;
+    const std::string queue;
+    const std::string key;
+    std::string tag;
+public:
+    Receiver(TestOptions& options, const Exchange& _exchange, const std::string& _queue, const std::string& _key, const int _messages) 
+        : Worker(options, _messages), exchange(_exchange), queue(_queue), key(_key){}
+
+    void init()
+    {
+        Queue q(queue, true);
+        channel.declareQueue(q);
+        framing::FieldTable args;
+        channel.bind(exchange, q, key, args);
+        channel.consume(q, tag, this);
+        channel.start();
+    }
+
+    void start(){
+    }
+        
+    void received(Message&)
+    {
+        count++;
+    }
+};
+
+class BasicPubSubTest::MultiReceiver : public Worker, public MessageListener
+{
+    typedef boost::ptr_vector<Receiver> ReceiverList;
+    ReceiverList receivers;
+
+public:
+    MultiReceiver(TestOptions& options, const Exchange& exchange, const std::string& key, const int _messages, int receiverCount) 
+        : Worker(options, _messages) 
+    {
+        for (int i = 0; i != receiverCount; i++) {                
+            std::string queue = (boost::format("%1%_%2%") % options.clientid % i).str();
+            receivers.push_back(new Receiver(options, exchange, queue, key, _messages));
+        }
+    }
+
+    void init()
+    {
+        for (ReceiverList::size_type i = 0; i != receivers.size(); i++) {
+            receivers[i].init();
+        }
+    }
+
+    void start()
+    {
+        for (ReceiverList::size_type i = 0; i != receivers.size(); i++) {
+            receivers[i].start();
+        }
+    }
+        
+    void received(Message& msg)
+    {
+        for (ReceiverList::size_type i = 0; i != receivers.size(); i++) {
+            receivers[i].received(msg);
+        }
+    }
+
+    virtual int getCount()
+    {
+        count = 0;
+        for (ReceiverList::size_type i = 0; i != receivers.size(); i++) {
+            count += receivers[i].getCount();
+        }
+        return count;
+    }
+    virtual void stop()
+    {
+        for (ReceiverList::size_type i = 0; i != receivers.size(); i++) {
+            receivers[i].stop();
+        }
+    }
+};
+
+void BasicPubSubTest::assign(const std::string& role, framing::FieldTable& params, TestOptions& options)
+{
+    std::string key = params.getString("PUBSUB_KEY");
+    int messages = params.getInt("PUBSUB_NUM_MESSAGES");
+    int receivers = params.getInt("PUBSUB_NUM_RECEIVERS");
+    if (role == "SENDER") {
+        worker = std::auto_ptr<Worker>(new Sender(options, Exchange::STANDARD_TOPIC_EXCHANGE, key, messages));
+    } else if(role == "RECEIVER"){
+        worker = std::auto_ptr<Worker>(new MultiReceiver(options, Exchange::STANDARD_TOPIC_EXCHANGE, key, messages, receivers));
+    } else {
+        throw Exception("unrecognised role");
+    }
+    worker->init();
+}
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/BasicPubSubTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/BasicPubSubTest.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/tests/BasicPubSubTest.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BasicPubSubTest.h?view=auto&rev=544879
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/BasicPubSubTest.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/BasicPubSubTest.h Wed Jun  6 09:39:03 2007
@@ -0,0 +1,51 @@
+#ifndef _BasicPubSubTest_
+#define _BasicPubSubTest_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <memory>
+#include <sstream>
+
+#include "qpid/Exception.h"
+#include "qpid/client/ClientChannel.h"
+#include "qpid/client/ClientMessage.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/MessageListener.h"
+#include "SimpleTestCaseBase.h"
+#include <boost/ptr_container/ptr_vector.hpp>
+#include <boost/format.hpp>
+
+
+namespace qpid {
+
+using namespace qpid::client;
+
+class BasicPubSubTest : public SimpleTestCaseBase
+{
+    class Receiver;
+    class MultiReceiver;
+public:
+    void assign(const std::string& role, framing::FieldTable& params, TestOptions& options);
+};
+
+}
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/BasicPubSubTest.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/BasicPubSubTest.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp?view=diff&rev=544879&r1=544878&r2=544879
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp Wed Jun  6 09:39:03 2007
@@ -57,6 +57,7 @@
     CPPUNIT_TEST(testDeliveryAndRecovery);
     CPPUNIT_TEST(testStaging);
     CPPUNIT_TEST(testQueuePolicy);
+    CPPUNIT_TEST(testFlow);
     CPPUNIT_TEST_SUITE_END();
 
     Broker::shared_ptr broker;
@@ -331,6 +332,42 @@
 
         }
         store.check();
+    }
+
+    void testFlow(){
+        Channel channel(connection, 7, 10000);
+        channel.open();
+        //there will always be a connection-start frame
+        CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size());
+        CPPUNIT_ASSERT_EQUAL(ChannelId(0), handler.frames[0].getChannel());
+        CPPUNIT_ASSERT(dynamic_cast<ConnectionStartBody*>(
+                           handler.frames[0].getBody().get()));
+        
+        const string data("abcdefghijklmn");
+
+        Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
+        addContent(msg, data);
+        Queue::shared_ptr queue(new Queue("my_queue"));
+        ConnectionToken* owner(0);
+        string tag("no_ack");
+        channel.consume(tag, queue, false, false, owner);
+        channel.flow(false);
+        queue->deliver(msg);
+        //ensure no more frames have been delivered
+        CPPUNIT_ASSERT_EQUAL((size_t) 1, handler.frames.size());
+        CPPUNIT_ASSERT_EQUAL((u_int32_t) 1, queue->getMessageCount());        
+        channel.flow(true);
+        CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size());
+        CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1].getChannel());        
+        CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2].getChannel());        
+        CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[3].getChannel());
+        BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[1].getBody()));
+        AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[2].getBody()));
+        AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[3].getBody()));
+        CPPUNIT_ASSERT(deliver);
+        CPPUNIT_ASSERT(contentHeader);
+        CPPUNIT_ASSERT(contentBody);
+        CPPUNIT_ASSERT_EQUAL(data, contentBody->getData());
     }
 
     Message* createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize)

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?view=diff&rev=544879&r1=544878&r2=544879
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Wed Jun  6 09:39:03 2007
@@ -63,7 +63,7 @@
   topic_listener	\
   topic_publisher
 
-check_PROGRAMS = $(UNIT_TESTS) $(testprogs)
+check_PROGRAMS = $(UNIT_TESTS) $(testprogs) interop_runner
 
 # FIXME aconway 2007-05-30: TESTS_ENVIRONMENT should have ./run_test
 # as below to run valgrind on all test programs.
@@ -137,3 +137,15 @@
 
 CLEANFILES=valgrind.out qpidd.log .valgrindrc .valgrind.supp
 MAINTAINERCLEANFILES=gen.mk
+
+interop_runner_SOURCES = 	\
+  interop_runner.cpp	 	\
+  SimpleTestCaseBase.cpp	\
+  BasicP2PTest.cpp		\
+  BasicPubSubTest.cpp		\
+  SimpleTestCaseBase.h		\
+  BasicP2PTest.h		\
+  BasicPubSubTest.h		\
+  TestCase.h			\
+  TestOptions.h
+interop_runner_LDADD = $(lib_client) $(lib_common) $(extra_libs)

Added: incubator/qpid/trunk/qpid/cpp/src/tests/SimpleTestCaseBase.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/SimpleTestCaseBase.cpp?view=auto&rev=544879
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/SimpleTestCaseBase.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/SimpleTestCaseBase.cpp Wed Jun  6 09:39:03 2007
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SimpleTestCaseBase.h"
+
+using namespace qpid;
+
+void SimpleTestCaseBase::start()
+{
+    if (worker.get()) {
+        worker->start();
+    }
+}
+
+void SimpleTestCaseBase::stop()
+{
+    if (worker.get()) {
+        worker->stop();
+    }
+}
+
+void SimpleTestCaseBase::report(client::Message& report)
+{
+    if (worker.get()) {
+        report.getHeaders().setInt("MESSAGE_COUNT", worker->getCount());
+        //add number of messages sent or received
+        std::stringstream reportstr;
+        reportstr << worker->getCount();
+        report.setData(reportstr.str());
+    }
+}
+
+SimpleTestCaseBase::Sender::Sender(TestOptions& options, 
+                                   const Exchange& _exchange, 
+                                   const std::string& _key, 
+                                   const int _messages) 
+    : Worker(options, _messages), exchange(_exchange), key(_key) {}
+
+void SimpleTestCaseBase::Sender::init()
+{
+    channel.start();
+}
+
+void SimpleTestCaseBase::Sender::start(){
+    Message msg;
+    while (count < messages) {
+        channel.publish(msg, exchange, key);
+        count++;
+    }
+    stop();
+}
+
+SimpleTestCaseBase::Worker::Worker(TestOptions& options, const int _messages) : 
+    connection(options.trace), messages(_messages), count(0)
+{
+    connection.open(options.broker, options.port);
+    connection.openChannel(channel);
+}
+            
+void SimpleTestCaseBase::Worker::stop()
+{
+    channel.close();
+    connection.close();
+}
+
+int SimpleTestCaseBase::Worker::getCount()
+{
+    return count;
+}
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/SimpleTestCaseBase.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/SimpleTestCaseBase.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/tests/SimpleTestCaseBase.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/SimpleTestCaseBase.h?view=auto&rev=544879
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/SimpleTestCaseBase.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/SimpleTestCaseBase.h Wed Jun  6 09:39:03 2007
@@ -0,0 +1,88 @@
+#ifndef _SimpleTestCaseBase_
+#define _SimpleTestCaseBase_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <memory>
+#include <sstream>
+
+#include "qpid/Exception.h"
+#include "qpid/client/ClientChannel.h"
+#include "qpid/client/ClientMessage.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/MessageListener.h"
+#include "TestCase.h"
+
+
+namespace qpid {
+
+using namespace qpid::client;
+
+class SimpleTestCaseBase : public TestCase
+{
+protected:
+    class Worker
+    {
+    protected:
+        client::Connection connection;
+        client::Channel channel;
+        const int messages;
+        int count;
+
+    public:
+
+        Worker(TestOptions& options, const int messages);
+        virtual ~Worker(){}
+
+        virtual void stop();
+        virtual int getCount();
+        virtual void init() = 0;
+        virtual void start() = 0;
+    };
+
+    class Sender : public Worker
+    {
+        const Exchange& exchange;
+        const std::string key;
+    public:
+        Sender(TestOptions& options, 
+               const Exchange& exchange, 
+               const std::string& key, 
+               const int messages); 
+        void init();
+        void start();
+    };
+
+    std::auto_ptr<Worker> worker;
+
+public:
+    virtual void assign(const std::string& role, framing::FieldTable& params, TestOptions& options) = 0;
+    
+    virtual ~SimpleTestCaseBase() {}
+
+    void start();
+    void stop();
+    void report(client::Message& report);
+};
+
+}
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/SimpleTestCaseBase.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/SimpleTestCaseBase.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/tests/TestCase.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/TestCase.h?view=auto&rev=544879
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/TestCase.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/TestCase.h Wed Jun  6 09:39:03 2007
@@ -0,0 +1,64 @@
+#ifndef _TestCase_
+#define _TestCase_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/client/ClientMessage.h"
+#include "TestOptions.h"
+
+
+namespace qpid {
+
+/**
+ * Interface to be implemented by test cases for use with the test
+ * runner.
+ */
+class TestCase
+{
+public:
+    /**
+     * Directs the test case to act in a particular role. Some roles
+     * may be 'activated' at this stage; others may require an explicit
+     * start request.
+     */
+    virtual void assign(const std::string& role, framing::FieldTable& params, TestOptions& options) = 0;
+    /**
+     * Each test will be started on its own thread, which should block
+     * until the test completes (this may or may not require an
+     * explicit stop() request).
+     */
+    virtual void start() = 0;
+    /**
+     * Requests that the test be stopped if still running.
+     */
+    virtual void stop() = 0;
+    /**
+     * Allows the test to fill in details on the final report
+     * message. Will be called only after start has returned.
+     */
+    virtual void report(client::Message& report) = 0;
+
+    virtual ~TestCase() {}
+};
+
+}
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/TestCase.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/TestCase.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/tests/TestOptions.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/TestOptions.h?view=auto&rev=544879
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/TestOptions.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/TestOptions.h Wed Jun  6 09:39:03 2007
@@ -0,0 +1,69 @@
+#ifndef _TestOptions_
+#define _TestOptions_
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/CommonOptions.h"
+
+namespace qpid {
+
+/**
+ * Command line options for the test clients: extends CommonOptions
+ * with the broker host, virtual host, client identifier and a help
+ * switch.
+ */
+struct TestOptions : public qpid::CommonOptions
+{
+    /**
+     * Sets the defaults (broker "localhost", empty virtual host,
+     * client id "cpp", help off) and registers the options with desc.
+     */
+    TestOptions() : desc("Options"), broker("localhost"), virtualhost(""), clientid("cpp"), help(false)
+    {
+        using namespace qpid::program_options;
+        using namespace boost::program_options;
+        //presumably adds the options declared by CommonOptions to desc
+        CommonOptions::addTo(desc);        
+        desc.add_options()
+            ("broker,b", optValue(broker, "HOSTNAME"), "the hostname to connect to")
+            ("virtualhost,v", optValue(virtualhost, "VIRTUAL_HOST"), "virtual host")
+            //NB: the long option is 'clientname' although it is stored in clientid
+            ("clientname,n", optValue(clientid, "ID"), "unique client identifier")
+            ("help,h", optValue(help), "print this usage statement");
+    }
+
+    /**
+     * Parses the command line against desc. A parsing error is
+     * reported on stderr and is not propagated to the caller.
+     */
+    void parse(int argc, char** argv)
+    {
+        using namespace boost::program_options;
+        try {
+            variables_map vm;
+            store(parse_command_line(argc, argv, desc), vm);
+            notify(vm);
+        } catch(const error& e) {
+            //the error is swallowed here, so the caller carries on after the message is printed
+            std::cerr << "Error: " << e.what() << std::endl
+                      << "Specify '--help' for usage." << std::endl;
+        }
+    }
+
+    /**
+     * Prints the description of all registered options to stdout.
+     */
+    void usage()
+    {
+        std::cout << desc << std::endl; 
+    }
+
+    //description of all the options registered in the constructor
+    boost::program_options::options_description desc;
+    //hostname of the broker to connect to (--broker, -b)
+    std::string broker;      
+    //virtual host (--virtualhost, -v)
+    std::string virtualhost;
+    //unique client identifier (--clientname, -n)
+    std::string clientid;            
+    //whether the usage statement was requested (--help, -h)
+    bool help;
+};
+
+}
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/TestOptions.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/TestOptions.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/tests/interop_runner.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/interop_runner.cpp?view=auto&rev=544879
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/interop_runner.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/interop_runner.cpp Wed Jun  6 09:39:03 2007
@@ -0,0 +1,252 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/CommonOptions.h"
+#include "qpid/Exception.h"
+#include "qpid/QpidError.h"
+#include "qpid/client/ClientChannel.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/ClientExchange.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/client/ClientQueue.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Time.h"
+#include <iostream>
+#include <memory>
+#include "BasicP2PTest.h"
+#include "BasicPubSubTest.h"
+#include "TestCase.h"
+#include <boost/ptr_container/ptr_map.hpp>
+
+/**
+ * Framework for interop tests.
+ * 
+ * [see http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification for details].
+ */
+
+using namespace qpid::client;
+using namespace qpid::sys;
+using qpid::TestCase;
+using qpid::TestOptions;
+using qpid::framing::FieldTable;
+using std::string;
+
+/**
+ * Test case in which every operation is a no-op. It is registered by
+ * main() under the name "TC1_DummyRun".
+ */
+class DummyRun : public TestCase
+{
+public:
+    DummyRun()
+    {
+    }
+
+    //the role, its parameters and the options are all ignored
+    void assign(const std::string& /*role*/, FieldTable& /*params*/, TestOptions& /*options*/)
+    {
+    }
+
+    //returns immediately as there is nothing to run
+    void start()
+    {
+    }
+
+    //nothing is ever running, so there is nothing to stop
+    void stop()
+    {
+    }
+
+    //the report message is left exactly as it was passed in
+    void report(qpid::client::Message& /*report*/)
+    {
+    }
+};
+
+string parse_next_word(const string& input, const string& delims, string::size_type& position);
+
+/**
+ * Receives the control messages of the interop test framework and
+ * drives the registered test cases in response to them. The selected
+ * test is run on a separate thread, with this object acting as the
+ * Runnable for that thread.
+ */
+class Listener : public MessageListener, private Runnable{    
+    //registered tests keyed by name; boost::ptr_map owns the TestCase objects
+    typedef boost::ptr_map<std::string, TestCase> TestMap;
+
+    //channel used to consume control messages and to publish responses
+    Channel& channel;
+    //options handed on to a test when it is assigned a role
+    TestOptions& options;
+    TestMap tests;
+    //the client name, taken from options.clientid; also the name of the control queue
+    const string name;
+    //private control key for this client: "iop.control." followed by name
+    const string topic;
+    //the test chosen by the most recent invite()
+    TestMap::iterator test;
+    //thread on which the current test is run; created when START is received
+    std::auto_ptr<Thread> runner;
+    //reply address and correlation id taken from the last START or
+    //STATUS_REQUEST, used when sending the report
+    string reportTo;
+    string reportCorrelator;    
+
+    //closes the channel
+    void shutdown();
+    //selects the named test; returns true if such a test is registered
+    bool invite(const string& name);
+    //thread body: starts the selected test and sends the report once it returns
+    void run();
+
+    //publishes response to the address encoded in replyTo
+    void sendResponse(Message& response, string replyTo);
+    //publishes response to the reply-to address of request
+    void sendResponse(Message& response, Message& request);
+    //sends a response of the given control type, identifying this client
+    void sendSimpleResponse(const string& type, Message& request);
+    //asks the selected test to fill in a REPORT message and sends it to reportTo
+    void sendReport();
+public:
+    Listener(Channel& channel, TestOptions& options);
+    //handles one incoming control message
+    void received(Message& msg);
+    //declares and binds the control queue and starts consuming from it
+    void bindAndConsume();
+    //makes test available under the given name
+    void registerTest(std::string name, TestCase* test);
+};
+
+/**
+ * Entry point of the interop test client. Parses the command line
+ * and either prints the usage statement or connects to the broker,
+ * registers the known test cases and processes control messages
+ * until the channel stops running.
+ */
+int main(int argc, char** argv){
+    TestOptions options;
+    options.parse(argc, argv);
+
+    if (options.help) {
+        options.usage();
+    } else {
+        try{
+            Connection connection(options.trace);
+            connection.open(options.broker, options.port, "guest", "guest", options.virtualhost);
+            
+            Channel channel;
+            connection.openChannel(channel);
+            
+            Listener listener(channel, options);
+            //the names under which tests are registered are those used in INVITE messages
+            listener.registerTest("TC1_DummyRun", new DummyRun());
+            listener.registerTest("TC2_BasicP2P", new qpid::BasicP2PTest());
+            listener.registerTest("TC3_BasicPubSub", new qpid::BasicPubSubTest());
+
+            listener.bindAndConsume();
+            
+            channel.run();
+            connection.close();
+        } catch(const qpid::Exception& error) {
+            //caught by const reference: catching by value would copy the
+            //exception and slice off any derived type
+            std::cout << error.what() << std::endl;
+        }
+    }
+}
+
+/**
+ * Creates a listener that uses the given channel and options. The
+ * client name is taken from options.clientid and the private control
+ * key is "iop.control." followed by that name.
+ *
+ * No test is selected initially: test is set to tests.end() so that
+ * the 'test != tests.end()' check made in received() is well defined
+ * even if no INVITE has been processed yet.
+ */
+Listener::Listener(Channel& _channel, TestOptions& _options) :
+    channel(_channel),
+    options(_options),
+    name(options.clientid),
+    topic("iop.control." + name),
+    test(tests.end())
+{}
+
+/**
+ * Makes a test case available under the given name, which is the
+ * name looked up by invite(). The test is stored in a boost::ptr_map,
+ * which takes ownership of the pointer.
+ */
+void Listener::registerTest(std::string name, TestCase* test)
+{
+    //NOTE(review): name is a by-value copy, presumably because
+    //ptr_map::insert takes its key by non-const reference - confirm
+    tests.insert(name, test);
+}
+
+/**
+ * Declares the control queue (named after this client), binds it to
+ * the standard topic exchange under both the shared key "iop.control"
+ * and this client's private key, and then starts consuming from it
+ * with this object as the listener.
+ */
+void Listener::bindAndConsume()
+{
+    Queue controlQueue(name, true);
+    channel.declareQueue(controlQueue);
+
+    //replace these separate binds with a wildcard once that is supported on java broker
+    FieldTable noArguments;
+    channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, controlQueue, "iop.control", noArguments);
+    channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, controlQueue, topic, noArguments);
+
+    string consumerTag;
+    channel.consume(controlQueue, consumerTag, this);
+}
+
+/**
+ * Replies to a control message with a response of the given control
+ * type. The response carries this client's name and private control
+ * key, and the correlation id of the request.
+ */
+void Listener::sendSimpleResponse(const string& type, Message& request)
+{
+    Message reply;
+
+    FieldTable& headers = reply.getHeaders();
+    headers.setString("CONTROL_TYPE", type);
+    headers.setString("CLIENT_NAME", name);
+    headers.setString("CLIENT_PRIVATE_CONTROL_KEY", topic);
+
+    //let the requester match the reply to its request
+    reply.setCorrelationId(request.getCorrelationId());
+    sendResponse(reply, request);
+}
+
+/**
+ * Sends the response to the reply-to address of the request by
+ * delegating to the overload that takes the address as a string.
+ */
+void Listener::sendResponse(Message& response, Message& request)
+{
+    sendResponse(response, request.getReplyTo()); 
+}
+
+/**
+ * Publishes the response to the destination encoded in replyTo.
+ *
+ * The address is assumed to have the form
+ *
+ *    <exchange type>://<exchange name>/<routing key>?<options>
+ *
+ * of which only the exchange name and the routing key are used.
+ * Throws qpid::Exception if replyTo is empty.
+ */
+void Listener::sendResponse(Message& response, string replyTo)
+{
+    if (replyTo.empty()) {
+        throw qpid::Exception("Reply address not set!"); 
+    }
+
+    const string separators(":/?=");
+
+    //begin at the first ':' so that the exchange type is skipped; the
+    //next two words are then the exchange name and the routing key
+    string::size_type cursor = replyTo.find(':');
+    string exchangeName = parse_next_word(replyTo, separators, cursor);
+    string key = parse_next_word(replyTo, separators, cursor);
+
+    channel.publish(response, exchangeName, key);
+}
+
+/**
+ * Handles a control message, dispatching on its CONTROL_TYPE header:
+ *
+ *  - INVITE: replies with ENLIST if the invite names no test or names
+ *    a registered one (which then becomes the selected test)
+ *  - ASSIGN_ROLE: passes the role on to the selected test and replies
+ *    with ACCEPT_ROLE
+ *  - START: records where the report should go and runs the selected
+ *    test on a new thread
+ *  - STATUS_REQUEST: records where the report should go, stops the
+ *    selected test and sends the report
+ *  - TERMINATE: stops the selected test, if any, and closes the channel
+ *
+ * Any other control type is reported as an error and closes the
+ * channel. ASSIGN_ROLE, START and STATUS_REQUEST all act on the
+ * selected test; if none has been selected the message is reported
+ * as an error and otherwise ignored, rather than dereferencing an
+ * iterator that does not refer to a test.
+ */
+void Listener::received(Message& message)
+{
+    std::string type(message.getHeaders().getString("CONTROL_TYPE"));
+
+    if (type == "INVITE") {
+        //named testName so as not to hide the member holding the client name
+        std::string testName(message.getHeaders().getString("TEST_NAME"));
+        if (testName.empty() || invite(testName)) {
+            sendSimpleResponse("ENLIST", message);
+        } else {
+            std::cout << "Can't take part in '" << testName << "'" << std::endl;
+        }
+    } else if ((type == "ASSIGN_ROLE" || type == "START" || type == "STATUS_REQUEST") 
+               && test == tests.end()) {
+        std::cerr << "ERROR!: Received " << type << " but no test has been selected" << std::endl;
+    } else if (type == "ASSIGN_ROLE") {        
+        test->assign(message.getHeaders().getString("ROLE"), message.getHeaders(), options);
+        sendSimpleResponse("ACCEPT_ROLE", message);
+    } else if (type == "START") {        
+        reportTo = message.getReplyTo();
+        reportCorrelator = message.getCorrelationId();
+        //run() is executed on the new thread
+        runner = std::auto_ptr<Thread>(new Thread(this));
+    } else if (type == "STATUS_REQUEST") {
+        reportTo = message.getReplyTo();
+        reportCorrelator = message.getCorrelationId();
+        test->stop();
+        //NOTE(review): run() also sends a report once start() returns, so
+        //this may result in two reports for one test - confirm intended
+        sendReport();
+    } else if (type == "TERMINATE") {
+        if (test != tests.end()) test->stop();
+        shutdown();
+    } else {        
+        std::cerr <<"ERROR!: Received unknown control message: " << type << std::endl;
+        shutdown();
+    }
+}
+
+/**
+ * Closes the channel on which control messages are received.
+ */
+void Listener::shutdown()
+{
+    //NOTE(review): presumably this is what lets channel.run() in main() return - confirm
+    channel.close();
+}
+
+/**
+ * Selects the test registered under the given name as the current
+ * test. Returns true if such a test exists; otherwise returns false,
+ * leaving test equal to tests.end().
+ */
+bool Listener::invite(const string& name)
+{
+    //NB: the parameter hides the member 'name' (the client name) within this method
+    test = tests.find(name);
+    return test != tests.end();
+}
+
+/**
+ * Body of the runner thread created when START is received: runs
+ * the selected test to completion and then sends the report.
+ */
+void Listener::run()
+{
+    //NB: this method will be called in its own thread 
+    //start test and when start returns...
+    test->start();
+    //...send the report to the address recorded when START was received
+    sendReport();
+}
+
+/**
+ * Builds a REPORT message, lets the selected test fill in its
+ * details, and sends it to the reportTo address with the recorded
+ * correlation id.
+ */
+void Listener::sendReport()
+{
+    Message report;
+    report.getHeaders().setString("CONTROL_TYPE", "REPORT");
+    test->report(report);
+    //the correlation id is set after the test has filled in the report
+    report.setCorrelationId(reportCorrelator);
+    sendResponse(report, reportTo);
+}
+
+/**
+ * Extracts the next word from input, where words are separated by
+ * any of the characters in delims.
+ *
+ * The search begins at position, skipping any leading delimiters. If
+ * a word is found, position is moved to the index just past its last
+ * character (the following delimiter, or the end of input) and the
+ * word is returned. If there is no further word, an empty string is
+ * returned and position is left unchanged.
+ */
+string parse_next_word(const string& input, const string& delims, string::size_type& position)
+{
+    const string::size_type wordBegin = input.find_first_not_of(delims, position);
+    if (wordBegin == string::npos) {
+        //only delimiters (or nothing at all) remain
+        return "";
+    }
+
+    string::size_type wordEnd = input.find_first_of(delims, wordBegin);
+    if (wordEnd == string::npos) {
+        //the word runs to the end of the input
+        wordEnd = input.length();
+    }
+
+    position = wordEnd;
+    return input.substr(wordBegin, wordEnd - wordBegin);
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/interop_runner.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/interop_runner.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/python/tests_0-9/broker.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-9/broker.py?view=diff&rev=544879&r1=544878&r2=544879
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-9/broker.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-9/broker.py Wed Jun  6 09:39:03 2007
@@ -114,3 +114,20 @@
         self.assertEqual(reply.method.klass.name, "channel")
         self.assertEqual(reply.method.name, "ok")
         #todo: provide a way to get notified of incoming pongs...
+
+    def test_channel_flow(self):
+        channel = self.channel
+        channel.queue_declare(queue="flow_test_queue", exclusive=True)
+        channel.message_consume(destination="my-tag", queue="flow_test_queue")
+        incoming = self.client.queue("my-tag")
+        
+        channel.channel_flow(active=False)        
+        channel.message_transfer(routing_key="flow_test_queue", body="abcdefghijklmnopqrstuvwxyz")
+        try:
+            incoming.get(timeout=1) 
+            self.fail("Received message when flow turned off.")
+        except Empty: None
+        
+        channel.channel_flow(active=True)
+        msg = incoming.get(timeout=1)
+        self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.body)