You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/11/12 18:15:21 UTC

svn commit: r713425 - in /incubator/qpid/trunk/qpid/cpp: src/qpid/broker/ src/qpid/cluster/ src/qpid/framing/ src/qpid/log/ src/tests/ xml/

Author: aconway
Date: Wed Nov 12 09:15:20 2008
New Revision: 713425

URL: http://svn.apache.org/viewvc?rev=713425&view=rev
Log:
Cluster replicates queues/exchanges with the same encode/decode functions as the store.

Removed unnecessary heap allocation in QPID_LOG statements.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    incubator/qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=713425&r1=713424&r2=713425&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Wed Nov 12 09:15:20 2008
@@ -40,6 +40,7 @@
 namespace 
 {
 const std::string qpidMsgSequence("qpid.msg_sequence");
+const std::string qpidSequenceCounter("qpid.sequence_counter");
 const std::string qpidIVE("qpid.ive");
 const std::string qpidFedOp("qpid.fed.op");
 const std::string qpidFedTags("qpid.fed.tags");
@@ -119,7 +120,10 @@
     }
 
     sequence = _args.get(qpidMsgSequence);
-    if (sequence) QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing");
+    if (sequence) {
+        QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing");
+        args.setInt64(std::string(qpidSequenceCounter), sequenceNo);
+    }
 
     ive = _args.get(qpidIVE);
     if (ive) QPID_LOG(debug, "Configured exchange "+ _name +" with Initial Value");
@@ -153,7 +157,7 @@
     buffer.get(args);
 
     Exchange::shared_ptr exch = exchanges.declare(name, type, durable, args).first;
-    exch->sequenceNo = args.getAsInt64("qpid.sequence_counter");
+    exch->sequenceNo = args.getAsInt64(qpidSequenceCounter);
     return exch;
 }
 
@@ -162,7 +166,8 @@
     buffer.putShortString(name);
     buffer.putOctet(durable);
     buffer.putShortString(getType());
-    if (sequenceNo) args.setInt64(std::string("qpid.sequence_counter"),sequenceNo);
+    if (args.isSet(qpidSequenceCounter))
+        args.setInt64(std::string(qpidSequenceCounter),sequenceNo);
     buffer.put(args);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=713425&r1=713424&r2=713425&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Nov 12 09:15:20 2008
@@ -29,6 +29,8 @@
 #include "qpid/broker/TxAccept.h"
 #include "qpid/broker/RecoveredEnqueue.h"
 #include "qpid/broker/RecoveredDequeue.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Queue.h"
 #include "qpid/framing/enum.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/AllInvoker.h"
@@ -347,6 +349,17 @@
     semanticState().setAccumulatedAck(s);
 }
 
+void Connection::exchange(const std::string& encoded) {
+    Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
+    broker::Exchange::shared_ptr ex = broker::Exchange::decode(cluster.getBroker().getExchanges(), buf);
+    QPID_LOG(debug, cluster << " decoded exchange " << ex->getName());    
+}
+
+void Connection::queue(const std::string& encoded) {
+    Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
+    broker::Queue::shared_ptr q = broker::Queue::decode(cluster.getBroker().getQueues(), buf);
+    QPID_LOG(debug, cluster << " decoded queue " << q->getName());    
+}
 
 }} // namespace qpid::cluster
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=713425&r1=713424&r2=713425&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Wed Nov 12 09:15:20 2008
@@ -137,6 +137,10 @@
     void txEnd();
     void accumulatedAck(const qpid::framing::SequenceSet&);
 
+    // Encoded queue/exchange replication.
+    void queue(const std::string& encoded);
+    void exchange(const std::string& encoded);
+    
   private:
     bool checkUnsupported(const framing::AMQBody& body);
     void deliverClose();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=713425&r1=713424&r2=713425&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Wed Nov 12 09:15:20 2008
@@ -133,14 +133,20 @@
     delete this;
 }
 
