You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/01/15 19:28:32 UTC

svn commit: r496425 - in /incubator/qpid/branches/qpid.0-9/cpp: docs/api/ lib/broker/ lib/common/framing/ tests/

Author: aconway
Date: Mon Jan 15 10:28:29 2007
New Revision: 496425

URL: http://svn.apache.org/viewvc?view=rev&rev=496425
Log:

* Refactor: Moved major broker components (exchanges, queues etc.) from
  class SessionHandlerImplFactory to more logical class Broker.

Modified:
    incubator/qpid/branches/qpid.0-9/cpp/docs/api/developer.doxygen
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerFactoryImpl.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerFactoryImpl.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.h
    incubator/qpid/branches/qpid.0-9/cpp/tests/FramingTest.cpp

Modified: incubator/qpid/branches/qpid.0-9/cpp/docs/api/developer.doxygen
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/docs/api/developer.doxygen?view=diff&rev=496425&r1=496424&r2=496425
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/docs/api/developer.doxygen (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/docs/api/developer.doxygen Mon Jan 15 10:28:29 2007
@@ -300,7 +300,7 @@
 # in case and if your file system supports case sensitive file names. Windows 
 # and Mac users are advised to set this option to NO.
 
-CASE_SENSE_NAMES       = YES
+CASE_SENSE_NAMES       = NO
 
 # If the HIDE_SCOPE_NAMES tag is set to NO (the default) then Doxygen 
 # will show members with their full class and namespace scopes in the 
@@ -560,7 +560,7 @@
 # Note: To get rid of all source code in the generated output, make sure also 
 # VERBATIM_HEADERS is set to NO.
 
-SOURCE_BROWSER         = NO
+SOURCE_BROWSER         = YES
 
 # Setting the INLINE_SOURCES tag to YES will include the body 
 # of functions and classes directly in the documentation.

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp?view=diff&rev=496425&r1=496424&r2=496425
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.cpp Mon Jan 15 10:28:29 2007
@@ -20,19 +20,61 @@
  */
 #include <iostream>
 #include <memory>
-#include <Broker.h>
 
-
-using namespace qpid::broker;
-using namespace qpid::sys;
+#include "AMQFrame.h"
+#include "DirectExchange.h"
+#include "FanOutExchange.h"
+#include "HeadersExchange.h"
+#include "MessageStoreModule.h"
+#include "NullMessageStore.h"
+#include "ProtocolInitiation.h"
+#include "SessionHandlerImpl.h"
+#include "sys/SessionContext.h"
+#include "sys/SessionHandler.h"
+#include "sys/SessionHandlerFactory.h"
+#include "sys/TimeoutHandler.h"
+
+#include "Broker.h"
+
+namespace qpid {
+namespace broker {
+
+const std::string empty;
+const std::string amq_direct("amq.direct");
+const std::string amq_topic("amq.topic");
+const std::string amq_fanout("amq.fanout");
+const std::string amq_match("amq.match");
 
 Broker::Broker(const Configuration& config) :
     acceptor(Acceptor::create(config.getPort(),
                               config.getConnectionBacklog(),
                               config.getWorkerThreads(),
                               config.isTrace())),
-    factory(config.getStore())
-{ }
+    queues(store.get()),
+    timeout(30000),
+    stagingThreshold(0),
+    cleaner(&queues, timeout/10),
+    factory(*this)
+{
+    if (config.getStore().empty())
+        store.reset(new NullMessageStore());
+    else
+        store.reset(new MessageStoreModule(config.getStore()));
+
+    exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
+    exchanges.declare(amq_direct, DirectExchange::typeName);
+    exchanges.declare(amq_topic, TopicExchange::typeName);
+    exchanges.declare(amq_fanout, FanOutExchange::typeName);
+    exchanges.declare(amq_match, HeadersExchange::typeName);
+
+    if(store.get()) {
+        RecoveryManager recoverer(queues, exchanges);
+        MessageStoreSettings storeSettings = { getStagingThreshold() };
+        store->recover(recoverer, &storeSettings);
+    }
+
+    cleaner.start();
+}
 
 
 Broker::shared_ptr Broker::create(int16_t port) 
@@ -57,3 +99,6 @@
 Broker::~Broker() { }
 
 const int16_t Broker::DEFAULT_PORT(5672);
+
+
+}} // namespace qpid::broker

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h?view=diff&rev=496425&r1=496424&r2=496425
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Broker.h Mon Jan 15 10:28:29 2007
@@ -27,6 +27,8 @@
 #include <sys/Runnable.h>
 #include <sys/Acceptor.h>
 #include <SharedObject.h>
+#include <MessageStore.h>
+#include <AutoDelete.h>
 
 namespace qpid {
 namespace broker {
@@ -69,13 +71,27 @@
     /** Shut down the broker */
     virtual void shutdown();
 
+    MessageStore& getStore() { return *store; }
+    QueueRegistry& getQueues() { return queues; }
+    ExchangeRegistry& getExchanges() { return exchanges; }
+    u_int32_t getTimeout() { return timeout; }
+    u_int64_t getStagingThreshold() { return stagingThreshold; }
+    AutoDelete& getCleaner() { return cleaner; }
+    
   private:
     Broker(const Configuration& config); 
+
     qpid::sys::Acceptor::shared_ptr acceptor;
+    std::auto_ptr<MessageStore> store;
+    QueueRegistry queues;
+    ExchangeRegistry exchanges;
+    u_int32_t timeout;
+    u_int64_t stagingThreshold;
+    AutoDelete cleaner;
     SessionHandlerFactoryImpl factory;
 };
-}
-}
+
+}}
             
 
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerFactoryImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerFactoryImpl.cpp?view=diff&rev=496425&r1=496424&r2=496425
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerFactoryImpl.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerFactoryImpl.cpp Mon Jan 15 10:28:29 2007
@@ -19,51 +19,25 @@
  *
  */
 #include <SessionHandlerFactoryImpl.h>
