You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/10/16 23:05:45 UTC
svn commit: r705359 - in /incubator/qpid/trunk/qpid/cpp/src/qpid:
broker/SemanticState.cpp broker/SemanticState.h broker/SessionAdapter.cpp
cluster/DumpClient.cpp
Author: aconway
Date: Thu Oct 16 14:05:45 2008
New Revision: 705359
URL: http://svn.apache.org/viewvc?rev=705359&view=rev
Log:
Added missing message.subscribe arguments to SemanticState::ConsumerImpl for replication (and future use.)
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=705359&r1=705358&r2=705359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Thu Oct 16 14:05:45 2008
@@ -86,11 +86,11 @@
void SemanticState::consume(DeliveryToken::shared_ptr token, string& tagInOut,
Queue::shared_ptr queue, bool nolocal, bool ackRequired, bool acquire,
- bool exclusive, const FieldTable*)
+ bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments)
{
if(tagInOut.empty())
tagInOut = tagGenerator.generate();
- ConsumerImpl::shared_ptr c(new ConsumerImpl(this, token, tagInOut, queue, ackRequired, nolocal, acquire));
+ ConsumerImpl::shared_ptr c(new ConsumerImpl(this, token, tagInOut, queue, ackRequired, nolocal, acquire, exclusive, resumeId, resumeTtl, arguments));
queue->consume(c, exclusive);//may throw exception
outputTasks.addOutputTask(c.get());
consumers[tagInOut] = c;
@@ -233,13 +233,19 @@
}
SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
- DeliveryToken::shared_ptr _token,
- const string& _name,
- Queue::shared_ptr _queue,
- bool ack,
- bool _nolocal,
- bool _acquire
- ) :
+ DeliveryToken::shared_ptr _token,
+ const string& _name,
+ Queue::shared_ptr _queue,
+ bool ack,
+ bool _nolocal,
+ bool _acquire,
+ bool _exclusive,
+ const string& _resumeId,
+ uint64_t _resumeTtl,
+ const framing::FieldTable& _arguments
+
+
+) :
Consumer(_acquire),
parent(_parent),
token(_token),
@@ -249,7 +255,11 @@
nolocal(_nolocal),
acquire(_acquire),
blocked(true),
- windowing(true),
+ windowing(true),
+ exclusive(_exclusive),
+ resumeId(_resumeId),
+ resumeTtl(_resumeTtl),
+ arguments(_arguments),
msgCredit(0),
byteCredit(0),
notifyEnabled(true) {}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=705359&r1=705358&r2=705359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Thu Oct 16 14:05:45 2008
@@ -73,6 +73,10 @@
const bool acquire;
bool blocked;
bool windowing;
+ bool exclusive;
+ string resumeId;
+ uint64_t resumeTtl;
+ framing::FieldTable arguments;
uint32_t msgCredit;
uint32_t byteCredit;
bool notifyEnabled;
@@ -85,7 +89,8 @@
ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token,
const string& name, Queue::shared_ptr queue,
- bool ack, bool nolocal, bool acquire);
+ bool ack, bool nolocal, bool acquire, bool exclusive,
+ const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
~ConsumerImpl();
OwnershipToken* getSession();
bool deliver(QueuedMessage& msg);
@@ -114,8 +119,12 @@
bool isAckExpected() const { return ackExpected; }
bool isAcquire() const { return acquire; }
bool isWindowing() const { return windowing; }
+ bool isExclusive() const { return exclusive; }
uint32_t getMsgCredit() const { return msgCredit; }
uint32_t getByteCredit() const { return byteCredit; }
+ std::string getResumeId() const { return resumeId; };
+ uint64_t getResumeTtl() const { return resumeTtl; }
+ const framing::FieldTable& getArguments() const { return arguments; }
};
private:
@@ -168,7 +177,9 @@
*@param tagInOut - if empty it is updated with the generated token.
*/
void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue,
- bool nolocal, bool ackRequired, bool acquire, bool exclusive, const framing::FieldTable* = 0);
+ bool nolocal, bool ackRequired, bool acquire, bool exclusive,
+ const string& resumeId=string(), uint64_t resumeTtl=0,
+ const framing::FieldTable& = framing::FieldTable());
void cancel(const string& tag);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=705359&r1=705358&r2=705359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Thu Oct 16 14:05:45 2008
@@ -457,13 +457,13 @@
void
SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName,
- const string& destination,
- uint8_t acceptMode,
- uint8_t acquireMode,
- bool exclusive,
- const string& /*resumeId*/,//TODO implement resume behaviour. Need to update cluster.
- uint64_t /*resumeTtl*/,
- const FieldTable& arguments)
+ const string& destination,
+ uint8_t acceptMode,
+ uint8_t acquireMode,
+ bool exclusive,
+ const string& resumeId,
+ uint64_t resumeTtl,
+ const FieldTable& arguments)
{
AclModule* acl = getBroker().getAcl();
@@ -481,7 +481,7 @@
string tag = destination;
state.consume(MessageDelivery::getMessageDeliveryToken(destination, acceptMode, acquireMode),
tag, queue, false, //TODO get rid of no-local
- acceptMode == 0, acquireMode == 0, exclusive, &arguments);
+ acceptMode == 0, acquireMode == 0, exclusive, resumeId, resumeTtl, arguments);
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=705359&r1=705358&r2=705359&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Thu Oct 16 14:05:45 2008
@@ -219,13 +219,10 @@
arg::destination = ci->getName(),
arg::acceptMode = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE,
arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED,
- arg::exclusive = false , // FIXME aconway 2008-09-23: duplicate from consumer
-
- // TODO aconway 2008-09-23: remaining args not used by current broker.
- // Update this code when they are.
- arg::resumeId=std::string(),
- arg::resumeTtl=0,
- arg::arguments=FieldTable()
+ arg::exclusive = ci->isExclusive(),
+ arg::resumeId = ci->getResumeId(),
+ arg::resumeTtl = ci->getResumeTtl(),
+ arg::arguments = ci->getArguments()
);
shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit());