You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/12/13 19:44:25 UTC

svn commit: r603973 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/Broker.cpp qpid/broker/Broker.h qpid/broker/Message.cpp qpid/broker/MessageBuilder.h qpid/broker/SemanticHandler.cpp qpid/client/ExecutionHandler.cpp tests/MessageBuilderTest.cpp

Author: gsim
Date: Thu Dec 13 10:44:24 2007
New Revision: 603973

URL: http://svn.apache.org/viewvc?rev=603973&view=rev
Log:
Some fixes relating to message 'staging'.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=603973&r1=603972&r2=603973&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu Dec 13 10:44:24 2007
@@ -124,7 +124,6 @@
     config(conf),
     store(createStore(conf)),
     queues(store.get()),
-    stagingThreshold(0),
     factory(*this),
     dtxManager(store.get()),
     sessionManager(conf.ack)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=603973&r1=603972&r2=603973&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Thu Dec 13 10:44:24 2007
@@ -113,7 +113,7 @@
     MessageStore& getStore() { return *store; }
     QueueRegistry& getQueues() { return queues; }
     ExchangeRegistry& getExchanges() { return exchanges; }
-    uint64_t getStagingThreshold() { return stagingThreshold; }
+    uint64_t getStagingThreshold() { return config.stagingThreshold; }
     DtxManager& getDtxManager() { return dtxManager; }
 
     SessionManager& getSessionManager() { return sessionManager; }
@@ -132,7 +132,6 @@
 
     QueueRegistry queues;
     ExchangeRegistry exchanges;
-    uint64_t stagingThreshold;
     ConnectionFactory factory;
     DtxManager dtxManager;
     HandlerUpdaters handlerUpdaters;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=603973&r1=603972&r2=603973&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Thu Dec 13 10:44:24 2007
@@ -177,7 +177,7 @@
             if (offset > 0) {
                 frame.setBos(false);
             }
-            if (remaining) {
+            if (remaining > maxContentSize) {
                 frame.setEos(false);
             }
             out.handle(frame);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h?rev=603973&r1=603972&r2=603973&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h Thu Dec 13 10:44:24 2007
@@ -32,7 +32,7 @@
 
         class MessageBuilder : public framing::FrameHandler{
         public:
-            MessageBuilder(MessageStore* const store = 0, uint64_t stagingThreshold = 0);
+            MessageBuilder(MessageStore* const store, uint64_t stagingThreshold);
             void handle(framing::AMQFrame& frame);
             intrusive_ptr<Message> getMessage() { return message; }
             void start(const framing::SequenceNumber& id);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp?rev=603973&r1=603972&r2=603973&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticHandler.cpp Thu Dec 13 10:44:24 2007
@@ -40,6 +40,7 @@
 
 SemanticHandler::SemanticHandler(SessionState& s) : 
     state(*this,s), session(s),
+    msgBuilder(&s.getBroker().getStore(), s.getBroker().getStagingThreshold()),
     ackOp(boost::bind(&SemanticState::ackRange, &state, _1, _2))
  {}
 
@@ -150,7 +151,7 @@
         msg = msgBuilder.getMessage();
     }
     msgBuilder.handle(frame);
-    if (msg->getFrames().isComplete()) {//end of frameset will be indicated by frame flags
+    if (frame.getEof() && frame.getEos()) {//end of frameset will be indicated by frame flags
         msg->setPublisher(&session.getConnection());
         state.handle(msg);        
         msgBuilder.end();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp?rev=603973&r1=603972&r2=603973&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ExecutionHandler.cpp Thu Dec 13 10:44:24 2007
@@ -206,8 +206,7 @@
     if(data_length > 0){
         header.setEof(false);
         out(header);   
-        //frame itself uses 8 bytes
-        u_int32_t frag_size = maxFrameSize - 8;
+        u_int32_t frag_size = maxFrameSize - AMQFrame::frameOverhead();
         if(data_length < frag_size){
             AMQFrame frame(in_place<AMQContentBody>(content.getData()));
             frame.setBof(false);

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp?rev=603973&r1=603972&r2=603973&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp Thu Dec 13 10:44:24 2007
@@ -94,7 +94,7 @@
   public:
 
     void testHeaderOnly(){
-        MessageBuilder builder;
+        MessageBuilder builder(0, 0);
         builder.start(SequenceNumber());
 
         std::string exchange("builder-exchange");
@@ -117,7 +117,7 @@
     }
 
     void test1ContentFrame(){
-        MessageBuilder builder;
+        MessageBuilder builder(0, 0);
         builder.start(SequenceNumber());
 
         std::string data("abcdefg");
@@ -149,7 +149,7 @@
     }
 
     void test2ContentFrames(){
-        MessageBuilder builder;
+        MessageBuilder builder(0, 0);
         builder.start(SequenceNumber());
 
         std::string data1("abcdefg");