You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by cc...@apache.org on 2009/01/30 19:59:24 UTC

svn commit: r739378 - in /qpid/trunk/qpid/cpp/src: qpid/broker/Message.cpp qpid/broker/Message.h qpid/client/Connection.cpp qpid/client/Connection.h qpid/client/SessionImpl.cpp qpid/client/SessionImpl.h qpid/cluster/UpdateClient.cpp tests/consume.cpp

Author: cctrieloff
Date: Fri Jan 30 18:59:24 2009
New Revision: 739378

URL: http://svn.apache.org/viewvc?rev=739378&view=rev
Log:
Correction for: start a broker in a cluster, send messages that are flowed to disk, then join a new broker to the cluster and consume from the new node. Content-released messages were losing their content. This patch corrects that.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/tests/consume.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=739378&r1=739377&r2=739378&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Fri Jan 30 18:59:24 2009
@@ -197,30 +197,39 @@
     }
 }
 
-void Message::sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const
+bool Message::getContentFrame(const Queue& queue, AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const
 {
     if (isContentReleased()) {
-        //load content from store in chunks of maxContentSize
-        uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
         intrusive_ptr<const PersistableMessage> pmsg(this);
         
         bool done = false;
-        for (uint64_t offset = 0; !done; offset += maxContentSize)
+        string& data = frame.castBody<AMQContentBody>()->getData();
+        store->loadContent(queue, pmsg, data, offset, maxContentSize);
+        done = data.size() < maxContentSize;
+        frame.setBof(false);
+        frame.setEof(true);
+        QPID_LOG(debug, "loaded frame" << frame);
+        if (offset > 0) {
+            frame.setBos(false);
+        }
+        if (!done) {
+            frame.setEos(false);
+        } else return false;
+        return true;
+    }
+    else return false;
+}
+
+void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const
+{
+    if (isContentReleased()) {
+
+        uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
+        bool morecontent = true;
+        for (uint64_t offset = 0; morecontent; offset += maxContentSize)
         {            
             AMQFrame frame((AMQContentBody()));
-            string& data = frame.castBody<AMQContentBody>()->getData();
-
-            store->loadContent(queue, pmsg, data, offset, maxContentSize);
-            done = data.size() < maxContentSize;
-            frame.setBof(false);
-            frame.setEof(true);
-            if (offset > 0) {
-                frame.setBos(false);
-            }
-            if (!done) {
-                frame.setEos(false);
-            }
-            QPID_LOG(debug, "loaded frame for delivery: " << frame);
+            morecontent = getContentFrame(queue, frame, maxContentSize, offset);
             out.handle(frame);
         }
     } else {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=739378&r1=739377&r2=739378&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Fri Jan 30 18:59:24 2009
@@ -131,7 +131,8 @@
     void releaseContent(MessageStore* store);
     void destroy();
 
-    void sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const;
+    bool getContentFrame(const Queue& queue, framing::AMQFrame& frame, uint16_t maxContentSize, uint64_t offset) const;
+    void sendContent(const Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const;
     void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize) const;
 
     bool isContentLoaded() const;

Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp?rev=739378&r1=739377&r2=739378&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/Connection.cpp Fri Jan 30 18:59:24 2009
@@ -119,6 +119,13 @@
         impl->registerFailureCallback ( failureCallback );
 }
 
+const ConnectionSettings& Connection::getNegotiatedSettings()
+{
+    if (!isOpen())
+        throw Exception(QPID_MSG("Connection is not open."));
+     return impl->getNegotiatedSettings();
+}
+
 Session Connection::newSession(const std::string& name, uint32_t timeout) {
     if (!isOpen())
         throw Exception(QPID_MSG("Connection has not yet been opened"));

Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h?rev=739378&r1=739377&r2=739378&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/Connection.h Fri Jan 30 18:59:24 2009
@@ -175,6 +175,11 @@
     std::vector<Url> getKnownBrokers();
     void registerFailureCallback ( boost::function<void ()> fn );
 
+    /**
+     * Return the set of client negotiated settings
+     */
+    const ConnectionSettings& getNegotiatedSettings();
+
   friend class ConnectionAccess; ///<@internal
   friend class SessionBase_0_10; ///<@internal
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=739378&r1=739377&r2=739378&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Fri Jan 30 18:59:24 2009
@@ -322,6 +322,11 @@
     return f;
 }
 
+void SessionImpl::sendRawFrame(AMQFrame& frame) {
+    Acquire a(sendLock);
+    handleOut(frame);
+}
+
 Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* content)
 {
     Acquire a(sendLock);
@@ -767,4 +772,9 @@
         connectionShared = connectionWeak.lock();
 }
 
+shared_ptr<ConnectionImpl> SessionImpl::getConnection()
+{
+    return connectionWeak.lock();
+}
+
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h?rev=739378&r1=739377&r2=739378&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h Fri Jan 30 18:59:24 2009
@@ -87,6 +87,7 @@
     Future send(const framing::AMQBody& command);
     Future send(const framing::AMQBody& command, const framing::MethodContent& content);
     Future send(const framing::AMQBody& command, const framing::FrameSet& content);
+    void sendRawFrame(framing::AMQFrame& frame);
 
     Demux& getDemux();
     void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);
