You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/27 17:40:45 UTC

svn commit: r1377715 [6/12] - in /qpid/branches/asyncstore: ./ bin/ cpp/ cpp/docs/api/ cpp/examples/old_api/tradedemo/ cpp/include/qmf/engine/ cpp/include/qpid/client/ cpp/src/ cpp/src/qmf/engine/ cpp/src/qpid/acl/ cpp/src/qpid/asyncStore/ cpp/src/qpid...

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.h Mon Aug 27 15:40:33 2012
@@ -108,9 +108,6 @@ namespace qpid {
              * commit
              */
             QPID_BROKER_EXTERN bool commitLocal(TransactionalStore* const store);
-
-            // Used by cluster to replicate transaction status.
-            void accept(TxOpConstVisitor& v) const;
         };
     }
 }

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/TxOp.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/TxOp.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/TxOp.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/TxOp.h Mon Aug 27 15:40:33 2012
@@ -21,7 +21,6 @@
 #ifndef _TxOp_
 #define _TxOp_
 
-#include "qpid/broker/TxOpVisitor.h"
 #include "qpid/broker/TransactionalStore.h"
 #include <boost/shared_ptr.hpp>
 
@@ -36,8 +35,6 @@ namespace qpid {
             virtual void commit()  throw() = 0;
             virtual void rollback()  throw() = 0;
             virtual ~TxOp(){}
-
-            virtual void accept(TxOpConstVisitor&) const = 0;
         };
 
 }} // namespace qpid::broker

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/TxOpVisitor.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/TxOpVisitor.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/TxOpVisitor.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/TxOpVisitor.h Mon Aug 27 15:40:33 2012
@@ -1,97 +0,0 @@
-#ifndef QPID_BROKER_TXOPVISITOR_H
-#define QPID_BROKER_TXOPVISITOR_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-namespace qpid {
-namespace broker {
-
-class DtxAck;
-class RecoveredDequeue;
-class RecoveredEnqueue;
-class TxAccept;
-class TxPublish;
-
-/**
- * Visitor for TxOp familly of classes.
- */
-struct TxOpConstVisitor
-{
-    virtual ~TxOpConstVisitor() {}
-    virtual void operator()(const DtxAck&) = 0;
-    virtual void operator()(const RecoveredDequeue&) = 0;
-    virtual void operator()(const RecoveredEnqueue&) = 0;
-    virtual void operator()(const TxAccept&) = 0;
-    virtual void operator()(const TxPublish&) = 0;
-};
-
-}} // namespace qpid::broker
-
-#endif  /*!QPID_BROKER_TXOPVISITOR_H*/
-#ifndef QPID_BROKER_TXOPVISITOR_H
-#define QPID_BROKER_TXOPVISITOR_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-namespace qpid {
-namespace broker {
-
-class DtxAck;
-class RecoveredDequeue;
-class RecoveredEnqueue;
-class TxAccept;
-class TxPublish;
-
-/**
- * Visitor for TxOp familly of classes.
- */
-struct TxOpConstVisitor
-{
-    virtual ~TxOpConstVisitor() {}
-    virtual void operator()(const DtxAck&) = 0;
-    virtual void operator()(const RecoveredDequeue&) = 0;
-    virtual void operator()(const RecoveredEnqueue&) = 0;
-    virtual void operator()(const TxAccept&) = 0;
-    virtual void operator()(const TxPublish&) = 0;
-};
-
-}} // namespace qpid::broker
-
-#endif  /*!QPID_BROKER_TXOPVISITOR_H*/

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/TxPublish.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/TxPublish.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/TxPublish.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/TxPublish.cpp Mon Aug 27 15:40:33 2012
@@ -1,111 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/log/Statement.h"
-#include "qpid/broker/TxPublish.h"
-#include "qpid/broker/Queue.h"
-
-using boost::intrusive_ptr;
-using namespace qpid::broker;
-
-TxPublish::TxPublish(intrusive_ptr<Message> _msg) : msg(_msg) {}
-
-bool TxPublish::prepare(TransactionContext* ctxt) throw()
-{
-    try{
-        while (!queues.empty()) {
-            prepare(ctxt, queues.front());
-            prepared.push_back(queues.front());
-            queues.pop_front();
-        }
-        return true;
-    }catch(const std::exception& e){
-        QPID_LOG(error, "Failed to prepare: " << e.what());
-    }catch(...){
-        QPID_LOG(error, "Failed to prepare (unknown error)");
-    }
-    return false;
-}
-
-void TxPublish::commit() throw()
-{
-    try {
-        for_each(prepared.begin(), prepared.end(), Commit(msg));
-        if (msg->isContentReleaseRequested()) {
-            // NOTE: The log messages in this section are used for flow-to-disk testing (which checks the log for the
-            // presence of these messages). Do not change these without also checking these tests.
-            if (msg->isContentReleaseBlocked()) {
-                QPID_LOG(debug, "Message id=\"" << msg->getProperties<qpid::framing::MessageProperties>()->getMessageId() << "\"; pid=0x" <<
-                                std::hex << msg->getPersistenceId() << std::dec << ": Content release blocked on commit");
-            } else {
-                msg->releaseContent();
-                QPID_LOG(debug, "Message id=\"" << msg->getProperties<qpid::framing::MessageProperties>()->getMessageId() << "\"; pid=0x" <<
-                                std::hex << msg->getPersistenceId() << std::dec << ": Content released on commit");
-            }
-        }
-    } catch (const std::exception& e) {
-        QPID_LOG(error, "Failed to commit: " << e.what());
-    } catch(...) {
-        QPID_LOG(error, "Failed to commit (unknown error)");
-    }
-}
-
-void TxPublish::rollback() throw()
-{
-    try {
-        for_each(prepared.begin(), prepared.end(), Rollback(msg));
-    } catch (const std::exception& e) {
-        QPID_LOG(error, "Failed to complete rollback: " << e.what());
-    } catch(...) {
-        QPID_LOG(error, "Failed to complete rollback (unknown error)");
-    }
-
-}
-
-void TxPublish::deliverTo(const boost::shared_ptr<Queue>& queue){
-    if (!queue->isLocal(msg)) {
-        queues.push_back(queue);
-        delivered = true;
-    } else {
-        QPID_LOG(debug, "Won't enqueue local message for " << queue->getName());
-    }
-}
-
-void TxPublish::prepare(TransactionContext* ctxt, const boost::shared_ptr<Queue> queue)
-{
-    queue->enqueue(ctxt, msg);
-}
-
-TxPublish::Commit::Commit(intrusive_ptr<Message>& _msg) : msg(_msg){}
-
-void TxPublish::Commit::operator()(const boost::shared_ptr<Queue>& queue){
-    queue->process(msg);
-}
-
-TxPublish::Rollback::Rollback(intrusive_ptr<Message>& _msg) : msg(_msg){}
-
-void TxPublish::Rollback::operator()(const boost::shared_ptr<Queue>& queue){
-    queue->enqueueAborted(msg);
-}
-
-uint64_t TxPublish::contentSize ()
-{
-    return msg->contentSize ();
-}

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/TxPublish.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/TxPublish.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/TxPublish.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/TxPublish.h Mon Aug 27 15:40:33 2012
@@ -1,92 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _TxPublish_
-#define _TxPublish_
-
-#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/Deliverable.h"
-#include "qpid/broker/Message.h"
-#include "qpid/broker/MessageStore.h"
-#include "qpid/broker/TxOp.h"
-
-#include <algorithm>
-#include <functional>
-#include <list>
-
-#include <boost/intrusive_ptr.hpp>
-
-namespace qpid {
-namespace broker {
-/**
- * Defines the behaviour for publish operations on a
- * transactional channel. Messages are routed through
- * exchanges when received but are not at that stage delivered
- * to the matching queues, rather the queues are held in an
- * instance of this class. On prepare() the message is marked
- * enqueued to the relevant queues in the MessagesStore. On
- * commit() the messages will be passed to the queue for
- * dispatch or to be added to the in-memory queue.
- */
-class QPID_BROKER_CLASS_EXTERN TxPublish : public TxOp, public Deliverable{
-
-    class Commit{
-        boost::intrusive_ptr<Message>& msg;
-      public:
-        Commit(boost::intrusive_ptr<Message>& msg);
-        void operator()(const boost::shared_ptr<Queue>& queue);
-    };
-    class Rollback{
-        boost::intrusive_ptr<Message>& msg;
-      public:
-        Rollback(boost::intrusive_ptr<Message>& msg);
-        void operator()(const boost::shared_ptr<Queue>& queue);
-    };
-
-    boost::intrusive_ptr<Message> msg;
-    std::list<boost::shared_ptr<Queue> > queues;
-    std::list<boost::shared_ptr<Queue> > prepared;
-
-    void prepare(TransactionContext* ctxt, boost::shared_ptr<Queue>);
-
-  public:
-    QPID_BROKER_EXTERN TxPublish(boost::intrusive_ptr<Message> msg);
-    QPID_BROKER_EXTERN virtual bool prepare(TransactionContext* ctxt) throw();
-    QPID_BROKER_EXTERN virtual void commit() throw();
-    QPID_BROKER_EXTERN virtual void rollback() throw();
-
-    virtual Message& getMessage() { return *msg; };
-
-    QPID_BROKER_EXTERN virtual void deliverTo(const boost::shared_ptr<Queue>& queue);
-
-    virtual ~TxPublish(){}
-    virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
-
-    QPID_BROKER_EXTERN uint64_t contentSize();
-
-    boost::intrusive_ptr<Message> getMessage() const { return msg; }
-    const std::list<boost::shared_ptr<Queue> >& getQueues() const { return queues; }
-    const std::list<boost::shared_ptr<Queue> >& getPrepared() const { return prepared; }
-};
-}
-}
-
-
-#endif

Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp Mon Aug 27 15:40:33 2012
@@ -281,7 +281,7 @@ void SslProtocolFactory::established(sys
                                                     boost::bind(&AsynchIOHandler::idle, async, _1));
     }
 
