You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/11/12 18:15:21 UTC
svn commit: r713425 - in /incubator/qpid/trunk/qpid/cpp: src/qpid/broker/
src/qpid/cluster/ src/qpid/framing/ src/qpid/log/ src/tests/ xml/
Author: aconway
Date: Wed Nov 12 09:15:20 2008
New Revision: 713425
URL: http://svn.apache.org/viewvc?rev=713425&view=rev
Log:
Cluster replicates queues/exchanges with same encode/decode functions as the store.
Removed unnecessary heap allocation in QPID_LOG statements.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h
incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h
incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=713425&r1=713424&r2=713425&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Wed Nov 12 09:15:20 2008
@@ -40,6 +40,7 @@
namespace
{
const std::string qpidMsgSequence("qpid.msg_sequence");
+const std::string qpidSequenceCounter("qpid.sequence_counter");
const std::string qpidIVE("qpid.ive");
const std::string qpidFedOp("qpid.fed.op");
const std::string qpidFedTags("qpid.fed.tags");
@@ -119,7 +120,10 @@
}
sequence = _args.get(qpidMsgSequence);
- if (sequence) QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing");
+ if (sequence) {
+ QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing");
+ args.setInt64(std::string(qpidSequenceCounter), sequenceNo);
+ }
ive = _args.get(qpidIVE);
if (ive) QPID_LOG(debug, "Configured exchange "+ _name +" with Initial Value");
@@ -153,7 +157,7 @@
buffer.get(args);
Exchange::shared_ptr exch = exchanges.declare(name, type, durable, args).first;
- exch->sequenceNo = args.getAsInt64("qpid.sequence_counter");
+ exch->sequenceNo = args.getAsInt64(qpidSequenceCounter);
return exch;
}
@@ -162,7 +166,8 @@
buffer.putShortString(name);
buffer.putOctet(durable);
buffer.putShortString(getType());
- if (sequenceNo) args.setInt64(std::string("qpid.sequence_counter"),sequenceNo);
+ if (args.isSet(qpidSequenceCounter))
+ args.setInt64(std::string(qpidSequenceCounter),sequenceNo);
buffer.put(args);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=713425&r1=713424&r2=713425&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Nov 12 09:15:20 2008
@@ -29,6 +29,8 @@
#include "qpid/broker/TxAccept.h"
#include "qpid/broker/RecoveredEnqueue.h"
#include "qpid/broker/RecoveredDequeue.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Queue.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AllInvoker.h"
@@ -347,6 +349,17 @@
semanticState().setAccumulatedAck(s);
}
+void Connection::exchange(const std::string& encoded) {
+ Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
+ broker::Exchange::shared_ptr ex = broker::Exchange::decode(cluster.getBroker().getExchanges(), buf);
+ QPID_LOG(debug, cluster << " decoded exchange " << ex->getName());
+}
+
+void Connection::queue(const std::string& encoded) {
+ Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
+ broker::Queue::shared_ptr q = broker::Queue::decode(cluster.getBroker().getQueues(), buf);
+ QPID_LOG(debug, cluster << " decoded queue " << q->getName());
+}
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=713425&r1=713424&r2=713425&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Wed Nov 12 09:15:20 2008
@@ -137,6 +137,10 @@
void txEnd();
void accumulatedAck(const qpid::framing::SequenceSet&);
+ // Encoded queue/exchange replication.
+ void queue(const std::string& encoded);
+ void exchange(const std::string& encoded);
+
private:
bool checkUnsupported(const framing::AMQBody& body);
void deliverClose();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=713425&r1=713424&r2=713425&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Wed Nov 12 09:15:20 2008
@@ -133,14 +133,20 @@
delete this;
}
+namespace {
+template <class T> std::string encode(const T& t) {
+ std::string encoded;
+ encoded.resize(t.encodedSize());
+ framing::Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
+ t.encode(buf);
+ return encoded;
+}
+} // namespace
+
void DumpClient::dumpExchange(const boost::shared_ptr<Exchange>& ex) {
- session.exchangeDeclare(
- ex->getName(), ex->getType(),
- ex->getAlternate() ? ex->getAlternate()->getName() : std::string(),
- arg::passive=false,
- arg::durable=ex->isDurable(),
- arg::autoDelete=false,
- arg::arguments=ex->getArgs());
+ QPID_LOG(debug, dumperId << " dumping exchange " << ex->getName());
+ ClusterConnectionProxy proxy(session);
+ proxy.exchange(encode(*ex));
}
/** Bind a queue to the dump exchange and dump messages to it
@@ -181,14 +187,9 @@
void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) {
- session.queueDeclare(
- q->getName(),
- q->getAlternateExchange() ? q->getAlternateExchange()->getName() : std::string(),
- arg::passive=false,
- arg::durable=q->isDurable(),
- arg::exclusive=q->hasExclusiveConsumer(),
- arg::autoDelete=q->isAutoDelete(),
- arg::arguments=q->getSettings());
+ QPID_LOG(debug, dumperId << " dumping queue " << q->getName());
+ ClusterConnectionProxy proxy(session);
+ proxy.queue(encode(*q));
MessageDumper dumper(q->getName(), session);
q->eachMessage(boost::bind(&MessageDumper::dumpQueuedMessage, &dumper, _1));
q->eachBinding(boost::bind(&DumpClient::dumpBinding, this, q->getName(), _1));
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp?rev=713425&r1=713424&r2=713425&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp Wed Nov 12 09:15:20 2008
@@ -51,12 +51,14 @@
void Buffer::putOctet(uint8_t i){
data[position++] = i;
+ assert(position <= size);
}
void Buffer::putShort(uint16_t i){
uint16_t b = i;
data[position++] = (uint8_t) (0xFF & (b >> 8));
data[position++] = (uint8_t) (0xFF & b);
+ assert(position <= size);
}
void Buffer::putLong(uint32_t i){
@@ -65,6 +67,7 @@
data[position++] = (uint8_t) (0xFF & (b >> 16));
data[position++] = (uint8_t) (0xFF & (b >> 8));
data[position++] = (uint8_t) (0xFF & b);
+ assert(position <= size);
}
void Buffer::putLongLong(uint64_t i){
@@ -76,6 +79,7 @@
void Buffer::putInt8(int8_t i){
data[position++] = (uint8_t) i;
+ assert(position <= size);
}
void Buffer::putInt16(int16_t i){
@@ -116,13 +120,16 @@
}
uint8_t Buffer::getOctet(){
- return (uint8_t) data[position++];
+ uint8_t octet = static_cast<uint8_t>(data[position++]);
+ assert(position <= size);
+ return octet;
}
uint16_t Buffer::getShort(){
uint16_t hi = (unsigned char) data[position++];
hi = hi << 8;
hi |= (unsigned char) data[position++];
+ assert(position <= size);
return hi;
}
@@ -131,6 +138,7 @@
uint32_t b = (unsigned char) data[position++];
uint32_t c = (unsigned char) data[position++];
uint32_t d = (unsigned char) data[position++];
+ assert(position <= size);
a = a << 24;
a |= b << 16;
a |= c << 8;
@@ -146,7 +154,9 @@
}
int8_t Buffer::getInt8(){
- return (int8_t) data[position++];
+ int8_t i = static_cast<int8_t>(data[position++]);
+ assert(position <= size);
+ return i;
}
int16_t Buffer::getInt16(){
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.cpp?rev=713425&r1=713424&r2=713425&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.cpp Wed Nov 12 09:15:20 2008
@@ -52,12 +52,13 @@
boost::bind(&Selector::enable, this, _1));
}
-bool Selector::isEnabled(Level level, const std::string& function) {
+bool Selector::isEnabled(Level level, const char* function) {
+ const char* functionEnd = function+::strlen(function);
for (std::vector<std::string>::iterator i=substrings[level].begin();
i != substrings[level].end();
++i)
{
- if (function.find(*i) != std::string::npos)
+ if (std::search(function, functionEnd, i->begin(), i->end()) != functionEnd)
return true;
}
return false;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h?rev=713425&r1=713424&r2=713425&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h Wed Nov 12 09:15:20 2008
@@ -57,7 +57,7 @@
void enable(const std::string& enableStr);
/** True if level is enabled for file. */
- bool isEnabled(Level level, const std::string& function);
+ bool isEnabled(Level level, const char* function);
private:
std::vector<std::string> substrings[LevelTraits::COUNT];
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h?rev=713425&r1=713424&r2=713425&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h Wed Nov 12 09:15:20 2008
@@ -102,10 +102,6 @@
* QPID_LOG(error, boost::format("Dohickey %s exploded") % dohicky.name());
* @endcode
*
- * All code with logging statements should be built with
- * -DQPID_COMPONENT=<component name>
- * where component name is the name of the component this file belongs to.
- *
* You can subscribe to log messages by level, by component, by filename
* or a combination @see Configuration.
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=713425&r1=713424&r2=713425&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Wed Nov 12 09:15:20 2008
@@ -209,6 +209,30 @@
uint16_t channel;
};
+int64_t getMsgSequence(const Message& m) {
+ return m.getMessageProperties().getApplicationHeaders().getAsInt64("qpid.msg_sequence");
+}
+
+QPID_AUTO_TEST_CASE(testSequenceOptions) {
+ // Make sure the exchange qpid.msg_sequence property is properly replicated.
+ ClusterFixture cluster(1);
+ Client c0(cluster[0], "c0");
+ FieldTable args;
+ args.setInt("qpid.msg_sequence", 1); // FIXME aconway 2008-11-11: works with "qpid.sequence_counter"??
+ c0.session.queueDeclare(arg::queue="q");
+ c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=args);
+ c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k");
+ c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex");
+ c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex");
+ BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIME_SEC)));
+ BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIME_SEC)));
+
+ cluster.add();
+ Client c1(cluster[1]);
+ c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex");
+ BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIME_SEC)));
+}
+
QPID_AUTO_TEST_CASE(testUnsupported) {
ScopedSuppressLogging sl;
ClusterFixture cluster(1);
Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=713425&r1=713424&r2=713425&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Wed Nov 12 09:15:20 2008
@@ -140,5 +140,10 @@
<field name="position" type="sequence-no"/>
</control>
+ <!-- Replicate encoded exchanges/queues. -->
+ <control name="exchange" code="0x31"><field name="encoded" type="str32"/></control>
+ <control name="queue" code="0x32"><field name="encoded" type="str32"/></control>
+
+
</class>
</amqp>