@@ -114,6 +115,11 @@
      */
     void setWeakPtr(bool weak=true);
 
+    /** 
+     * Get the connection associated with this session
+     */
+    shared_ptr<ConnectionImpl> getConnection();
+
 private:
     enum State {
         INACTIVE,
@@ -204,7 +210,6 @@
     const uint64_t maxFrameSize;
     const SessionId id;
 
-    shared_ptr<ConnectionImpl> connection();
     shared_ptr<ConnectionImpl> connectionShared;
     boost::weak_ptr<ConnectionImpl> connectionWeak;
     bool weakPtr;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=739378&r1=739377&r2=739378&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Fri Jan 30 18:59:24 2009
@@ -168,6 +168,7 @@
         session.exchangeUnbind(queue, UpdateClient::UPDATE);
     }
 
+
     void updateQueuedMessage(const broker::QueuedMessage& message) {
         if (!haveLastPos || message.position - lastPos != 1)  {
             ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1);
@@ -177,12 +178,27 @@
         SessionBase_0_10Access sb(session);
         framing::MessageTransferBody transfer(
             framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
+        
         sb.get()->send(transfer, message.payload->getFrames());
+        if (message.payload->isContentReleased()){
+            uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize;
+
+            uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
+            bool morecontent = true;
+            for (uint64_t offset = 0; morecontent; offset += maxContentSize)
+            {            
+                AMQFrame frame((AMQContentBody()));
+                morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset);
+                sb.get()->sendRawFrame(frame);
+            }
+        }
     }
 
     void updateMessage(const boost::intrusive_ptr<broker::Message>& message) {
         updateQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1));
     }
+    
+   
 };
 
 

Modified: qpid/trunk/qpid/cpp/src/tests/consume.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/consume.cpp?rev=739378&r1=739377&r2=739378&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/consume.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/consume.cpp Fri Jan 30 18:59:24 2009
@@ -44,15 +44,19 @@
     string queue;
     bool declare;
     bool summary;
+    bool print;
+    bool durable;
     
     Args() : count(1000), ack(0), queue("publish-consume"),
-             declare(false), summary(false)
+             declare(false), summary(false), print(false)
     {
         addOptions()
             ("count", optValue(count, "N"), "number of messages to publish")
             ("ack-frequency", optValue(ack, "N"), "ack every N messages (0 means use no-ack mode)")
             ("queue", optValue(queue, "<queue name>"), "queue to consume from")
             ("declare", optValue(declare), "declare the queue")
+            ("durable", optValue(durable), "declare the queue durable, use with declare")
+            ("print-data", optValue(print), "Print the recieved data at info level")
             ("s,summary", optValue(summary), "Print undecorated rate.");
     }
 };
@@ -73,7 +77,7 @@
     void consume()
     {
         if (opts.declare)
-            session.queueDeclare(opts.queue);
+            session.queueDeclare(arg::queue=opts.queue, arg::durable=opts.durable);
         SubscriptionManager subs(session);
         LocalQueue lq;
         SubscriptionSettings settings;
@@ -85,6 +89,7 @@
         for (size_t i = 0; i < opts.count; ++i) {
             msg=lq.pop();
             QPID_LOG(info, "Received: " << msg.getMessageProperties().getCorrelationId());
+            if (opts.print) QPID_LOG(info, "Data: " << msg.getData());
         }
         if (opts.ack != 0)
             sub.accept(sub.getUnaccepted()); // Cumulative ack for final batch.



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org