-    async->init(aio, brokerTimer, maxNegotiateTime, 4);
+    async->init(aio, brokerTimer, maxNegotiateTime);
     aio->start(poller);
 }
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/client/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/client/SslConnector.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/client/SslConnector.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/client/SslConnector.cpp Mon Aug 27 15:40:33 2012
@@ -38,7 +38,6 @@
 #include "qpid/Msg.h"
 
 #include <iostream>
-#include <map>
 #include <boost/bind.hpp>
 #include <boost/format.hpp>
 
@@ -54,53 +53,29 @@ using boost::str;
 
 class SslConnector : public Connector
 {
-    struct Buff;
-
-    /** Batch up frames for writing to aio. */
-    class Writer : public framing::FrameHandler {
-        typedef sys::ssl::SslIOBufferBase BufferBase;
-        typedef std::vector<framing::AMQFrame> Frames;
-
-        const uint16_t maxFrameSize;
-        sys::Mutex lock;
-        sys::ssl::SslIO* aio;
-        BufferBase* buffer;
-        Frames frames;
-        size_t lastEof; // Position after last EOF in frames
-        framing::Buffer encode;
-        size_t framesEncoded;
-        std::string identifier;
-        Bounds* bounds;
-
-        void writeOne();
-        void newBuffer();
-
-      public:
-
-        Writer(uint16_t maxFrameSize, Bounds*);
-        ~Writer();
-        void init(std::string id, sys::ssl::SslIO*);
-        void handle(framing::AMQFrame&);
-        void write(sys::ssl::SslIO&);
-    };
+    typedef std::deque<framing::AMQFrame> Frames;
 
     const uint16_t maxFrameSize;
+
+    sys::Mutex lock;
+    Frames frames;
+    size_t lastEof; // Position after last EOF in frames
+    uint64_t currentSize;
+    Bounds* bounds;
+
     framing::ProtocolVersion version;
     bool initiated;
-    SecuritySettings securitySettings;
-
-    sys::Mutex closedLock;
     bool closed;
 
     sys::ShutdownHandler* shutdownHandler;
     framing::InputHandler* input;
 
-    Writer writer;
-
     sys::ssl::SslSocket socket;
 
     sys::ssl::SslIO* aio;
+    std::string identifier;
     Poller::shared_ptr poller;
+    SecuritySettings securitySettings;
 
     ~SslConnector();
 
@@ -110,10 +85,7 @@ class SslConnector : public Connector
     void eof(qpid::sys::ssl::SslIO&);
     void disconnected(qpid::sys::ssl::SslIO&);
 
-    std::string identifier;
-
     void connect(const std::string& host, const std::string& port);
-    void init();
     void close();
     void send(framing::AMQFrame& frame);
     void abort() {} // TODO: Need to fix for heartbeat timeouts to work
@@ -126,17 +98,16 @@ class SslConnector : public Connector
     const SecuritySettings* getSecuritySettings();
     void socketClosed(qpid::sys::ssl::SslIO&, const qpid::sys::ssl::SslSocket&);
 
+    size_t decode(const char* buffer, size_t size);
+    size_t encode(const char* buffer, size_t size);
+    bool canEncode();
+
 public:
     SslConnector(Poller::shared_ptr p, framing::ProtocolVersion pVersion,
               const ConnectionSettings&,
               ConnectionImpl*);
 };
 