-
-#include <DirectExchange.h>
-#include <FanOutExchange.h>
-#include <HeadersExchange.h>
-#include <MessageStoreModule.h>
-#include <NullMessageStore.h>
 #include <SessionHandlerImpl.h>
 
-using namespace qpid::broker;
-using namespace qpid::sys;
+namespace qpid {
+namespace broker {
 
-namespace
-{
-const std::string empty;
-const std::string amq_direct("amq.direct");
-const std::string amq_topic("amq.topic");
-const std::string amq_fanout("amq.fanout");
-const std::string amq_match("amq.match");
-}
 
-SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store, u_int64_t _stagingThreshold, u_int32_t _timeout) : 
-    store(_store.empty() ? (MessageStore*)  new NullMessageStore() : (MessageStore*) new MessageStoreModule(_store)), 
-    queues(store.get()), settings(_timeout, _stagingThreshold), cleaner(&queues, _timeout/10)
-{
-    exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
-    exchanges.declare(amq_direct, DirectExchange::typeName);
-    exchanges.declare(amq_topic, TopicExchange::typeName);
-    exchanges.declare(amq_fanout, FanOutExchange::typeName);
-    exchanges.declare(amq_match, HeadersExchange::typeName);
-
-    if(store.get()) {
-        RecoveryManager recoverer(queues, exchanges);
-        MessageStoreSettings storeSettings = { settings.stagingThreshold };
-        store->recover(recoverer, &storeSettings);
-    }
+SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(Broker& b) : broker(b)
+{}
 
-    cleaner.start();
-}
 
-SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt)
+SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl()
 {
-    return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, settings);
+    broker.getCleaner().stop();
 }
 
-SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl()
+qpid::sys::SessionHandler*
+SessionHandlerFactoryImpl::create(qpid::sys::SessionContext* ctxt)
 {
-    cleaner.stop();
+    return new SessionHandlerImpl(ctxt, broker);
 }
