You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/10/16 23:05:45 UTC

svn commit: r705359 - in /incubator/qpid/trunk/qpid/cpp/src/qpid: broker/SemanticState.cpp broker/SemanticState.h broker/SessionAdapter.cpp cluster/DumpClient.cpp

Author: aconway
Date: Thu Oct 16 14:05:45 2008
New Revision: 705359

URL: http://svn.apache.org/viewvc?rev=705359&view=rev
Log:
Added missing message.subscribe arguments to SemanticState::ConsumerImpl for replication (and future use).

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=705359&r1=705358&r2=705359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Thu Oct 16 14:05:45 2008
@@ -86,11 +86,11 @@
 
 void SemanticState::consume(DeliveryToken::shared_ptr token, string& tagInOut, 
                             Queue::shared_ptr queue, bool nolocal, bool ackRequired, bool acquire,
-                            bool exclusive, const FieldTable*)
+                            bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments)
 {
     if(tagInOut.empty())
         tagInOut = tagGenerator.generate();
-    ConsumerImpl::shared_ptr c(new ConsumerImpl(this, token, tagInOut, queue, ackRequired, nolocal, acquire));
+    ConsumerImpl::shared_ptr c(new ConsumerImpl(this, token, tagInOut, queue, ackRequired, nolocal, acquire, exclusive, resumeId, resumeTtl, arguments));
     queue->consume(c, exclusive);//may throw exception
     outputTasks.addOutputTask(c.get());
     consumers[tagInOut] = c;
@@ -233,13 +233,19 @@
 }
 
 SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, 
-                                    DeliveryToken::shared_ptr _token,
-                                    const string& _name, 
-                                    Queue::shared_ptr _queue, 
-                                    bool ack,
-                                    bool _nolocal,
-                                    bool _acquire
-                                    ) : 
+                                          DeliveryToken::shared_ptr _token,
+                                          const string& _name, 
+                                          Queue::shared_ptr _queue, 
+                                          bool ack,
+                                          bool _nolocal,
+                                          bool _acquire,
+                                          bool _exclusive,
+                                          const string& _resumeId,
+                                          uint64_t _resumeTtl,
+                                          const framing::FieldTable& _arguments
+
+
+) : 
     Consumer(_acquire),
     parent(_parent), 
     token(_token), 
@@ -249,7 +255,11 @@
     nolocal(_nolocal),
     acquire(_acquire),
     blocked(true), 
-    windowing(true), 
+    windowing(true),
+    exclusive(_exclusive),
+    resumeId(_resumeId),
+    resumeTtl(_resumeTtl),
+    arguments(_arguments),
     msgCredit(0), 
     byteCredit(0),
     notifyEnabled(true) {}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=705359&r1=705358&r2=705359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Thu Oct 16 14:05:45 2008
@@ -73,6 +73,10 @@
         const bool acquire;
         bool blocked;
         bool windowing;
+        bool exclusive;
+        string resumeId;
+        uint64_t resumeTtl;
+        framing::FieldTable arguments;
         uint32_t msgCredit;
         uint32_t byteCredit;
         bool notifyEnabled;
@@ -85,7 +89,8 @@
 
         ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token, 
                      const string& name, Queue::shared_ptr queue,
-                     bool ack, bool nolocal, bool acquire);
+                     bool ack, bool nolocal, bool acquire, bool exclusive,
+                     const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
         ~ConsumerImpl();
         OwnershipToken* getSession();
         bool deliver(QueuedMessage& msg);            
@@ -114,8 +119,12 @@
         bool isAckExpected() const { return ackExpected; }
         bool isAcquire() const { return acquire; }
         bool isWindowing() const { return windowing; }
+        bool isExclusive() const { return exclusive; }
         uint32_t getMsgCredit() const { return msgCredit; }
         uint32_t getByteCredit() const { return byteCredit; }
+        std::string getResumeId() const { return resumeId; };
+        uint64_t getResumeTtl() const { return resumeTtl; }
+        const framing::FieldTable& getArguments() const { return arguments; }
     };
 
   private:
@@ -168,7 +177,9 @@
      *@param tagInOut - if empty it is updated with the generated token.
      */
     void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue, 
-                 bool nolocal, bool ackRequired, bool acquire, bool exclusive, const framing::FieldTable* = 0);
+                 bool nolocal, bool ackRequired, bool acquire, bool exclusive,
+                 const string& resumeId=string(), uint64_t resumeTtl=0,
+                 const framing::FieldTable& = framing::FieldTable());
 
     void cancel(const string& tag);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=705359&r1=705358&r2=705359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Thu Oct 16 14:05:45 2008
@@ -457,13 +457,13 @@
 
 void
 SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName,
-                              const string& destination,
-                              uint8_t acceptMode,
-                              uint8_t acquireMode,
-                              bool exclusive,
-                              const string& /*resumeId*/,//TODO implement resume behaviour. Need to update cluster.
-                              uint64_t /*resumeTtl*/,
-                              const FieldTable& arguments)
+                                              const string& destination,
+                                              uint8_t acceptMode,
+                                              uint8_t acquireMode,
+                                              bool exclusive,
+                                              const string& resumeId,
+                                              uint64_t resumeTtl,
+                                              const FieldTable& arguments)
 {
 
     AclModule* acl = getBroker().getAcl();
@@ -481,7 +481,7 @@
     string tag = destination;
     state.consume(MessageDelivery::getMessageDeliveryToken(destination, acceptMode, acquireMode), 
                   tag, queue, false, //TODO get rid of no-local
-                  acceptMode == 0, acquireMode == 0, exclusive, &arguments);
+                  acceptMode == 0, acquireMode == 0, exclusive, resumeId, resumeTtl, arguments);
 
     ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
     if (agent)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=705359&r1=705358&r2=705359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Thu Oct 16 14:05:45 2008
@@ -219,13 +219,10 @@
         arg::destination = ci->getName(),
         arg::acceptMode  = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE,
         arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED,
-        arg::exclusive   = false ,  // FIXME aconway 2008-09-23: duplicate from consumer
-
-        // TODO aconway 2008-09-23: remaining args not used by current broker.
-        // Update this code when they are.
-        arg::resumeId=std::string(), 
-        arg::resumeTtl=0,
-        arg::arguments=FieldTable()
+        arg::exclusive   = ci->isExclusive(),
+        arg::resumeId    = ci->getResumeId(),
+        arg::resumeTtl   = ci->getResumeTtl(),
+        arg::arguments   = ci->getArguments()
     );
     shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
     shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());