-struct SslConnector::Buff : public SslIO::BufferBase {
-    Buff(size_t size) : SslIO::BufferBase(new char[size], size) {}
-    ~Buff() { delete [] bytes;}
-};
-
 // Static constructor which registers connector here
 namespace {
     Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
@@ -170,12 +141,14 @@ SslConnector::SslConnector(Poller::share
                      const ConnectionSettings& settings,
                      ConnectionImpl* cimpl)
     : maxFrameSize(settings.maxFrameSize),
+      lastEof(0),
+      currentSize(0),
+      bounds(cimpl),
       version(ver),
       initiated(false),
       closed(true),
       shutdownHandler(0),
       input(0),
-      writer(maxFrameSize, cimpl),
       aio(0),
       poller(p)
 {
@@ -192,7 +165,7 @@ SslConnector::~SslConnector() {
 }
 
 void SslConnector::connect(const std::string& host, const std::string& port){
-    Mutex::ScopedLock l(closedLock);
+    Mutex::ScopedLock l(lock);
     assert(closed);
     try {
         socket.connect(host, port);
@@ -201,7 +174,6 @@ void SslConnector::connect(const std::st
         throw TransportFailure(e.what());
     }
 
-    identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
     closed = false;
     aio = new SslIO(socket,
                        boost::bind(&SslConnector::readbuff, this, _1, _2),
@@ -210,21 +182,16 @@ void SslConnector::connect(const std::st
                        boost::bind(&SslConnector::socketClosed, this, _1, _2),
                        0, // nobuffs
                        boost::bind(&SslConnector::writebuff, this, _1));
-    writer.init(identifier, aio);
-}
 
-void SslConnector::init(){
-    Mutex::ScopedLock l(closedLock);
+    aio->createBuffers(maxFrameSize);
+    identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
     ProtocolInitiation init(version);
     writeDataBlock(init);
-    for (int i = 0; i < 32; i++) {
-        aio->queueReadBuffer(new Buff(maxFrameSize));
-    }
     aio->start(poller);
 }
 
 void SslConnector::close() {
-    Mutex::ScopedLock l(closedLock);
+    Mutex::ScopedLock l(lock);
     if (!closed) {
         closed = true;
         if (aio)
@@ -260,76 +227,110 @@ const std::string& SslConnector::getIden
 }
 
 void SslConnector::send(AMQFrame& frame) {
-    writer.handle(frame);
-}
-
-SslConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b)
-{
-}
-
-SslConnector::Writer::~Writer() { delete buffer; }
-
-void SslConnector::Writer::init(std::string id, sys::ssl::SslIO* a) {
-    Mutex::ScopedLock l(lock);
-    identifier = id;
-    aio = a;
-    newBuffer();
-}
-void SslConnector::Writer::handle(framing::AMQFrame& frame) {
+    bool notifyWrite = false;
+    {
     Mutex::ScopedLock l(lock);
     frames.push_back(frame);
-    if (frame.getEof() || (bounds && bounds->getCurrentSize() >= maxFrameSize)) {
+    //only ask to write if this is the end of a frameset or if we
+    //already have a buffer's worth of data
+    currentSize += frame.encodedSize();
+    if (frame.getEof()) {
         lastEof = frames.size();
-        aio->notifyPendingWrite();
+        notifyWrite = true;
+    } else {
+        notifyWrite = (currentSize >= maxFrameSize);
+    }
+    /*
+      NOTE: Moving the following line into this mutex block
+            is a workaround for BZ 570168, in which the test
+            testConcurrentSenders causes a hang about 1.5%
+            of the time.  ( To see the hang much more frequently
+            leave this line out of the mutex block, and put a
+            small usleep just before it.)
+
+            TODO mgoulish - fix the underlying cause and then
+                            move this call back outside the mutex.
+    */
+    if (notifyWrite && !closed) aio->notifyPendingWrite();
     }
-    QPID_LOG(trace, "SENT [" << identifier << "]: " << frame);
 }
 
-void SslConnector::Writer::writeOne() {
-    assert(buffer);
-    framesEncoded = 0;
+void SslConnector::writebuff(SslIO& /*aio*/)
+{
+    // It's possible to be disconnected and be writable
+    if (closed)
+        return;
 
-    buffer->dataStart = 0;
-    buffer->dataCount = encode.getPosition();
-    aio->queueWrite(buffer);
-    newBuffer();
-}
+    if (!canEncode()) {
+        return;
+    }
 
-void SslConnector::Writer::newBuffer() {
-    buffer = aio->getQueuedBuffer();
-    if (!buffer) buffer = new Buff(maxFrameSize);
-    encode = framing::Buffer(buffer->bytes, buffer->byteCount);
-    framesEncoded = 0;
+    SslIO::BufferBase* buffer = aio->getQueuedBuffer();
+    if (buffer) {
+
+        size_t encoded = encode(buffer->bytes, buffer->byteCount);
+
+        buffer->dataStart = 0;
+        buffer->dataCount = encoded;
+        aio->queueWrite(buffer);
+    }
 }
 
 // Called in IO thread.
-void SslConnector::Writer::write(sys::ssl::SslIO&) {
+bool SslConnector::canEncode()
+{
     Mutex::ScopedLock l(lock);
-    assert(buffer);
+    //have at least one full frameset or a whole buffer's worth of data
+    return lastEof || currentSize >= maxFrameSize;
+}
+
+// Called in IO thread.
+size_t SslConnector::encode(const char* buffer, size_t size)
+{
+    framing::Buffer out(const_cast<char*>(buffer), size);
     size_t bytesWritten(0);
-    for (size_t i = 0; i < lastEof; ++i) {
-        AMQFrame& frame = frames[i];
-        uint32_t size = frame.encodedSize();
-        if (size > encode.available()) writeOne();
-        assert(size <= encode.available());
-        frame.encode(encode);
-        ++framesEncoded;
-        bytesWritten += size;
+    {
+        Mutex::ScopedLock l(lock);
+        while (!frames.empty() && out.available() >= frames.front().encodedSize() ) {
+            frames.front().encode(out);
+            QPID_LOG(trace, "SENT [" << identifier << "]: " << frames.front());
+            frames.pop_front();
+            if (lastEof) --lastEof;
+        }
+        bytesWritten = size - out.available();
+        currentSize -= bytesWritten;
     }
-    frames.erase(frames.begin(), frames.begin()+lastEof);
-    lastEof = 0;
     if (bounds) bounds->reduce(bytesWritten);
-    if (encode.getPosition() > 0) writeOne();
+    return bytesWritten;
 }
 
-void SslConnector::readbuff(SslIO& aio, SslIO::BufferBase* buff) {
-    framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+void SslConnector::readbuff(SslIO& aio, SslIO::BufferBase* buff)
+{
+    int32_t decoded = decode(buff->bytes+buff->dataStart, buff->dataCount);
+    // TODO: unreading needs to go away, and when we can cope
+    // with multiple sub-buffers in the general buffer scheme, it will
+    if (decoded < buff->dataCount) {
+        // Adjust buffer for used bytes and then "unread them"
+        buff->dataStart += decoded;
+        buff->dataCount -= decoded;
+        aio.unread(buff);
+    } else {
+        // Give whole buffer back to aio subsystem
+        aio.queueReadBuffer(buff);
+    }
+}
 
+size_t SslConnector::decode(const char* buffer, size_t size)
+{
+    framing::Buffer in(const_cast<char*>(buffer), size);
     if (!initiated) {
         framing::ProtocolInitiation protocolInit;
         if (protocolInit.decode(in)) {
-            //TODO: check the version is correct
             QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
+            if(!(protocolInit==version)){
+                throw Exception(QPID_MSG("Unsupported version: " << protocolInit
+                                         << " supported version " << version));
+            }
         }
         initiated = true;
     }
@@ -338,25 +339,12 @@ void SslConnector::readbuff(SslIO& aio, 
         QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
         input->received(frame);
     }
-    // TODO: unreading needs to go away, and when we can cope
-    // with multiple sub-buffers in the general buffer scheme, it will
-    if (in.available() != 0) {
-        // Adjust buffer for used bytes and then "unread them"
-        buff->dataStart += buff->dataCount-in.available();
-        buff->dataCount = in.available();
-        aio.unread(buff);
-    } else {
-        // Give whole buffer back to aio subsystem
-        aio.queueReadBuffer(buff);
-    }
-}
-
-void SslConnector::writebuff(SslIO& aio_) {
-    writer.write(aio_);
+    return size - in.available();
 }
 
 void SslConnector::writeDataBlock(const AMQDataBlock& data) {
-    SslIO::BufferBase* buff = new Buff(maxFrameSize);
+    SslIO::BufferBase* buff = aio->getQueuedBuffer();
+    assert(buff);
     framing::Buffer out(buff->bytes, buff->byteCount);
     data.encode(out);
     buff->dataCount = data.encodedSize();

Modified: qpid/branches/asyncstore/cpp/src/qpid/client/SubscriptionManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/client/SubscriptionManagerImpl.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/client/SubscriptionManagerImpl.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/client/SubscriptionManagerImpl.cpp Mon Aug 27 15:40:33 2012
@@ -28,6 +28,7 @@
 #include <qpid/client/Session.h>
 #include <qpid/client/MessageListener.h>
 #include <qpid/framing/Uuid.h>
+#include <qpid/log/Statement.h>
 #include <set>
 #include <sstream>
 
@@ -167,6 +168,15 @@ void SubscriptionManagerImpl::setFlowCon
     setFlowControl(name, FlowControl(messages, bytes, window));
 }
 
+AutoCancel::AutoCancel(SubscriptionManager& sm_, const std::string& tag_) : sm(sm_), tag(tag_) {}
+AutoCancel::~AutoCancel() {
+    try {
+        sm.cancel(tag);
+    } catch (const qpid::Exception& e) {
+        QPID_LOG(info, "Exception in AutoCancel destructor: " << e.what());
+    }
+}
+
 }} // namespace qpid::client
 
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/client/TCPConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/client/TCPConnector.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/client/TCPConnector.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/client/TCPConnector.cpp Mon Aug 27 15:40:33 2012
@@ -46,11 +46,6 @@ using namespace qpid::framing;
 using boost::format;
 using boost::str;
 
-struct TCPConnector::Buff : public AsynchIO::BufferBase {
-    Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
-    ~Buff() { delete [] bytes;}
-};
-
 // Static constructor which registers connector here
 namespace {
     Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
@@ -118,9 +113,8 @@ void TCPConnector::connected(const Socke
 
 void TCPConnector::start(sys::AsynchIO* aio_) {
     aio = aio_;
-    for (int i = 0; i < 4; i++) {
-        aio->queueReadBuffer(new Buff(maxFrameSize));
-    }
+
+    aio->createBuffers(maxFrameSize);
 
     identifier = str(format("[%1%]") % socket.getFullAddress());
 }
@@ -226,15 +220,19 @@ void TCPConnector::writebuff(AsynchIO& /
         return;
 
     Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
-    if (codec->canEncode()) {
-        std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer());
-        if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize));
+
+    if (!codec->canEncode()) {
+        return;
+    }
+
+    AsynchIO::BufferBase* buffer = aio->getQueuedBuffer();
+    if (buffer) {
 
         size_t encoded = codec->encode(buffer->bytes, buffer->byteCount);
 
         buffer->dataStart = 0;
         buffer->dataCount = encoded;
-        aio->queueWrite(buffer.release());
+        aio->queueWrite(buffer);
     }
 }
 
@@ -307,6 +305,7 @@ size_t TCPConnector::decode(const char* 
 
 void TCPConnector::writeDataBlock(const AMQDataBlock& data) {
     AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
+    assert(buff);
     framing::Buffer out(buff->bytes, buff->byteCount);
     data.encode(out);
     buff->dataCount = data.encodedSize();

Modified: qpid/branches/asyncstore/cpp/src/qpid/client/TCPConnector.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/client/TCPConnector.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/client/TCPConnector.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/client/TCPConnector.h Mon Aug 27 15:40:33 2012
@@ -50,7 +50,6 @@ namespace client {
 class TCPConnector : public Connector, public sys::Codec
 {
     typedef std::deque<framing::AMQFrame> Frames;
-    struct Buff;
 
     const uint16_t maxFrameSize;
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/Backup.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/Backup.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/Backup.cpp Mon Aug 27 15:40:33 2012
@@ -35,6 +35,7 @@
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/sys/SystemInfo.h"
 #include "qpid/types/Variant.h"
+#include "qpid/log/Statement.h"
 
 namespace qpid {
 namespace ha {
@@ -51,35 +52,15 @@ Backup::Backup(HaBroker& hb, const Setti
     if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
 }
 
-bool Backup::isSelf(const Address& a) const {
-    return sys::SystemInfo::isLocalHost(a.host) &&
-        a.port == haBroker.getBroker().getPort(a.protocol);
-}
-
-// Remove my own address from the URL if possible.
-// This isn't 100% reliable given the many ways to specify a host,
-// but should work in most cases. We have additional measures to prevent
-// self-connection in ConnectionObserver
-Url Backup::removeSelf(const Url& brokers) const {
-    Url url;
-    for (Url::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
-        if (!isSelf(*i)) url.push_back(*i);
-    if (url.empty())
-        throw Url::Invalid(logPrefix+"Failover URL is empty");
-    QPID_LOG(debug, logPrefix << "Failover URL (excluding self): " << url);
-    return url;
-}
-
 void Backup::initialize(const Url& brokers) {
     if (brokers.empty()) throw Url::Invalid("HA broker URL is empty");
     QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers);
-    Url url = removeSelf(brokers);
-    string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+    string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol;
     types::Uuid uuid(true);
     // Declare the link
     std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
         broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
-        url[0].host, url[0].port, protocol,
+        brokers[0].host, brokers[0].port, protocol,
         false,                  // durable
         settings.mechanism, settings.username, settings.password,
         false);               // no amq.failover - don't want to use client URL.
@@ -90,7 +71,7 @@ void Backup::initialize(const Url& broke
         replicator->initialize();
         broker.getExchanges().registerExchange(replicator);
     }
-    link->setUrl(url);          // Outside the lock, once set link doesn't change.
+    link->setUrl(brokers);          // Outside the lock, once set link doesn't change.
 }
 
 Backup::~Backup() {
@@ -107,10 +88,8 @@ void Backup::setBrokerUrl(const Url& url
         sys::Mutex::ScopedLock l(lock);
         linkSet = link;
     }
-    if (linkSet) {
-        QPID_LOG(info, logPrefix << "Broker URL set to: " << url);
-        link->setUrl(removeSelf(url)); // Outside lock, once set link doesn't change
-    }
+    if (linkSet)
+        link->setUrl(url);      // Outside lock, once set link doesn't change
     else
         initialize(url);        // Deferred initialization
 }

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/Backup.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/Backup.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/Backup.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/Backup.h Mon Aug 27 15:40:33 2012
@@ -53,8 +53,6 @@ class Backup
     void setStatus(BrokerStatus);
 
   private:
-    bool isSelf(const Address& a) const;
-    Url removeSelf(const Url&) const;
     void initialize(const Url&);
     std::string logPrefix;
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/BackupConnectionExcluder.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/BackupConnectionExcluder.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/BackupConnectionExcluder.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/BackupConnectionExcluder.h Mon Aug 27 15:40:33 2012
@@ -36,7 +36,7 @@ class BackupConnectionExcluder : public 
 {
   public:
     void opened(broker::Connection& connection) {
-        QPID_LOG(debug, "Backup broker rejected connection "+connection.getMgmtId());
+        QPID_LOG(debug, "Backup: Rejected connection "+connection.getMgmtId());
         connection.abort();
     }
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.cpp Mon Aug 27 15:40:33 2012
@@ -24,7 +24,9 @@
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Connection.h"
 #include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueSettings.h"
 #include "qpid/broker/Link.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/log/Statement.h"
 #include "qpid/amqp_0_10/Codecs.h"
@@ -117,11 +119,6 @@ const string _QUERY_REQUEST("_query_requ
 const string BROKER("broker");
 const string MEMBERS("members");
 
-bool isQMFv2(const Message& message) {
-    const framing::MessageProperties* props = message.getProperties<framing::MessageProperties>();
-    return props && props->getAppId() == QMF2;
-}
-
 template <class T> bool match(Variant::Map& schema) {
     return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
 }
@@ -219,7 +216,7 @@ void BrokerReplicator::initializeBridge(
     link->getRemoteAddress(primary);
     string queueName = bridge.getQueueName();
 
-    QPID_LOG(info, logPrefix << (initialized ? "Connecting" : "Failing over")
+    QPID_LOG(info, logPrefix << (initialized ? "Failing over" : "Connecting")
              << " to primary " << primary
              << " status:" << printable(haBroker.getStatus()));
     initialized = true;
@@ -253,18 +250,15 @@ void BrokerReplicator::route(Deliverable
         haBroker.setStatus(CATCHUP);
         QPID_LOG(notice, logPrefix << "Connected to primary " << primary);
     }
-
-    const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders();
-    const MessageProperties* messageProperties = msg.getMessage().getProperties<MessageProperties>();
     Variant::List list;
     try {
-        if (!isQMFv2(msg.getMessage()) || !headers || !messageProperties)
+        if (!qpid::broker::amqp_0_10::MessageTransfer::isQMFv2(msg.getMessage()))
             throw Exception("Unexpected message, not QMF2 event or query response.");
         // decode as list
-        string content = msg.getMessage().getFrames().getContent();
-        amqp_0_10::ListCodec::decode(content, list);
-        QPID_LOG(trace, "Broker replicator received: " << *messageProperties);
-        if (headers->getAsString(QMF_CONTENT) == EVENT) {
+        string content = msg.getMessage().getContent();
+        qpid::amqp_0_10::ListCodec::decode(content, list);
+
+        if (msg.getMessage().getPropertyAsString(QMF_CONTENT) == EVENT) {
             for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
                 Variant::Map& map = i->asMap();
                 QPID_LOG(trace, "Broker replicator event: " << map);
@@ -278,20 +272,20 @@ void BrokerReplicator::route(Deliverable
                 else if (match<EventUnbind>(schema)) doEventUnbind(values);
                 else if (match<EventMembersUpdate>(schema)) doEventMembersUpdate(values);
             }
-        } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
+        } else if (msg.getMessage().getPropertyAsString(QMF_OPCODE) == QUERY_RESPONSE) {
             for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
                 Variant::Map& map = i->asMap();
                 QPID_LOG(trace, "Broker replicator response: " << map);
                 string type = map[SCHEMA_ID].asMap()[CLASS_NAME].asString();
                 Variant::Map& values = map[VALUES].asMap();
                 framing::FieldTable args;
-                amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
+                qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
                 if      (type == QUEUE) doResponseQueue(values);
                 else if (type == EXCHANGE) doResponseExchange(values);
                 else if (type == BINDING) doResponseBind(values);
                 else if (type == HA_BROKER) doResponseHaBroker(values);
             }
-            if (messageProperties->getCorrelationId() == EXCHANGE && !headers->isSet(PARTIAL)) {
+            if (qpid::broker::amqp_0_10::MessageTransfer::isLastQMFResponse(msg.getMessage(), EXCHANGE)) {
                 // We have received all of the exchange response.
                 alternates.clear();
             }
@@ -309,13 +303,11 @@ void BrokerReplicator::doEventQueueDecla
     Variant::Map argsMap = asMapVoid(values[ARGS]);
     bool autoDel = values[AUTODEL].asBool();
     bool excl = values[EXCL].asBool();
-    if (values[DISP] == CREATED &&
-        replicationTest.isReplicated(CONFIGURATION, argsMap, autoDel, excl))
-    {
+    if (values[DISP] == CREATED && replicationTest.isReplicated(CONFIGURATION, argsMap, autoDel, excl)) {
         string name = values[QNAME].asString();
-        QPID_LOG(debug, logPrefix << "Queue declare event: " << name);
+        QueueSettings settings(values[DURABLE].asBool(), values[AUTODEL].asBool());
         framing::FieldTable args;
-        amqp_0_10::translate(argsMap, args);
+        qpid::amqp_0_10::translate(argsMap, args);
         // If we already have a queue with this name, replace it.
         // The queue was definitely created on the primary.
         if (broker.getQueues().find(name)) {
@@ -323,10 +315,17 @@ void BrokerReplicator::doEventQueueDecla
             broker.getQueues().destroy(name);
             stopQueueReplicator(name);
         }
-        boost::shared_ptr<Queue> queue = createQueue(
-            name, values[DURABLE].asBool(), autoDel, args, values[ALTEX].asString());
-        assert(queue);  // Should be created since we destroed the previous queue above.
-        if (queue) startQueueReplicator(queue);
+        settings.populate(args, settings.storeSettings);
+        std::pair<boost::shared_ptr<Queue>, bool> result =
+            broker.createQueue(
+                name,
+                settings,
+                0 /*i.e. no owner regardless of exclusivity on master*/,
+                values[ALTEX].asString(),
+                userId,
+                remoteHost);
+        assert(result.second);  // Should be true since we destroyed existing queue above
+        startQueueReplicator(result.first);
     }
 }
 
@@ -343,7 +342,7 @@ void BrokerReplicator::doEventQueueDelet
     // sessions may be closed by a "queue deleted" exception.
     string name = values[QNAME].asString();
     boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
-    if (queue && replicationTest.replicateLevel(queue->getSettings())) {
+    if (queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) {
         QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
         stopQueueReplicator(name);
         broker.deleteQueue(name, userId, remoteHost);
@@ -357,7 +356,7 @@ void BrokerReplicator::doEventExchangeDe
         string name = values[EXNAME].asString();
         QPID_LOG(debug, logPrefix << "Exchange declare event: " << name);
         framing::FieldTable args;
-        amqp_0_10::translate(argsMap, args);
+        qpid::amqp_0_10::translate(argsMap, args);
         // If we already have a exchange with this name, replace it.
         // The exchange was definitely created on the primary.
         if (broker.getExchanges().find(name)) {
@@ -391,10 +390,10 @@ void BrokerReplicator::doEventBind(Varia
     // We only replicate binds for a replicated queue to replicated
     // exchange that both exist locally.
     if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
-        queue && replicationTest.replicateLevel(queue->getSettings()))
+        queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
     {
         framing::FieldTable args;
-        amqp_0_10::translate(asMapVoid(values[ARGS]), args);
+        qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
         string key = values[KEY].asString();
         QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName()
                  << " queue=" << queue->getName()
@@ -411,10 +410,10 @@ void BrokerReplicator::doEventUnbind(Var
     // We only replicate unbinds for a replicated queue to replicated
     // exchange that both exist locally.
     if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
-        queue && replicationTest.replicateLevel(queue->getSettings()))
+        queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
     {
         framing::FieldTable args;
-        amqp_0_10::translate(asMapVoid(values[ARGS]), args);
+        qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
         string key = values[KEY].asString();
         QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName()
                  << " queue=" << queue->getName()
@@ -455,7 +454,7 @@ void BrokerReplicator::doResponseQueue(V
     string name(values[NAME].asString());
     QPID_LOG(debug, logPrefix << "Queue response: " << name);
     framing::FieldTable args;
-    amqp_0_10::translate(argsMap, args);
+    qpid::amqp_0_10::translate(argsMap, args);
     boost::shared_ptr<Queue> queue =
         createQueue(name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args,
                     getAltExchange(values[ALTEXCHANGE]));
@@ -470,11 +469,12 @@ void BrokerReplicator::doResponseExchang
     string name = values[NAME].asString();
     QPID_LOG(debug, logPrefix << "Exchange response: " << name);
     framing::FieldTable args;
-    amqp_0_10::translate(argsMap, args);
+    qpid::amqp_0_10::translate(argsMap, args);
     boost::shared_ptr<Exchange> exchange = createExchange(
         name, values[TYPE].asString(), values[DURABLE].asBool(), args,
         getAltExchange(values[ALTEXCHANGE]));
-    QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already exists: " << name);
+    // It is normal for the exchange to already exist if we are failing over.
+    QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already replicated: " << name);
 }
 
 namespace {
@@ -506,14 +506,14 @@ void BrokerReplicator::doResponseBind(Va
 
     // Automatically replicate binding if queue and exchange exist and are replicated
     if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
-        queue && replicationTest.replicateLevel(queue->getSettings()))
+        queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
     {
         string key = values[KEY].asString();
         QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName
                  << " queue:" << qName
                  << " key:" << key);
         framing::FieldTable args;
-        amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
+        qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
         exchange->bind(queue, key, &args);
     }
 }
@@ -543,7 +543,7 @@ void BrokerReplicator::doResponseHaBroke
 
 void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue)
 {
-    if (replicationTest.replicateLevel(queue->getSettings()) == ALL) {
+    if (replicationTest.replicateLevel(queue->getSettings().storeSettings) == ALL) {
         boost::shared_ptr<QueueReplicator> qr(
             new QueueReplicator(haBroker, queue, link));
         if (!broker.getExchanges().registerExchange(qr))
@@ -569,14 +569,14 @@ boost::shared_ptr<Queue> BrokerReplicato
     const qpid::framing::FieldTable& arguments,
     const std::string& alternateExchange)
 {
+    QueueSettings settings(durable, autodelete);
+    settings.populate(arguments, settings.storeSettings);
     std::pair<boost::shared_ptr<Queue>, bool> result =
         broker.createQueue(
             name,
-            durable,
-            autodelete,
-            0, // no owner regardless of exclusivity on primary
+            settings,
+            0,// no owner regardless of exclusivity on primary
             string(), // Set alternate exchange below
-            arguments,
             userId,
             remoteHost);
     if (result.second) {

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.cpp Mon Aug 27 15:40:33 2012
@@ -32,7 +32,7 @@ namespace ha {
 ConnectionObserver::ConnectionObserver(HaBroker& hb, const types::Uuid& uuid)
     : haBroker(hb), logPrefix("Connections: "), self(uuid) {}
 
-bool ConnectionObserver::getBrokerInfo(broker::Connection& connection, BrokerInfo& info) {
+bool ConnectionObserver::getBrokerInfo(const broker::Connection& connection, BrokerInfo& info) {
     framing::FieldTable ft;
     if (connection.getClientProperties().getTable(ConnectionObserver::BACKUP_TAG, ft)) {
         info = BrokerInfo(ft);
@@ -51,21 +51,23 @@ ConnectionObserver::ObserverPtr Connecti
     return observer;
 }
 
+bool ConnectionObserver::isSelf(const broker::Connection& connection) {
+    BrokerInfo info;
+    return getBrokerInfo(connection, info) && info.getSystemId() == self;
+}
+
 void ConnectionObserver::opened(broker::Connection& connection) {
     try {
         if (connection.isLink()) return; // Allow outgoing links.
         if (connection.getClientProperties().isSet(ADMIN_TAG)) {
-            QPID_LOG(debug, logPrefix << "Allowing admin connection: "
+            QPID_LOG(debug, logPrefix << "Accepted admin connection: "
                      << connection.getMgmtId());
             return;                 // No need to call observer, always allow admins.
         }
-        BrokerInfo info;            // Avoid self connections.
-        if (getBrokerInfo(connection, info)) {
-            if (info.getSystemId() == self) {
-                QPID_LOG(debug, "HA broker rejected self connection "+connection.getMgmtId());
-                connection.abort();
-            }
-
+        if (isSelf(connection)) { // Reject self connections
+            QPID_LOG(debug, logPrefix << "Rejected self connection "+connection.getMgmtId());
+            connection.abort();
+            return;
         }
         ObserverPtr o(getObserver());
         if (o) o->opened(connection);
@@ -77,8 +79,8 @@ void ConnectionObserver::opened(broker::
 }
 
 void ConnectionObserver::closed(broker::Connection& connection) {
+    if (isSelf(connection)) return; // Ignore closing of self connections.
     try {
-        BrokerInfo info;
         ObserverPtr o(getObserver());
         if (o) o->closed(connection);
     }

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.h Mon Aug 27 15:40:33 2012
@@ -51,7 +51,7 @@ class ConnectionObserver : public broker
     static const std::string ADMIN_TAG;
     static const std::string BACKUP_TAG;
 
-    static bool getBrokerInfo(broker::Connection& connection, BrokerInfo& info);
+    static bool getBrokerInfo(const broker::Connection& connection, BrokerInfo& info);
 
     ConnectionObserver(HaBroker& haBroker, const types::Uuid& self);
 
@@ -62,6 +62,8 @@ class ConnectionObserver : public broker
     void closed(broker::Connection& connection);
 
   private:
+    bool isSelf(const broker::Connection&);
+
     sys::Mutex lock;
     HaBroker& haBroker;
     std::string logPrefix;

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.cpp Mon Aug 27 15:40:33 2012
@@ -83,7 +83,11 @@ void HaBroker::initialize() {
 
     // FIXME aconway 2012-07-19: assumes there's a TCP transport with a meaningful port.
     brokerInfo = BrokerInfo(
-        broker.getSystem()->getNodeName(), broker.getPort(broker::Broker::TCP_TRANSPORT), systemId);
+        broker.getSystem()->getNodeName(),
+        broker.getPort(broker::Broker::TCP_TRANSPORT),
+        systemId);
+
+    QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo);
 
     // Set up the management object.
     ManagementAgent* ma = broker.getManagementAgent();
@@ -111,8 +115,6 @@ void HaBroker::initialize() {
     if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl));
 
 
-    QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo);
-
     // NOTE: lock is not needed in a constructor, but create one
     // to pass to functions that have a ScopedLock parameter.
     Mutex::ScopedLock l(lock);
@@ -226,6 +228,7 @@ void HaBroker::setBrokerUrl(const Url& u
     if (url.empty()) throw Url::Invalid("HA broker URL is empty");
     brokerUrl = url;
     mgmtObject->set_brokersUrl(brokerUrl.str());
+    QPID_LOG(info, logPrefix << "Broker URL set to: " << url);
     if (backup.get()) backup->setBrokerUrl(brokerUrl);
     // Updating broker URL also updates defaulted client URL:
     if (clientUrl.empty()) updateClientUrl(l);
@@ -292,6 +295,7 @@ void HaBroker::statusChanged(Mutex::Scop
 }
 
 void HaBroker::membershipUpdated(Mutex::ScopedLock&) {
+    QPID_LOG(info, logPrefix << "Membership changed: " <<  membership);
     Variant::List brokers = membership.asList();
     mgmtObject->set_members(brokers);
     broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
@@ -321,14 +325,14 @@ void HaBroker::resetMembership(const Bro
 void HaBroker::addBroker(const BrokerInfo& b) {
     Mutex::ScopedLock l(lock);
     membership.add(b);
-    QPID_LOG(debug, logPrefix << "Membership add: " <<  b << " now: " << membership);
+    QPID_LOG(debug, logPrefix << "Membership add: " <<  b);
     membershipUpdated(l);
 }
 
 void HaBroker::removeBroker(const Uuid& id) {
     Mutex::ScopedLock l(lock);
     membership.remove(id);
-    QPID_LOG(debug, logPrefix << "Membership remove: " <<  id << " now: " << membership);
+    QPID_LOG(debug, logPrefix << "Membership remove: " <<  id);
     membershipUpdated(l);
 }
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.h Mon Aug 27 15:40:33 2012
@@ -97,6 +97,8 @@ class HaBroker : public management::Mana
     void addBroker(const BrokerInfo& b);       // Add a broker to the membership.
     void removeBroker(const types::Uuid& id);  // Remove a broker from membership.
 
+    types::Uuid getSystemId() const { return systemId; }
+
   private:
     void setClientUrl(const Url&);
     void setBrokerUrl(const Url&);

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.cpp Mon Aug 27 15:40:33 2012
@@ -66,7 +66,7 @@ types::Variant::List Membership::asList(
 BrokerInfo::Set Membership::otherBackups() const {
     BrokerInfo::Set result;
     for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
-        if (isBackup(i->second.getStatus()) && i->second.getSystemId() != self)
+        if (i->second.getStatus() == READY && i->second.getSystemId() != self)
             result.insert(i->second);
     return result;
 }

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.h Mon Aug 27 15:40:33 2012
@@ -47,7 +47,7 @@ class Membership
     void add(const BrokerInfo& b);
     void remove(const types::Uuid& id);
     bool contains(const types::Uuid& id);
-    /** Return IDs of all backups other than self */
+    /** Return IDs of all READY backups other than self */
     BrokerInfo::Set otherBackups() const;
 
     void assign(const types::Variant::List&);

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/Primary.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/Primary.cpp Mon Aug 27 15:40:33 2012
@@ -180,7 +180,7 @@ void Primary::readyReplica(const Replica
 
 void Primary::queueCreate(const QueuePtr& q) {
     // Throw if there is an invalid replication level in the queue settings.
-    haBroker.getReplicationTest().replicateLevel(q->getSettings());
+    haBroker.getReplicationTest().replicateLevel(q->getSettings().storeSettings);
     Mutex::ScopedLock l(lock);
     for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) {
         i->second->queueCreate(q);
@@ -201,6 +201,7 @@ void Primary::opened(broker::Connection&
         Mutex::ScopedLock l(lock);
         BackupMap::iterator i = backups.find(info.getSystemId());
         if (i == backups.end()) {
+            QPID_LOG(debug, logPrefix << "New backup connected: " << info);
             boost::shared_ptr<RemoteBackup> backup(
                 new RemoteBackup(info, haBroker.getReplicationTest(), true));
             {
@@ -209,7 +210,6 @@ void Primary::opened(broker::Connection&
                 backup->setInitialQueues(haBroker.getBroker().getQueues(), false);
             }
             backups[info.getSystemId()] = backup;
-            QPID_LOG(debug, logPrefix << "New backup connected: " << info);
         }
         else {
             QPID_LOG(debug, logPrefix << "Known backup connected: " << info);
@@ -225,6 +225,12 @@ void Primary::opened(broker::Connection&
 }
 
 void Primary::closed(broker::Connection& connection) {
+    // NOTE: It is possible for a backup connection to be rejected while we are
+    // a backup, but closed() is called after we have become primary.
+    //
+    // For this reason we do not remove from the backups map here, the backups
+    // map holds all the backups we know about whether connected or not.
+    //
     Mutex::ScopedLock l(lock);
     BrokerInfo info;
     if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
@@ -233,12 +239,6 @@ void Primary::closed(broker::Connection&
         BackupMap::iterator i = backups.find(info.getSystemId());
         if (i != backups.end()) i->second->setConnected(false);
     }
-    // NOTE: we do not remove from the backups map here, the backups map holds
-    // all the backups we know about whether connected or not.
-    //
-    // It is possible for a backup connection to be rejected while we are a backup,
-    // but the closed is seen after we have become primary. Removing the entry
-    // from backups in this case would be incorrect.
 }
 
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/QueueGuard.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/QueueGuard.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/QueueGuard.cpp Mon Aug 27 15:40:33 2012
@@ -39,10 +39,10 @@ class QueueGuard::QueueObserver : public
 {
   public:
     QueueObserver(QueueGuard& g) : guard(g) {}
-    void enqueued(const broker::QueuedMessage& qm) { guard.enqueued(qm); }
-    void dequeued(const broker::QueuedMessage& qm) { guard.dequeued(qm); }
-    void acquired(const broker::QueuedMessage&) {}
-    void requeued(const broker::QueuedMessage&) {}
+    void enqueued(const broker::Message& m) { guard.enqueued(m); }
+    void dequeued(const broker::Message& m) { guard.dequeued(m); }
+    void acquired(const broker::Message&) {}
+    void requeued(const broker::Message&) {}
   private:
     QueueGuard& guard;
 };
@@ -64,39 +64,47 @@ QueueGuard::QueueGuard(broker::Queue& q,
 QueueGuard::~QueueGuard() { cancel(); }
 
 // NOTE: Called with message lock held.
-void QueueGuard::enqueued(const QueuedMessage& qm) {
-    assert(qm.queue == &queue);
+void QueueGuard::enqueued(const Message& m) {
     // Delay completion
-    QPID_LOG(trace, logPrefix << "Delayed completion of " << qm);
-    qm.payload->getIngressCompletion().startCompleter();
+    QPID_LOG(trace, logPrefix << "Delayed completion of " << m);
+    m.getIngressCompletion()->startCompleter();
     {
         Mutex::ScopedLock l(lock);
-        assert(!delayed.contains(qm.position));
-        delayed += qm.position;
+        if (!delayed.insert(Delayed::value_type(m.getSequence(), m.getIngressCompletion())).second) {
+            QPID_LOG(critical, logPrefix << "Second enqueue for message with sequence " << m.getSequence());
+            assert(false);
+        }
     }
 }
 
 // NOTE: Called with message lock held.
-void QueueGuard::dequeued(const QueuedMessage& qm) {
-    assert(qm.queue == &queue);
-    QPID_LOG(trace, logPrefix << "Dequeued " << qm);
+void QueueGuard::dequeued(const Message& m) {
+    QPID_LOG(trace, logPrefix << "Dequeued " << m);
     ReplicatingSubscription* rs=0;
     {
         Mutex::ScopedLock l(lock);
         rs = subscription;
     }
-    if (rs) rs->dequeued(qm);
-    complete(qm);
+    if (rs) rs->dequeued(m);
+    complete(m.getSequence());
+}
+
+void QueueGuard::completeRange(Delayed::iterator begin, Delayed::iterator end) {
+    for (Delayed::iterator i = begin; i != end; ++i) {
+        QPID_LOG(trace, logPrefix << "Completed " << i->first);
+        i->second->finishCompleter();
+    }
 }
 
 void QueueGuard::cancel() {
     queue.removeObserver(observer);
+    Delayed removed;
     {
         Mutex::ScopedLock l(lock);
         if (delayed.empty()) return; // No need if no delayed messages.
+        delayed.swap(removed);
     }
-    // FIXME aconway 2012-06-15: optimize, only messages in delayed set.
-    queue.eachMessage(boost::bind(&QueueGuard::complete, this, _1));
+    completeRange(removed.begin(), removed.end());
 }
 
 void QueueGuard::attach(ReplicatingSubscription& rs) {
@@ -104,36 +112,39 @@ void QueueGuard::attach(ReplicatingSubsc
     subscription = &rs;
 }
 
-namespace {
-void completeBefore(QueueGuard* guard, SequenceNumber position, const QueuedMessage& qm) {
-    if (qm.position <= position) guard->complete(qm);
-}
-}
-
 bool QueueGuard::subscriptionStart(SequenceNumber position) {
-   // Complete any messages before or at the ReplicatingSubscription start position.
-   // Those messages are already on the backup.
-    if (!delayed.empty() && delayed.front() <= position) {
-        // FIXME aconway 2012-06-15: queue iteration, only messages in delayed
-        queue.eachMessage(boost::bind(&completeBefore, this, position, _1));
+    Delayed removed;
+    {
+        Mutex::ScopedLock l(lock);
+        // Complete any messages before or at the ReplicatingSubscription start position.
+        // Those messages are already on the backup.
+        for (Delayed::iterator i = delayed.begin(); i != delayed.end() && i->first <= position;) {
+            removed.insert(*i);
+            delayed.erase(i++);
+        }
     }
+    completeRange(removed.begin(), removed.end());
     return position >= range.back;
 }
 
-void QueueGuard::complete(const QueuedMessage& qm) {
-    assert(qm.queue == &queue);
+void QueueGuard::complete(SequenceNumber sequence) {
+    boost::intrusive_ptr<broker::AsyncCompletion> m;
     {
         Mutex::ScopedLock l(lock);
         // The same message can be completed twice, by
         // ReplicatingSubscription::acknowledged and dequeued. Remove it
-        // from the set so we only call finishCompleter() once
-        if (delayed.contains(qm.position))
-            delayed -= qm.position;
-        else
-            return;
+        // from the map so we only call finishCompleter() once
+        Delayed::iterator i = delayed.find(sequence);
+        if (i != delayed.end()) {
+            m = i->second;
+            delayed.erase(i);
+        }
+
+    }
+    if (m) {
+        QPID_LOG(trace, logPrefix << "Completed " << sequence);
+        m->finishCompleter();
     }
-    QPID_LOG(trace, logPrefix << "Completed " << qm);
-    qm.payload->getIngressCompletion().finishCompleter();
 }
 
 }} // namespaces qpid::ha

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/QueueGuard.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/QueueGuard.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/QueueGuard.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/QueueGuard.h Mon Aug 27 15:40:33 2012
@@ -63,15 +63,15 @@ class QueueGuard {
     /** QueueObserver override. Delay completion of the message.
      * NOTE: Called under the queues message lock.
      */
-    void enqueued(const broker::QueuedMessage&);
+    void enqueued(const broker::Message&);
 
     /** QueueObserver override: Complete a delayed message.
      * NOTE: Called under the queues message lock.
      */
-    void dequeued(const broker::QueuedMessage&);
+    void dequeued(const broker::Message&);
 
     /** Complete a delayed message. */
-    void complete(const broker::QueuedMessage&);
+    void complete(framing::SequenceNumber);
 
     /** Complete all delayed messages. */
     void cancel();
@@ -108,10 +108,13 @@ class QueueGuard {
     sys::Mutex lock;
     std::string logPrefix;
     broker::Queue& queue;
-    framing::SequenceSet delayed;
+    typedef std::map<framing::SequenceNumber, boost::intrusive_ptr<broker::AsyncCompletion> > Delayed;
+    Delayed delayed;
     ReplicatingSubscription* subscription;
     boost::shared_ptr<QueueObserver> observer;
     QueueRange range;
+
+    void completeRange(Delayed::iterator begin, Delayed::iterator end);
 };
 }} // namespace qpid::ha
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/QueueReplicator.cpp Mon Aug 27 15:40:33 2012
@@ -120,8 +120,10 @@ void QueueReplicator::initializeBridge(B
     settings.setTable(ReplicatingSubscription::QPID_BROKER_INFO,
                       brokerInfo.asFieldTable());
     SequenceNumber front;
-    if (ReplicatingSubscription::getFront(*queue, front))
+    if (ReplicatingSubscription::getFront(*queue, front)) {
         settings.setInt(ReplicatingSubscription::QPID_FRONT, front);
+        QPID_LOG(debug, "QPID_FRONT for " << queue->getName() << " is " << front);
+    }
     peer.getMessage().subscribe(
         args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
         false/*exclusive*/, "", 0, settings);
@@ -137,8 +139,7 @@ void QueueReplicator::initializeBridge(B
 
 namespace {
 template <class T> T decodeContent(Message& m) {
-    std::string content;
-    m.getFrames().getContent(content);
+    std::string content = m.getContent();
     Buffer buffer(const_cast<char*>(content.c_str()), content.size());
     T result;
     result.decode(buffer);
@@ -148,9 +149,7 @@ template <class T> T decodeContent(Messa
 
 void QueueReplicator::dequeue(SequenceNumber n, sys::Mutex::ScopedLock&) {
     // Thread safe: only calls thread safe Queue functions.
-    QueuedMessage message;
-    if (queue->acquireMessageAt(n, message))
-        queue->dequeue(0, message);
+    queue->dequeueMessageAt(n);
 }
 
 // Called in connection thread of the queues bridge to primary.

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/RemoteBackup.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/RemoteBackup.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/RemoteBackup.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/RemoteBackup.cpp Mon Aug 27 15:40:33 2012
@@ -23,6 +23,7 @@
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueueRegistry.h"
+#include "qpid/log/Statement.h"
 #include <boost/bind.hpp>
 
 namespace qpid {
@@ -32,7 +33,7 @@ using sys::Mutex;
 using boost::bind;
 
 RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con) :
-    logPrefix("Primary remote backup "+info.getLogId()+": "),
+    logPrefix("Primary: Remote backup "+info.getLogId()+": "),
     brokerInfo(info), replicationTest(rt), connected(con), reportedReady(false)
 {}
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.cpp Mon Aug 27 15:40:33 2012
@@ -27,6 +27,7 @@
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/SessionContext.h"
 #include "qpid/broker/ConnectionState.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/log/Statement.h"
@@ -66,10 +67,10 @@ class DequeueScanner
         at = front - 1;
     }
 
-    void operator()(const QueuedMessage& qm) {
-        if (qm.position >= front && qm.position <= back) {
-            if (qm.position > at+1) subscription.dequeued(at+1, qm.position-1);
-            at = qm.position;
+    void operator()(const Message& m) {
+        if (m.getSequence() >= front && m.getSequence() <= back) {
+            if (m.getSequence() > at+1) subscription.dequeued(at+1, m.getSequence()-1);
+            at = m.getSequence();
         }
     }
 
@@ -90,37 +91,23 @@ string mask(const string& in)
     return DOLLAR + in + INTERNAL;
 }
 
-
-/** Dummy consumer used to get the front position on the queue */
-class GetPositionConsumer : public Consumer
+namespace {
+bool getSequence(const Message& message, SequenceNumber& result)
 {
-  public:
-    GetPositionConsumer() :
-        Consumer("ha.GetPositionConsumer."+types::Uuid(true).str(), false) {}
-    bool deliver(broker::QueuedMessage& ) { return true; }
-    void notify() {}
-    bool filter(boost::intrusive_ptr<broker::Message>) { return true; }
-    bool accept(boost::intrusive_ptr<broker::Message>) { return true; }
-    void cancel() {}
-    void acknowledged(const broker::QueuedMessage&) {}
-    bool browseAcquired() const { return true; }
-    broker::OwnershipToken* getSession() { return 0; }
-};
-
-
+    result = message.getSequence();
+    return true;
+}
+}
 bool ReplicatingSubscription::getNext(
     broker::Queue& q, SequenceNumber from, SequenceNumber& result)
 {
-    boost::shared_ptr<Consumer> c(new GetPositionConsumer);
-    c->setPosition(from);
-    if (!q.dispatch(c)) return false;
-    result = c->getPosition();
-    return true;
+    QueueCursor cursor(REPLICATOR);
+    return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(result)), from);
 }
 
 bool ReplicatingSubscription::getFront(broker::Queue& q, SequenceNumber& front) {
-    // FIXME aconway 2012-05-23: won't wrap, assumes 0 is < all messages in queue.
-    return getNext(q, 0, front);
+    QueueCursor cursor(REPLICATOR);
+    return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(front)));
 }
 
 /* Called by SemanticState::consume to create a consumer */
@@ -152,15 +139,14 @@ ReplicatingSubscription::ReplicatingSubs
     const string& name,
     Queue::shared_ptr queue,
     bool ack,
-    bool acquire,
+    bool /*acquire*/,
     bool exclusive,
     const string& tag,
     const string& resumeId,
     uint64_t resumeTtl,
     const framing::FieldTable& arguments
-) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag,
+) : ConsumerImpl(parent, name, queue, ack, REPLICATOR, exclusive, tag,
                  resumeId, resumeTtl, arguments),
-    dummy(new Queue(mask(name))),
     ready(false)
 {
     try {
@@ -213,6 +199,8 @@ ReplicatingSubscription::ReplicatingSubs
             queue->eachMessage(boost::ref(scan)); // Remove missing messages in between.
             scan.finish();
             position = backup.back;
+            //move cursor to position
+            queue->seek(*this, position);
         }
         // NOTE: we are assuming that the messages that are on the backup are
         // consistent with those on the primary. If the backup is a replica
@@ -260,32 +248,31 @@ void ReplicatingSubscription::initialize
 }
 
 // Message is delivered in the subscription's connection thread.
-bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
+bool ReplicatingSubscription::deliver(const qpid::broker::QueueCursor& c, const qpid::broker::Message& m) {
+    position = m.getSequence();
     try {
-        // Add position events for the subscribed queue, not the internal event queue.
-        if (qm.queue == getQueue().get()) {
-            QPID_LOG(trace, logPrefix << "Replicating " << qm);
-            {
-                Mutex::ScopedLock l(lock);
-                assert(position == qm.position);
-                // qm.position is the position of the newly enqueued qm on local queue.
-                // backupPosition is latest position on backup queue before enqueueing
-                if (qm.position <= backupPosition)
-                    throw Exception(
-                        QPID_MSG("Expected position >  " << backupPosition
-                                 << " but got " << qm.position));
-                if (qm.position - backupPosition > 1) {
-                    // Position has advanced because of messages dequeued ahead of us.
-                    // Send the position before qm was enqueued.
-                    sendPositionEvent(qm.position-1, l);
-                }
-                // Backup will automatically advance by 1 on delivery of message.
-                backupPosition = qm.position;
+        QPID_LOG(trace, logPrefix << "Replicating " << getQueue()->getName() << "[" << m.getSequence() << "]");
+        {
+            Mutex::ScopedLock l(lock);
+            //FIXME GRS: position is no longer set//assert(position == m.getSequence());
+
+            // m.getSequence() is the position of the newly enqueued message on local queue.
+            // backupPosition is latest position on backup queue before enqueueing
+            if (m.getSequence() <= backupPosition)
+                throw Exception(
+                    QPID_MSG("Expected position >  " << backupPosition
+                             << " but got " << m.getSequence()));
+            if (m.getSequence() - backupPosition > 1) {
+                // Position has advanced because of messages dequeued ahead of us.
+                // Send the position before message was enqueued.
+                sendPositionEvent(m.getSequence()-1, l);
             }
+            // Backup will automatically advance by 1 on delivery of message.
+            backupPosition = m.getSequence();
         }
-        return ConsumerImpl::deliver(qm);
+        return ConsumerImpl::deliver(c, m);
     } catch (const std::exception& e) {
-        QPID_LOG(critical, logPrefix << "Error replicating " << qm
+        QPID_LOG(critical, logPrefix << "Error replicating " << getQueue()->getName() << "[" << m.getSequence() << "]"
                  << ": " << e.what());
         throw;
     }
@@ -310,15 +297,13 @@ void ReplicatingSubscription::cancel()
 }
 
 // Consumer override, called on primary in the backup's IO thread.
-void ReplicatingSubscription::acknowledged(const QueuedMessage& qm) {
-    if (qm.queue == getQueue().get()) { // Don't complete messages on the internal queue
-        // Finish completion of message, it has been acknowledged by the backup.
-        QPID_LOG(trace, logPrefix << "Acknowledged " << qm);
-        guard->complete(qm);
-        // If next message is protected by the guard then we are ready
-        if (qm.position >= guard->getRange().back) setReady();
-    }
-    ConsumerImpl::acknowledged(qm);
+void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) {
+    // Finish completion of message, it has been acknowledged by the backup.
+    QPID_LOG(trace, logPrefix << "Acknowledged " << getQueue()->getName() << "[" << r.getMessageId() << "]");
+    guard->complete(r.getMessageId());
+    // If next message is protected by the guard then we are ready
+    if (r.getMessageId() >= guard->getRange().back) setReady();
+    ConsumerImpl::acknowledged(r);
 }
 
 // Called with lock held. Called in subscription's connection thread.
@@ -341,13 +326,12 @@ void ReplicatingSubscription::sendDequeu
 // Called after the message has been removed
 // from the deque and under the messageLock in the queue. Called in
 // arbitrary connection threads.
-void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
+void ReplicatingSubscription::dequeued(const Message& m)
 {
-    assert (qm.queue == getQueue().get());
-    QPID_LOG(trace, logPrefix << "Dequeued " << qm);
+    QPID_LOG(trace, logPrefix << "Dequeued " << getQueue()->getName() << "[" << m.getSequence() << "]");
     {
         Mutex::ScopedLock l(lock);
-        dequeues.add(qm.position);
+        dequeues.add(m.getSequence());
     }
     notify();                   // Ensure a call to doDispatch
 }
@@ -379,7 +363,7 @@ void ReplicatingSubscription::sendPositi
 void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer)
 {
     //generate event message
-    boost::intrusive_ptr<Message> event = new Message();
+    boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> event(new qpid::broker::amqp_0_10::MessageTransfer());
     AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0)));
     AMQFrame header((AMQHeaderBody()));
     AMQFrame content((AMQContentBody()));
@@ -400,10 +384,8 @@ void ReplicatingSubscription::sendEvent(
         event->getFrames().getHeaders()->get<DeliveryProperties>(true);
     props->setRoutingKey(key);
     // Send the event directly to the base consumer implementation.
-    // We don't really need a queue here but we pass a dummy queue
-    // to conform to the consumer API.
-    QueuedMessage qm(dummy.get(), event);
-    ConsumerImpl::deliver(qm);
+    //dummy consumer prevents acknowledgements being handled, which is what we want for events
+    ConsumerImpl::deliver(QueueCursor(), Message(event, 0), boost::shared_ptr<Consumer>());
 }
 
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.h Mon Aug 27 15:40:33 2012
@@ -101,15 +101,15 @@ class ReplicatingSubscription : public b
 
     // Called via QueueGuard::dequeued.
     //@return true if the message requires completion.
-    void dequeued(const broker::QueuedMessage& qm);
+    void dequeued(const broker::Message&);
 
     // Called during initial scan for dequeues.
     void dequeued(framing::SequenceNumber first, framing::SequenceNumber last);
 
     // Consumer overrides.
-    bool deliver(broker::QueuedMessage& msg);
+    bool deliver(const broker::QueueCursor& cursor, const broker::Message& msg);
     void cancel();
-    void acknowledged(const broker::QueuedMessage&);
+    void acknowledged(const broker::DeliveryRecord&);
     bool browseAcquired() const { return true; }
     // Hide the "queue deleted" error for a ReplicatingSubscription when a
     // queue is deleted, this is normal and not an error.
@@ -127,8 +127,8 @@ class ReplicatingSubscription : public b
 
   private:
     std::string logPrefix;
-    boost::shared_ptr<broker::Queue> dummy; // Used to send event messages
     framing::SequenceSet dequeues;
+    framing::SequenceNumber position;
     framing::SequenceNumber backupPosition;
     bool ready;
     BrokerInfo info;

Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicationTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicationTest.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicationTest.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicationTest.cpp Mon Aug 27 15:40:33 2012
@@ -68,7 +68,7 @@ bool ReplicationTest::isReplicated(
 
 bool ReplicationTest::isReplicated(ReplicateLevel level, const broker::Queue& q)
 {
-    return isReplicated(level, q.getSettings(), q.isAutoDelete(), q.hasExclusiveOwner());
+    return isReplicated(level, q.getSettings().storeSettings, q.isAutoDelete(), q.hasExclusiveOwner());
 }
 
 

Modified: qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.cpp Mon Aug 27 15:40:33 2012
@@ -31,6 +31,7 @@
 #include <qpid/broker/Message.h>
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/FieldValue.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
 #include "qpid/sys/Time.h"
 #include "qpid/sys/Thread.h"
 #include "qpid/broker/ConnectionState.h"
@@ -535,7 +536,7 @@ void ManagementAgent::sendBufferLH(Buffe
     }
     if (exchange.get() == 0) return;
 
-    intrusive_ptr<Message> msg(new Message());
+    intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer());
     AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0)));
     AMQFrame header((AMQHeaderBody()));
     AMQFrame content((AMQContentBody()));
@@ -547,24 +548,26 @@ void ManagementAgent::sendBufferLH(Buffe
     header.setEof(false);
     content.setBof(false);
 
-    msg->getFrames().append(method);
-    msg->getFrames().append(header);
+    transfer->getFrames().append(method);
+    transfer->getFrames().append(header);
 
     MessageProperties* props =
-        msg->getFrames().getHeaders()->get<MessageProperties>(true);
+        transfer->getFrames().getHeaders()->get<MessageProperties>(true);
     props->setContentLength(length);
 
     DeliveryProperties* dp =
-        msg->getFrames().getHeaders()->get<DeliveryProperties>(true);
+        transfer->getFrames().getHeaders()->get<DeliveryProperties>(true);
     dp->setRoutingKey(routingKey);
 
-    msg->getFrames().append(content);
-    msg->setIsManagementMessage(true);
+    transfer->getFrames().append(content);
+
+    Message msg(transfer, transfer);
+    msg.setIsManagementMessage(true);
 
     {
         sys::Mutex::ScopedUnlock u(userLock);
 
-        DeliverableMessage deliverable (msg);
+        DeliverableMessage deliverable (msg, 0);
         try {
             exchange->route(deliverable);
         } catch(exception&) {}
@@ -602,7 +605,7 @@ void ManagementAgent::sendBufferLH(const
     }
     if (exchange.get() == 0) return;
 
-    intrusive_ptr<Message> msg(new Message());
+    intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer);
     AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0)));
     AMQFrame header((AMQHeaderBody()));
     AMQFrame content((AMQContentBody(data)));
@@ -612,11 +615,11 @@ void ManagementAgent::sendBufferLH(const
     header.setEof(false);
     content.setBof(false);
 
-    msg->getFrames().append(method);
-    msg->getFrames().append(header);
+    transfer->getFrames().append(method);
+    transfer->getFrames().append(header);
 
     MessageProperties* props =
-        msg->getFrames().getHeaders()->get<MessageProperties>(true);
+        transfer->getFrames().getHeaders()->get<MessageProperties>(true);
     props->setContentLength(data.length());
     if (!cid.empty()) {
         props->setCorrelationId(cid);
@@ -625,23 +628,25 @@ void ManagementAgent::sendBufferLH(const
     props->setAppId("qmf2");
 
     for (i = headers.begin(); i != headers.end(); ++i) {
-        msg->insertCustomProperty(i->first, i->second.asString());
+        props->getApplicationHeaders().setString(i->first, i->second.asString());
     }
 
     DeliveryProperties* dp =
-        msg->getFrames().getHeaders()->get<DeliveryProperties>(true);
+        transfer->getFrames().getHeaders()->get<DeliveryProperties>(true);
     dp->setRoutingKey(routingKey);
     if (ttl_msec) {
         dp->setTtl(ttl_msec);
-        msg->computeExpiration(broker->getExpiryPolicy());
     }
-    msg->getFrames().append(content);
-    msg->setIsManagementMessage(true);
+    transfer->getFrames().append(content);
+    transfer->computeRequiredCredit();
+    Message msg(transfer, transfer);
+    msg.setIsManagementMessage(true);
+    msg.computeExpiration(broker->getExpiryPolicy());
 
     {
         sys::Mutex::ScopedUnlock u(userLock);
 
-        DeliverableMessage deliverable (msg);
+        DeliverableMessage deliverable (msg, 0);
         try {
             exchange->route(deliverable);
         } catch(exception&) {}
@@ -2135,19 +2140,20 @@ bool ManagementAgent::authorizeAgentMess
     // authorized or not.  In this case, return true (authorized) if there is no ACL in place,
     // otherwise return false;
     //
-    if (msg.encodedSize() > MA_BUFFER_SIZE)
+    if (msg.getContentSize() > MA_BUFFER_SIZE)
         return broker->getAcl() == 0;
 
-    msg.encodeContent(inBuffer);
+    inBuffer.putRawData(msg.getContent());
     uint32_t bufferLen = inBuffer.getPosition();
     inBuffer.reset();
 
+    qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
     const framing::MessageProperties* p =
-      msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+      transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
 
-    const framing::FieldTable *headers = msg.getApplicationHeaders();
+    const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0;
 
-    if (headers && msg.getAppId() == "qmf2")
+    if (headers && p->getAppId() == "qmf2")
     {
         mapMsg = true;
 
@@ -2238,8 +2244,9 @@ bool ManagementAgent::authorizeAgentMess
 
         // authorization failed, send reply if replyTo present
 
+        qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
         const framing::MessageProperties* p =
-            msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+            transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
         if (p && p->hasReplyTo()) {
             const framing::ReplyTo& rt = p->getReplyTo();
             string rte = rt.getExchange();
@@ -2277,8 +2284,9 @@ void ManagementAgent::dispatchAgentComma
 {
     string   rte;
     string   rtk;
+    qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
     const framing::MessageProperties* p =
-        msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+        transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
     if (p && p->hasReplyTo()) {
         const framing::ReplyTo& rt = p->getReplyTo();
         rte = rt.getExchange();
@@ -2290,19 +2298,19 @@ void ManagementAgent::dispatchAgentComma
     Buffer   inBuffer(inputBuffer, MA_BUFFER_SIZE);
     uint8_t  opcode;
 
-    if (msg.encodedSize() > MA_BUFFER_SIZE) {
+    if (msg.getContentSize() > MA_BUFFER_SIZE) {
         QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " <<
-                 msg.encodedSize());
+                 msg.getContentSize());
         return;
     }
 
-    msg.encodeContent(inBuffer);
+    inBuffer.putRawData(msg.getContent());
     uint32_t bufferLen = inBuffer.getPosition();
     inBuffer.reset();
 
     ScopedManagementContext context((const qpid::broker::ConnectionState*) msg.getPublisher());
-    const framing::FieldTable *headers = msg.getApplicationHeaders();
-    if (headers && msg.getAppId() == "qmf2")
+    const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0;
+    if (headers && p->getAppId() == "qmf2")
     {
         string opcode = headers->getAsString("qmf.opcode");
         string contentType = headers->getAsString("qmf.content");

Propchange: qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.cpp
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp:r1368652-1375508

Propchange: qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.h
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h:r1368652-1375508



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org