+
+}} // namespace qpid::broker

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerFactoryImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerFactoryImpl.h?view=diff&rev=496425&r1=496424&r2=496425
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerFactoryImpl.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerFactoryImpl.h Mon Jan 15 10:28:29 2007
@@ -21,37 +21,26 @@
 #ifndef _SessionHandlerFactoryImpl_
 #define _SessionHandlerFactoryImpl_
 
-#include <AutoDelete.h>
-#include <ExchangeRegistry.h>
-#include <MessageStore.h>
-#include <QueueRegistry.h>
-#include <AMQFrame.h>
-#include <ProtocolInitiation.h>
-#include <sys/SessionContext.h>
-#include <sys/SessionHandler.h>
-#include <sys/SessionHandlerFactory.h>
-#include <sys/TimeoutHandler.h>
-#include <SessionHandlerImpl.h>
-#include <memory>
+#include "SessionHandlerFactory.h"
 
 namespace qpid {
-    namespace broker {
+namespace broker {
+class Broker;
 
-        class SessionHandlerFactoryImpl : public virtual qpid::sys::SessionHandlerFactory
-        {
-            std::auto_ptr<MessageStore> store;
-            QueueRegistry queues;
-            ExchangeRegistry exchanges;
-            const Settings settings;
-            AutoDelete cleaner;
-        public:
-            SessionHandlerFactoryImpl(const std::string& store = "", u_int64_t stagingThreshold = 0, u_int32_t timeout = 30000);
-            virtual qpid::sys::SessionHandler* create(qpid::sys::SessionContext* ctxt);
-            virtual ~SessionHandlerFactoryImpl();
-        };
+class SessionHandlerFactoryImpl : public qpid::sys::SessionHandlerFactory
+{
+  public:
+    SessionHandlerFactoryImpl(Broker& b);
+            
+    virtual qpid::sys::SessionHandler* create(qpid::sys::SessionContext* ctxt);
+            
+    virtual ~SessionHandlerFactoryImpl();
 
-    }
-}
+  private:
+    Broker& broker;
+};
+
+}}
 
 
 #endif

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.cpp?view=diff&rev=496425&r1=496424&r2=496425
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.cpp Mon Jan 15 10:28:29 2007
@@ -26,21 +26,22 @@
 #include "assert.h"
 
 using namespace boost;
-using namespace qpid::broker;
 using namespace qpid::sys;
 using namespace qpid::framing;
 using namespace qpid::sys;
 
-SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context, 
-                                       QueueRegistry* _queues, 
-                                       ExchangeRegistry* _exchanges, 
-                                       AutoDelete* _cleaner,
-                                       const Settings& _settings) :
+namespace qpid {
+namespace broker {
+
+SessionHandlerImpl::SessionHandlerImpl(
+    SessionContext* _context, Broker& broker) :
+
     context(_context), 
-    queues(_queues), 
-    exchanges(_exchanges),
-    cleaner(_cleaner),
-    settings(_settings),
+    client(0),
+    queues(broker.getQueues()), 
+    exchanges(broker.getExchanges()),
+    cleaner(broker.getCleaner()),
+    settings(broker.getTimeout(), broker.getStagingThreshold()),
     basicHandler(new BasicHandlerImpl(this)),
     channelHandler(new ChannelHandlerImpl(this)),
     connectionHandler(new ConnectionHandlerImpl(this)),
@@ -49,10 +50,8 @@
     txHandler(new TxHandlerImpl(this)),
     messageHandler(new MessageHandlerImpl(this)),
     framemax(65536), 
-    heartbeat(0){
-    
-    client =NULL;    
-}
+    heartbeat(0)
+{}
 
 SessionHandlerImpl::~SessionHandlerImpl(){
 
@@ -75,7 +74,7 @@
         queue = getChannel(channel)->getDefaultQueue();
         if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" );
     } else {
-        queue = queues->find(name);
+        queue = queues.find(name);
         if (queue == 0) {
             throw ChannelException( 404, "Queue not found: " + name);
         }
@@ -85,7 +84,7 @@
 
 
 Exchange::shared_ptr SessionHandlerImpl::findExchange(const string& name){
-    return exchanges->get(name);
+    return exchanges.get(name);
 }
 
 void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){
@@ -96,8 +95,10 @@
     switch(body->type())
     {
       case REQUEST_BODY:
+        //        responder.received(frame);
       case RESPONSE_BODY:
-      case METHOD_BODY:
+        // requester.received(frame);
+      case METHOD_BODY:         // 
         method = dynamic_pointer_cast<AMQMethodBody, AMQBody>(body);
         try{
             method->invoke(*this, channel);
@@ -164,7 +165,7 @@
         }
         for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){
             string name = (*i)->getName();
-            queues->destroy(name);
+            queues.destroy(name);
             exclusiveQueues.erase(i);
         }
     } catch(std::exception& e) {
@@ -221,7 +222,7 @@
     
     parent->channels[channel] = new Channel(
         parent->client->getProtocolVersion() , parent->context, channel,
-        parent->framemax, parent->queues->getStore(),
+        parent->framemax, parent->queues.getStore(),
         parent->settings.stagingThreshold);
 
     // FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9
@@ -251,12 +252,12 @@
                                                       const FieldTable& /*arguments*/){
 
     if(passive){
-        if(!parent->exchanges->get(exchange)){
+        if(!parent->exchanges.get(exchange)){
             throw ChannelException(404, "Exchange not found: " + exchange);            
         }
     }else{        
         try{
-            std::pair<Exchange::shared_ptr, bool> response = parent->exchanges->declare(exchange, type);
+            std::pair<Exchange::shared_ptr, bool> response = parent->exchanges.declare(exchange, type);
             if(!response.second && response.first->getType() != type){
                 throw ConnectionException(507, "Exchange already declared to be of type " 
                                           + response.first->getType() + ", requested " + type);
@@ -288,7 +289,7 @@
                                                       const string& exchange, bool /*ifUnused*/, bool nowait){
 
     //TODO: implement unused
-    parent->exchanges->destroy(exchange);
+    parent->exchanges.destroy(exchange);
     if(!nowait) parent->client->getExchange().deleteOk(channel);
 } 
 
@@ -300,7 +301,7 @@
 	queue = parent->getQueue(name, channel);
     } else {
 	std::pair<Queue::shared_ptr, bool> queue_created =  
-            parent->queues->declare(name, durable, autoDelete ? parent->settings.timeout : 0, exclusive ? parent : 0);
+            parent->queues.declare(name, durable, autoDelete ? parent->settings.timeout : 0, exclusive ? parent : 0);
 	queue = queue_created.first;
 	assert(queue);
 	if (queue_created.second) { // This is a new queue
@@ -310,11 +311,11 @@
             queue_created.first->create(arguments);
 
 	    //add default binding:
-	    parent->exchanges->getDefault()->bind(queue, name, 0);
+	    parent->exchanges.getDefault()->bind(queue, name, 0);
 	    if (exclusive) {
 		parent->exclusiveQueues.push_back(queue);
 	    } else if(autoDelete){
-		parent->cleaner->add(queue);
+		parent->cleaner.add(queue);
 	    }
 	}
     }
@@ -332,7 +333,7 @@
                                                 const FieldTable& arguments){
 
     Queue::shared_ptr queue = parent->getQueue(queueName, channel);
-    Exchange::shared_ptr exchange = parent->exchanges->get(exchangeName);
+    Exchange::shared_ptr exchange = parent->exchanges.get(exchangeName);
     if(exchange){
 // kpvdr - cannot use this any longer as routingKey is now const
 //        if(routingKey.empty() && queueName.empty()) routingKey = queue->getName();
@@ -369,7 +370,7 @@
         }
         count = q->getMessageCount();
         q->destroy();
-        parent->queues->destroy(queue);
+        parent->queues.destroy(queue);
     }
 
     if(!nowait) parent->client->getQueue().deleteOk(channel, count);
@@ -424,7 +425,7 @@
                                                    const string& exchangeName, const string& routingKey, 
                                                    bool mandatory, bool immediate){
 
-    Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges->getDefault() : parent->exchanges->get(exchangeName);
+    Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges.getDefault() : parent->exchanges.get(exchangeName);
     if(exchange){
         Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate);
         parent->getChannel(channel)->handlePublish(msg, exchange);
@@ -652,3 +653,4 @@
         assert(0);                // FIXME astitcher 2007-01-11: 0-9 feature
 }
  
+}}

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.h?view=diff&rev=496425&r1=496424&r2=496425
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.h Mon Jan 15 10:28:29 2007
@@ -40,6 +40,7 @@
 #include <sys/SessionHandler.h>
 #include <sys/TimeoutHandler.h>
 #include <TopicExchange.h>
+#include "Broker.h"
 
 namespace qpid {
 namespace broker {
@@ -77,11 +78,10 @@
 
     qpid::sys::SessionContext* context;
     qpid::framing::AMQP_ClientProxy* client;
-    QueueRegistry* queues;
-    ExchangeRegistry* const exchanges;
-    AutoDelete* const cleaner;
-    const Settings settings;
-
+    QueueRegistry& queues;
+    ExchangeRegistry& exchanges;
+    AutoDelete& cleaner;
+    Settings settings;
     std::auto_ptr<BasicHandler> basicHandler;
     std::auto_ptr<ChannelHandler> channelHandler;
     std::auto_ptr<ConnectionHandler> connectionHandler;
@@ -112,8 +112,7 @@
     Exchange::shared_ptr findExchange(const string& name);
     
   public:
-    SessionHandlerImpl(qpid::sys::SessionContext* context, QueueRegistry* queues, 
-                       ExchangeRegistry* exchanges, AutoDelete* cleaner, const Settings& settings);
+    SessionHandlerImpl(qpid::sys::SessionContext* context, Broker& broker);
     virtual void received(qpid::framing::AMQFrame* frame);
     virtual void initiated(qpid::framing::ProtocolInitiation* header);
     virtual void idleOut();

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.cpp?view=diff&rev=496425&r1=496424&r2=496425
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.cpp Mon Jan 15 10:28:29 2007
@@ -30,9 +30,9 @@
     responseMark = request.responseMark;
 }
 
-void Responder::sending(AMQResponseBody::Data& response, RequestId toRequest) {
+void Responder::sending(AMQResponseBody::Data& response) {
     response.responseId = ++lastId;
-    response.requestId = toRequest;
+    // response.requestId should have been set by caller.
     response.batchOffset = 0;
 }
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.h?view=diff&rev=496425&r1=496424&r2=496425
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.h Mon Jan 15 10:28:29 2007
@@ -40,7 +40,7 @@
     void received(const AMQRequestBody::Data& request);
 
     /** Called before sending a response to set respose data.  */
-    void sending(AMQResponseBody::Data& response, RequestId toRequest);
+    void sending(AMQResponseBody::Data& response);
 
     /** Get the ID of the highest response acknowledged by the peer. */
     ResponseId getResponseMark() { return responseMark; }

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/FramingTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/FramingTest.cpp?view=diff&rev=496425&r1=496424&r2=496425
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/FramingTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/FramingTest.cpp Mon Jan 15 10:28:29 2007
@@ -236,7 +236,8 @@
         q.requestId = 1;
         q.responseMark = 0;
         r.received(q);
-        r.sending(p, q.requestId);
+        p.requestId = q.requestId;
+        r.sending(p);
         CPPUNIT_ASSERT_EQUAL(1ULL, p.responseId);
         CPPUNIT_ASSERT_EQUAL(1ULL, p.requestId);
         CPPUNIT_ASSERT_EQUAL(0U,   p.batchOffset);
@@ -245,9 +246,8 @@
         q.requestId++;
         q.responseMark = 1;
         r.received(q);
-        r.sending(p, q.requestId);
+        r.sending(p);
         CPPUNIT_ASSERT_EQUAL(2ULL, p.responseId);
-        CPPUNIT_ASSERT_EQUAL(2ULL, p.requestId);
         CPPUNIT_ASSERT_EQUAL(0U,   p.batchOffset);
         CPPUNIT_ASSERT_EQUAL(1ULL, r.getResponseMark());