+namespace {
+template <class T> std::string encode(const T& t) {
+    std::string encoded;
+    encoded.resize(t.encodedSize());
+    framing::Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
+    t.encode(buf);
+    return encoded;
+}
+} // namespace
+
 void DumpClient::dumpExchange(const boost::shared_ptr<Exchange>& ex) {
-    session.exchangeDeclare(
-        ex->getName(), ex->getType(),
-        ex->getAlternate() ? ex->getAlternate()->getName() : std::string(),
-        arg::passive=false,
-        arg::durable=ex->isDurable(),
-        arg::autoDelete=false,
-        arg::arguments=ex->getArgs());
+    QPID_LOG(debug, dumperId << " dumping exchange " << ex->getName());
+    ClusterConnectionProxy proxy(session);
+    proxy.exchange(encode(*ex));
 }
 
 /** Bind a queue to the dump exchange and dump messges to it
@@ -181,14 +187,9 @@
 
 
 void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) {
-    session.queueDeclare(
-        q->getName(),
-        q->getAlternateExchange() ? q->getAlternateExchange()->getName() : std::string(),
-        arg::passive=false,
-        arg::durable=q->isDurable(),
-        arg::exclusive=q->hasExclusiveConsumer(),
-        arg::autoDelete=q->isAutoDelete(),
-        arg::arguments=q->getSettings());
+    QPID_LOG(debug, dumperId << " dumping queue " << q->getName());
+    ClusterConnectionProxy proxy(session);
+    proxy.queue(encode(*q));
     MessageDumper dumper(q->getName(), session);
     q->eachMessage(boost::bind(&MessageDumper::dumpQueuedMessage, &dumper, _1));
     q->eachBinding(boost::bind(&DumpClient::dumpBinding, this, q->getName(), _1));

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp?rev=713425&r1=713424&r2=713425&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp Wed Nov 12 09:15:20 2008
@@ -51,12 +51,14 @@
 
 void Buffer::putOctet(uint8_t i){
     data[position++] = i;
+    assert(position <= size);
 }
 
 void Buffer::putShort(uint16_t i){
     uint16_t b = i;
     data[position++] = (uint8_t) (0xFF & (b >> 8));
     data[position++] = (uint8_t) (0xFF & b);
+    assert(position <= size);
 }
 
 void Buffer::putLong(uint32_t i){
@@ -65,6 +67,7 @@
     data[position++] = (uint8_t) (0xFF & (b >> 16));
     data[position++] = (uint8_t) (0xFF & (b >> 8));
     data[position++] = (uint8_t) (0xFF & b);
+    assert(position <= size);
 }
 
 void Buffer::putLongLong(uint64_t i){
@@ -76,6 +79,7 @@
 
 void Buffer::putInt8(int8_t i){
     data[position++] = (uint8_t) i;
+    assert(position <= size);
 }
 
 void Buffer::putInt16(int16_t i){
@@ -116,13 +120,16 @@
 }
 
 uint8_t Buffer::getOctet(){ 
-    return (uint8_t) data[position++]; 
+    uint8_t octet = static_cast<uint8_t>(data[position++]);
+    assert(position <= size);
+    return octet;
 }
 
 uint16_t Buffer::getShort(){ 
     uint16_t hi = (unsigned char) data[position++];
     hi = hi << 8;
     hi |= (unsigned char) data[position++];
+    assert(position <= size);
     return hi;
 }
 
@@ -131,6 +138,7 @@
     uint32_t b = (unsigned char) data[position++];
     uint32_t c = (unsigned char) data[position++];
     uint32_t d = (unsigned char) data[position++];
+    assert(position <= size);
     a = a << 24;
     a |= b << 16;
     a |= c << 8;
@@ -146,7 +154,9 @@
 }
 
 int8_t Buffer::getInt8(){
-    return (int8_t) data[position++];
+    int8_t i = static_cast<int8_t>(data[position++]);
+    assert(position <= size);
+    return i;
 }
 
 int16_t Buffer::getInt16(){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.cpp?rev=713425&r1=713424&r2=713425&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.cpp Wed Nov 12 09:15:20 2008
@@ -52,12 +52,13 @@
              boost::bind(&Selector::enable, this, _1));
 }
 
-bool Selector::isEnabled(Level level, const std::string& function) {
+bool Selector::isEnabled(Level level, const char* function) {
+    const char* functionEnd = function+::strlen(function);
     for (std::vector<std::string>::iterator i=substrings[level].begin();
          i != substrings[level].end();
          ++i)
     {
-        if (function.find(*i) != std::string::npos)
+        if (std::search(function, functionEnd, i->begin(), i->end()) != functionEnd)
             return true;
     }
     return false;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h?rev=713425&r1=713424&r2=713425&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Selector.h Wed Nov 12 09:15:20 2008
@@ -57,7 +57,7 @@
     void enable(const std::string& enableStr);
 
     /** True if level is enabled for file. */
-    bool isEnabled(Level level, const std::string& function);
+    bool isEnabled(Level level, const char* function);
 
   private:
     std::vector<std::string> substrings[LevelTraits::COUNT];

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h?rev=713425&r1=713424&r2=713425&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/log/Statement.h Wed Nov 12 09:15:20 2008
@@ -102,10 +102,6 @@
  * QPID_LOG(error, boost::format("Dohickey %s exploded") % dohicky.name());
  * @endcode
  *
- * All code with logging statements should be built with
- *   -DQPID_COMPONENT=<component name>
- * where component name is the name of the component this file belongs to.
- * 
  * You can subscribe to log messages by level, by component, by filename
  * or a combination @see Configuration.
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=713425&r1=713424&r2=713425&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Wed Nov 12 09:15:20 2008
@@ -209,6 +209,30 @@
     uint16_t channel;
 };
 
+int64_t getMsgSequence(const Message& m) {
+    return m.getMessageProperties().getApplicationHeaders().getAsInt64("qpid.msg_sequence");
+}
+
+QPID_AUTO_TEST_CASE(testSequenceOptions) {
+    // Make sure the exchange qpid.msg_sequence property is properly replicated.
+    ClusterFixture cluster(1);
+    Client c0(cluster[0], "c0");
+    FieldTable args;
+    args.setInt("qpid.msg_sequence", 1); // FIXME aconway 2008-11-11: works with "qpid.sequence_counter"??
+    c0.session.queueDeclare(arg::queue="q");
+    c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=args);
+    c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k");
+    c0.session.messageTransfer(arg::content=Message("1", "k"), arg::destination="ex");
+    c0.session.messageTransfer(arg::content=Message("2", "k"), arg::destination="ex");
+    BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIME_SEC)));
+    BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIME_SEC)));
+
+    cluster.add();
+    Client c1(cluster[1]);
+    c1.session.messageTransfer(arg::content=Message("3", "k"), arg::destination="ex");    
+    BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIME_SEC)));
+}
+
 QPID_AUTO_TEST_CASE(testUnsupported) {
     ScopedSuppressLogging sl;
     ClusterFixture cluster(1);

Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=713425&r1=713424&r2=713425&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Wed Nov 12 09:15:20 2008
@@ -140,5 +140,10 @@
       <field name="position" type="sequence-no"/>
     </control>
     
+    <!-- Replicate encoded exchanges/queues. -->
+    <control name="exchange" code="0x31"><field name="encoded" type="str32"/></control>
+    <control name="queue" code="0x32"><field name="encoded" type="str32"/></control>
+    
+
   </class>
 </amqp>