You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/27 17:40:45 UTC
svn commit: r1377715 [6/12] - in /qpid/branches/asyncstore: ./ bin/ cpp/
cpp/docs/api/ cpp/examples/old_api/tradedemo/ cpp/include/qmf/engine/
cpp/include/qpid/client/ cpp/src/ cpp/src/qmf/engine/ cpp/src/qpid/acl/
cpp/src/qpid/asyncStore/ cpp/src/qpid...
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/TxBuffer.h Mon Aug 27 15:40:33 2012
@@ -108,9 +108,6 @@ namespace qpid {
* commit
*/
QPID_BROKER_EXTERN bool commitLocal(TransactionalStore* const store);
-
- // Used by cluster to replicate transaction status.
- void accept(TxOpConstVisitor& v) const;
};
}
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/TxOp.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/TxOp.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/TxOp.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/TxOp.h Mon Aug 27 15:40:33 2012
@@ -21,7 +21,6 @@
#ifndef _TxOp_
#define _TxOp_
-#include "qpid/broker/TxOpVisitor.h"
#include "qpid/broker/TransactionalStore.h"
#include <boost/shared_ptr.hpp>
@@ -36,8 +35,6 @@ namespace qpid {
virtual void commit() throw() = 0;
virtual void rollback() throw() = 0;
virtual ~TxOp(){}
-
- virtual void accept(TxOpConstVisitor&) const = 0;
};
}} // namespace qpid::broker
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/TxOpVisitor.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/TxOpVisitor.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/TxOpVisitor.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/TxOpVisitor.h Mon Aug 27 15:40:33 2012
@@ -1,97 +0,0 @@
-#ifndef QPID_BROKER_TXOPVISITOR_H
-#define QPID_BROKER_TXOPVISITOR_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-namespace qpid {
-namespace broker {
-
-class DtxAck;
-class RecoveredDequeue;
-class RecoveredEnqueue;
-class TxAccept;
-class TxPublish;
-
-/**
- * Visitor for TxOp familly of classes.
- */
-struct TxOpConstVisitor
-{
- virtual ~TxOpConstVisitor() {}
- virtual void operator()(const DtxAck&) = 0;
- virtual void operator()(const RecoveredDequeue&) = 0;
- virtual void operator()(const RecoveredEnqueue&) = 0;
- virtual void operator()(const TxAccept&) = 0;
- virtual void operator()(const TxPublish&) = 0;
-};
-
-}} // namespace qpid::broker
-
-#endif /*!QPID_BROKER_TXOPVISITOR_H*/
-#ifndef QPID_BROKER_TXOPVISITOR_H
-#define QPID_BROKER_TXOPVISITOR_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-namespace qpid {
-namespace broker {
-
-class DtxAck;
-class RecoveredDequeue;
-class RecoveredEnqueue;
-class TxAccept;
-class TxPublish;
-
-/**
- * Visitor for TxOp familly of classes.
- */
-struct TxOpConstVisitor
-{
- virtual ~TxOpConstVisitor() {}
- virtual void operator()(const DtxAck&) = 0;
- virtual void operator()(const RecoveredDequeue&) = 0;
- virtual void operator()(const RecoveredEnqueue&) = 0;
- virtual void operator()(const TxAccept&) = 0;
- virtual void operator()(const TxPublish&) = 0;
-};
-
-}} // namespace qpid::broker
-
-#endif /*!QPID_BROKER_TXOPVISITOR_H*/
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/TxPublish.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/TxPublish.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/TxPublish.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/TxPublish.cpp Mon Aug 27 15:40:33 2012
@@ -1,111 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/log/Statement.h"
-#include "qpid/broker/TxPublish.h"
-#include "qpid/broker/Queue.h"
-
-using boost::intrusive_ptr;
-using namespace qpid::broker;
-
-TxPublish::TxPublish(intrusive_ptr<Message> _msg) : msg(_msg) {}
-
-bool TxPublish::prepare(TransactionContext* ctxt) throw()
-{
- try{
- while (!queues.empty()) {
- prepare(ctxt, queues.front());
- prepared.push_back(queues.front());
- queues.pop_front();
- }
- return true;
- }catch(const std::exception& e){
- QPID_LOG(error, "Failed to prepare: " << e.what());
- }catch(...){
- QPID_LOG(error, "Failed to prepare (unknown error)");
- }
- return false;
-}
-
-void TxPublish::commit() throw()
-{
- try {
- for_each(prepared.begin(), prepared.end(), Commit(msg));
- if (msg->isContentReleaseRequested()) {
- // NOTE: The log messages in this section are used for flow-to-disk testing (which checks the log for the
- // presence of these messages). Do not change these without also checking these tests.
- if (msg->isContentReleaseBlocked()) {
- QPID_LOG(debug, "Message id=\"" << msg->getProperties<qpid::framing::MessageProperties>()->getMessageId() << "\"; pid=0x" <<
- std::hex << msg->getPersistenceId() << std::dec << ": Content release blocked on commit");
- } else {
- msg->releaseContent();
- QPID_LOG(debug, "Message id=\"" << msg->getProperties<qpid::framing::MessageProperties>()->getMessageId() << "\"; pid=0x" <<
- std::hex << msg->getPersistenceId() << std::dec << ": Content released on commit");
- }
- }
- } catch (const std::exception& e) {
- QPID_LOG(error, "Failed to commit: " << e.what());
- } catch(...) {
- QPID_LOG(error, "Failed to commit (unknown error)");
- }
-}
-
-void TxPublish::rollback() throw()
-{
- try {
- for_each(prepared.begin(), prepared.end(), Rollback(msg));
- } catch (const std::exception& e) {
- QPID_LOG(error, "Failed to complete rollback: " << e.what());
- } catch(...) {
- QPID_LOG(error, "Failed to complete rollback (unknown error)");
- }
-
-}
-
-void TxPublish::deliverTo(const boost::shared_ptr<Queue>& queue){
- if (!queue->isLocal(msg)) {
- queues.push_back(queue);
- delivered = true;
- } else {
- QPID_LOG(debug, "Won't enqueue local message for " << queue->getName());
- }
-}
-
-void TxPublish::prepare(TransactionContext* ctxt, const boost::shared_ptr<Queue> queue)
-{
- queue->enqueue(ctxt, msg);
-}
-
-TxPublish::Commit::Commit(intrusive_ptr<Message>& _msg) : msg(_msg){}
-
-void TxPublish::Commit::operator()(const boost::shared_ptr<Queue>& queue){
- queue->process(msg);
-}
-
-TxPublish::Rollback::Rollback(intrusive_ptr<Message>& _msg) : msg(_msg){}
-
-void TxPublish::Rollback::operator()(const boost::shared_ptr<Queue>& queue){
- queue->enqueueAborted(msg);
-}
-
-uint64_t TxPublish::contentSize ()
-{
- return msg->contentSize ();
-}
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/TxPublish.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/TxPublish.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/TxPublish.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/TxPublish.h Mon Aug 27 15:40:33 2012
@@ -1,92 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _TxPublish_
-#define _TxPublish_
-
-#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/Deliverable.h"
-#include "qpid/broker/Message.h"
-#include "qpid/broker/MessageStore.h"
-#include "qpid/broker/TxOp.h"
-
-#include <algorithm>
-#include <functional>
-#include <list>
-
-#include <boost/intrusive_ptr.hpp>
-
-namespace qpid {
-namespace broker {
-/**
- * Defines the behaviour for publish operations on a
- * transactional channel. Messages are routed through
- * exchanges when received but are not at that stage delivered
- * to the matching queues, rather the queues are held in an
- * instance of this class. On prepare() the message is marked
- * enqueued to the relevant queues in the MessagesStore. On
- * commit() the messages will be passed to the queue for
- * dispatch or to be added to the in-memory queue.
- */
-class QPID_BROKER_CLASS_EXTERN TxPublish : public TxOp, public Deliverable{
-
- class Commit{
- boost::intrusive_ptr<Message>& msg;
- public:
- Commit(boost::intrusive_ptr<Message>& msg);
- void operator()(const boost::shared_ptr<Queue>& queue);
- };
- class Rollback{
- boost::intrusive_ptr<Message>& msg;
- public:
- Rollback(boost::intrusive_ptr<Message>& msg);
- void operator()(const boost::shared_ptr<Queue>& queue);
- };
-
- boost::intrusive_ptr<Message> msg;
- std::list<boost::shared_ptr<Queue> > queues;
- std::list<boost::shared_ptr<Queue> > prepared;
-
- void prepare(TransactionContext* ctxt, boost::shared_ptr<Queue>);
-
- public:
- QPID_BROKER_EXTERN TxPublish(boost::intrusive_ptr<Message> msg);
- QPID_BROKER_EXTERN virtual bool prepare(TransactionContext* ctxt) throw();
- QPID_BROKER_EXTERN virtual void commit() throw();
- QPID_BROKER_EXTERN virtual void rollback() throw();
-
- virtual Message& getMessage() { return *msg; };
-
- QPID_BROKER_EXTERN virtual void deliverTo(const boost::shared_ptr<Queue>& queue);
-
- virtual ~TxPublish(){}
- virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
-
- QPID_BROKER_EXTERN uint64_t contentSize();
-
- boost::intrusive_ptr<Message> getMessage() const { return msg; }
- const std::list<boost::shared_ptr<Queue> >& getQueues() const { return queues; }
- const std::list<boost::shared_ptr<Queue> >& getPrepared() const { return prepared; }
-};
-}
-}
-
-
-#endif
Modified: qpid/branches/asyncstore/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp Mon Aug 27 15:40:33 2012
@@ -281,7 +281,7 @@ void SslProtocolFactory::established(sys
boost::bind(&AsynchIOHandler::idle, async, _1));
}
- async->init(aio, brokerTimer, maxNegotiateTime, 4);
+ async->init(aio, brokerTimer, maxNegotiateTime);
aio->start(poller);
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/client/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/client/SslConnector.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/client/SslConnector.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/client/SslConnector.cpp Mon Aug 27 15:40:33 2012
@@ -38,7 +38,6 @@
#include "qpid/Msg.h"
#include <iostream>
-#include <map>
#include <boost/bind.hpp>
#include <boost/format.hpp>
@@ -54,53 +53,29 @@ using boost::str;
class SslConnector : public Connector
{
- struct Buff;
-
- /** Batch up frames for writing to aio. */
- class Writer : public framing::FrameHandler {
- typedef sys::ssl::SslIOBufferBase BufferBase;
- typedef std::vector<framing::AMQFrame> Frames;
-
- const uint16_t maxFrameSize;
- sys::Mutex lock;
- sys::ssl::SslIO* aio;
- BufferBase* buffer;
- Frames frames;
- size_t lastEof; // Position after last EOF in frames
- framing::Buffer encode;
- size_t framesEncoded;
- std::string identifier;
- Bounds* bounds;
-
- void writeOne();
- void newBuffer();
-
- public:
-
- Writer(uint16_t maxFrameSize, Bounds*);
- ~Writer();
- void init(std::string id, sys::ssl::SslIO*);
- void handle(framing::AMQFrame&);
- void write(sys::ssl::SslIO&);
- };
+ typedef std::deque<framing::AMQFrame> Frames;
const uint16_t maxFrameSize;
+
+ sys::Mutex lock;
+ Frames frames;
+ size_t lastEof; // Position after last EOF in frames
+ uint64_t currentSize;
+ Bounds* bounds;
+
framing::ProtocolVersion version;
bool initiated;
- SecuritySettings securitySettings;
-
- sys::Mutex closedLock;
bool closed;
sys::ShutdownHandler* shutdownHandler;
framing::InputHandler* input;
- Writer writer;
-
sys::ssl::SslSocket socket;
sys::ssl::SslIO* aio;
+ std::string identifier;
Poller::shared_ptr poller;
+ SecuritySettings securitySettings;
~SslConnector();
@@ -110,10 +85,7 @@ class SslConnector : public Connector
void eof(qpid::sys::ssl::SslIO&);
void disconnected(qpid::sys::ssl::SslIO&);
- std::string identifier;
-
void connect(const std::string& host, const std::string& port);
- void init();
void close();
void send(framing::AMQFrame& frame);
void abort() {} // TODO: Need to fix for heartbeat timeouts to work
@@ -126,17 +98,16 @@ class SslConnector : public Connector
const SecuritySettings* getSecuritySettings();
void socketClosed(qpid::sys::ssl::SslIO&, const qpid::sys::ssl::SslSocket&);
+ size_t decode(const char* buffer, size_t size);
+ size_t encode(const char* buffer, size_t size);
+ bool canEncode();
+
public:
SslConnector(Poller::shared_ptr p, framing::ProtocolVersion pVersion,
const ConnectionSettings&,
ConnectionImpl*);
};
-struct SslConnector::Buff : public SslIO::BufferBase {
- Buff(size_t size) : SslIO::BufferBase(new char[size], size) {}
- ~Buff() { delete [] bytes;}
-};
-
// Static constructor which registers connector here
namespace {
Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
@@ -170,12 +141,14 @@ SslConnector::SslConnector(Poller::share
const ConnectionSettings& settings,
ConnectionImpl* cimpl)
: maxFrameSize(settings.maxFrameSize),
+ lastEof(0),
+ currentSize(0),
+ bounds(cimpl),
version(ver),
initiated(false),
closed(true),
shutdownHandler(0),
input(0),
- writer(maxFrameSize, cimpl),
aio(0),
poller(p)
{
@@ -192,7 +165,7 @@ SslConnector::~SslConnector() {
}
void SslConnector::connect(const std::string& host, const std::string& port){
- Mutex::ScopedLock l(closedLock);
+ Mutex::ScopedLock l(lock);
assert(closed);
try {
socket.connect(host, port);
@@ -201,7 +174,6 @@ void SslConnector::connect(const std::st
throw TransportFailure(e.what());
}
- identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
closed = false;
aio = new SslIO(socket,
boost::bind(&SslConnector::readbuff, this, _1, _2),
@@ -210,21 +182,16 @@ void SslConnector::connect(const std::st
boost::bind(&SslConnector::socketClosed, this, _1, _2),
0, // nobuffs
boost::bind(&SslConnector::writebuff, this, _1));
- writer.init(identifier, aio);
-}
-void SslConnector::init(){
- Mutex::ScopedLock l(closedLock);
+ aio->createBuffers(maxFrameSize);
+ identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
ProtocolInitiation init(version);
writeDataBlock(init);
- for (int i = 0; i < 32; i++) {
- aio->queueReadBuffer(new Buff(maxFrameSize));
- }
aio->start(poller);
}
void SslConnector::close() {
- Mutex::ScopedLock l(closedLock);
+ Mutex::ScopedLock l(lock);
if (!closed) {
closed = true;
if (aio)
@@ -260,76 +227,110 @@ const std::string& SslConnector::getIden
}
void SslConnector::send(AMQFrame& frame) {
- writer.handle(frame);
-}
-
-SslConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b)
-{
-}
-
-SslConnector::Writer::~Writer() { delete buffer; }
-
-void SslConnector::Writer::init(std::string id, sys::ssl::SslIO* a) {
- Mutex::ScopedLock l(lock);
- identifier = id;
- aio = a;
- newBuffer();
-}
-void SslConnector::Writer::handle(framing::AMQFrame& frame) {
+ bool notifyWrite = false;
+ {
Mutex::ScopedLock l(lock);
frames.push_back(frame);
- if (frame.getEof() || (bounds && bounds->getCurrentSize() >= maxFrameSize)) {
+ //only ask to write if this is the end of a frameset or if we
+ //already have a buffers worth of data
+ currentSize += frame.encodedSize();
+ if (frame.getEof()) {
lastEof = frames.size();
- aio->notifyPendingWrite();
+ notifyWrite = true;
+ } else {
+ notifyWrite = (currentSize >= maxFrameSize);
+ }
+ /*
+ NOTE: Moving the following line into this mutex block
+ is a workaround for BZ 570168, in which the test
+ testConcurrentSenders causes a hang about 1.5%
+ of the time. ( To see the hang much more frequently
+ leave this line out of the mutex block, and put a
+ small usleep just before it.)
+
+ TODO mgoulish - fix the underlying cause and then
+ move this call back outside the mutex.
+ */
+ if (notifyWrite && !closed) aio->notifyPendingWrite();
}
- QPID_LOG(trace, "SENT [" << identifier << "]: " << frame);
}
-void SslConnector::Writer::writeOne() {
- assert(buffer);
- framesEncoded = 0;
+void SslConnector::writebuff(SslIO& /*aio*/)
+{
+ // It's possible to be disconnected and be writable
+ if (closed)
+ return;
- buffer->dataStart = 0;
- buffer->dataCount = encode.getPosition();
- aio->queueWrite(buffer);
- newBuffer();
-}
+ if (!canEncode()) {
+ return;
+ }
-void SslConnector::Writer::newBuffer() {
- buffer = aio->getQueuedBuffer();
- if (!buffer) buffer = new Buff(maxFrameSize);
- encode = framing::Buffer(buffer->bytes, buffer->byteCount);
- framesEncoded = 0;
+ SslIO::BufferBase* buffer = aio->getQueuedBuffer();
+ if (buffer) {
+
+ size_t encoded = encode(buffer->bytes, buffer->byteCount);
+
+ buffer->dataStart = 0;
+ buffer->dataCount = encoded;
+ aio->queueWrite(buffer);
+ }
}
// Called in IO thread.
-void SslConnector::Writer::write(sys::ssl::SslIO&) {
+bool SslConnector::canEncode()
+{
Mutex::ScopedLock l(lock);
- assert(buffer);
+ //have at least one full frameset or a whole buffers worth of data
+ return lastEof || currentSize >= maxFrameSize;
+}
+
+// Called in IO thread.
+size_t SslConnector::encode(const char* buffer, size_t size)
+{
+ framing::Buffer out(const_cast<char*>(buffer), size);
size_t bytesWritten(0);
- for (size_t i = 0; i < lastEof; ++i) {
- AMQFrame& frame = frames[i];
- uint32_t size = frame.encodedSize();
- if (size > encode.available()) writeOne();
- assert(size <= encode.available());
- frame.encode(encode);
- ++framesEncoded;
- bytesWritten += size;
+ {
+ Mutex::ScopedLock l(lock);
+ while (!frames.empty() && out.available() >= frames.front().encodedSize() ) {
+ frames.front().encode(out);
+ QPID_LOG(trace, "SENT [" << identifier << "]: " << frames.front());
+ frames.pop_front();
+ if (lastEof) --lastEof;
+ }
+ bytesWritten = size - out.available();
+ currentSize -= bytesWritten;
}
- frames.erase(frames.begin(), frames.begin()+lastEof);
- lastEof = 0;
if (bounds) bounds->reduce(bytesWritten);
- if (encode.getPosition() > 0) writeOne();
+ return bytesWritten;
}
-void SslConnector::readbuff(SslIO& aio, SslIO::BufferBase* buff) {
- framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
+void SslConnector::readbuff(SslIO& aio, SslIO::BufferBase* buff)
+{
+ int32_t decoded = decode(buff->bytes+buff->dataStart, buff->dataCount);
+ // TODO: unreading needs to go away, and when we can cope
+ // with multiple sub-buffers in the general buffer scheme, it will
+ if (decoded < buff->dataCount) {
+ // Adjust buffer for used bytes and then "unread them"
+ buff->dataStart += decoded;
+ buff->dataCount -= decoded;
+ aio.unread(buff);
+ } else {
+ // Give whole buffer back to aio subsystem
+ aio.queueReadBuffer(buff);
+ }
+}
+size_t SslConnector::decode(const char* buffer, size_t size)
+{
+ framing::Buffer in(const_cast<char*>(buffer), size);
if (!initiated) {
framing::ProtocolInitiation protocolInit;
if (protocolInit.decode(in)) {
- //TODO: check the version is correct
QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
+ if(!(protocolInit==version)){
+ throw Exception(QPID_MSG("Unsupported version: " << protocolInit
+ << " supported version " << version));
+ }
}
initiated = true;
}
@@ -338,25 +339,12 @@ void SslConnector::readbuff(SslIO& aio,
QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
input->received(frame);
}
- // TODO: unreading needs to go away, and when we can cope
- // with multiple sub-buffers in the general buffer scheme, it will
- if (in.available() != 0) {
- // Adjust buffer for used bytes and then "unread them"
- buff->dataStart += buff->dataCount-in.available();
- buff->dataCount = in.available();
- aio.unread(buff);
- } else {
- // Give whole buffer back to aio subsystem
- aio.queueReadBuffer(buff);
- }
-}
-
-void SslConnector::writebuff(SslIO& aio_) {
- writer.write(aio_);
+ return size - in.available();
}
void SslConnector::writeDataBlock(const AMQDataBlock& data) {
- SslIO::BufferBase* buff = new Buff(maxFrameSize);
+ SslIO::BufferBase* buff = aio->getQueuedBuffer();
+ assert(buff);
framing::Buffer out(buff->bytes, buff->byteCount);
data.encode(out);
buff->dataCount = data.encodedSize();
Modified: qpid/branches/asyncstore/cpp/src/qpid/client/SubscriptionManagerImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/client/SubscriptionManagerImpl.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/client/SubscriptionManagerImpl.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/client/SubscriptionManagerImpl.cpp Mon Aug 27 15:40:33 2012
@@ -28,6 +28,7 @@
#include <qpid/client/Session.h>
#include <qpid/client/MessageListener.h>
#include <qpid/framing/Uuid.h>
+#include <qpid/log/Statement.h>
#include <set>
#include <sstream>
@@ -167,6 +168,15 @@ void SubscriptionManagerImpl::setFlowCon
setFlowControl(name, FlowControl(messages, bytes, window));
}
+AutoCancel::AutoCancel(SubscriptionManager& sm_, const std::string& tag_) : sm(sm_), tag(tag_) {}
+AutoCancel::~AutoCancel() {
+ try {
+ sm.cancel(tag);
+ } catch (const qpid::Exception& e) {
+ QPID_LOG(info, "Exception in AutoCancel destructor: " << e.what());
+ }
+}
+
}} // namespace qpid::client
Modified: qpid/branches/asyncstore/cpp/src/qpid/client/TCPConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/client/TCPConnector.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/client/TCPConnector.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/client/TCPConnector.cpp Mon Aug 27 15:40:33 2012
@@ -46,11 +46,6 @@ using namespace qpid::framing;
using boost::format;
using boost::str;
-struct TCPConnector::Buff : public AsynchIO::BufferBase {
- Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
- ~Buff() { delete [] bytes;}
-};
-
// Static constructor which registers connector here
namespace {
Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) {
@@ -118,9 +113,8 @@ void TCPConnector::connected(const Socke
void TCPConnector::start(sys::AsynchIO* aio_) {
aio = aio_;
- for (int i = 0; i < 4; i++) {
- aio->queueReadBuffer(new Buff(maxFrameSize));
- }
+
+ aio->createBuffers(maxFrameSize);
identifier = str(format("[%1%]") % socket.getFullAddress());
}
@@ -226,15 +220,19 @@ void TCPConnector::writebuff(AsynchIO& /
return;
Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;
- if (codec->canEncode()) {
- std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer());
- if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize));
+
+ if (!codec->canEncode()) {
+ return;
+ }
+
+ AsynchIO::BufferBase* buffer = aio->getQueuedBuffer();
+ if (buffer) {
size_t encoded = codec->encode(buffer->bytes, buffer->byteCount);
buffer->dataStart = 0;
buffer->dataCount = encoded;
- aio->queueWrite(buffer.release());
+ aio->queueWrite(buffer);
}
}
@@ -307,6 +305,7 @@ size_t TCPConnector::decode(const char*
void TCPConnector::writeDataBlock(const AMQDataBlock& data) {
AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
+ assert(buff);
framing::Buffer out(buff->bytes, buff->byteCount);
data.encode(out);
buff->dataCount = data.encodedSize();
Modified: qpid/branches/asyncstore/cpp/src/qpid/client/TCPConnector.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/client/TCPConnector.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/client/TCPConnector.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/client/TCPConnector.h Mon Aug 27 15:40:33 2012
@@ -50,7 +50,6 @@ namespace client {
class TCPConnector : public Connector, public sys::Codec
{
typedef std::deque<framing::AMQFrame> Frames;
- struct Buff;
const uint16_t maxFrameSize;
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/Backup.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/Backup.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/Backup.cpp Mon Aug 27 15:40:33 2012
@@ -35,6 +35,7 @@
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/sys/SystemInfo.h"
#include "qpid/types/Variant.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace ha {
@@ -51,35 +52,15 @@ Backup::Backup(HaBroker& hb, const Setti
if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
}
-bool Backup::isSelf(const Address& a) const {
- return sys::SystemInfo::isLocalHost(a.host) &&
- a.port == haBroker.getBroker().getPort(a.protocol);
-}
-
-// Remove my own address from the URL if possible.
-// This isn't 100% reliable given the many ways to specify a host,
-// but should work in most cases. We have additional measures to prevent
-// self-connection in ConnectionObserver
-Url Backup::removeSelf(const Url& brokers) const {
- Url url;
- for (Url::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
- if (!isSelf(*i)) url.push_back(*i);
- if (url.empty())
- throw Url::Invalid(logPrefix+"Failover URL is empty");
- QPID_LOG(debug, logPrefix << "Failover URL (excluding self): " << url);
- return url;
-}
-
void Backup::initialize(const Url& brokers) {
if (brokers.empty()) throw Url::Invalid("HA broker URL is empty");
QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers);
- Url url = removeSelf(brokers);
- string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+ string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol;
types::Uuid uuid(true);
// Declare the link
std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
- url[0].host, url[0].port, protocol,
+ brokers[0].host, brokers[0].port, protocol,
false, // durable
settings.mechanism, settings.username, settings.password,
false); // no amq.failover - don't want to use client URL.
@@ -90,7 +71,7 @@ void Backup::initialize(const Url& broke
replicator->initialize();
broker.getExchanges().registerExchange(replicator);
}
- link->setUrl(url); // Outside the lock, once set link doesn't change.
+ link->setUrl(brokers); // Outside the lock, once set link doesn't change.
}
Backup::~Backup() {
@@ -107,10 +88,8 @@ void Backup::setBrokerUrl(const Url& url
sys::Mutex::ScopedLock l(lock);
linkSet = link;
}
- if (linkSet) {
- QPID_LOG(info, logPrefix << "Broker URL set to: " << url);
- link->setUrl(removeSelf(url)); // Outside lock, once set link doesn't change
- }
+ if (linkSet)
+ link->setUrl(url); // Outside lock, once set link doesn't change
else
initialize(url); // Deferred initialization
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/Backup.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/Backup.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/Backup.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/Backup.h Mon Aug 27 15:40:33 2012
@@ -53,8 +53,6 @@ class Backup
void setStatus(BrokerStatus);
private:
- bool isSelf(const Address& a) const;
- Url removeSelf(const Url&) const;
void initialize(const Url&);
std::string logPrefix;
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/BackupConnectionExcluder.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/BackupConnectionExcluder.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/BackupConnectionExcluder.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/BackupConnectionExcluder.h Mon Aug 27 15:40:33 2012
@@ -36,7 +36,7 @@ class BackupConnectionExcluder : public
{
public:
void opened(broker::Connection& connection) {
- QPID_LOG(debug, "Backup broker rejected connection "+connection.getMgmtId());
+ QPID_LOG(debug, "Backup: Rejected connection "+connection.getMgmtId());
connection.abort();
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/BrokerReplicator.cpp Mon Aug 27 15:40:33 2012
@@ -24,7 +24,9 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueSettings.h"
#include "qpid/broker/Link.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/log/Statement.h"
#include "qpid/amqp_0_10/Codecs.h"
@@ -117,11 +119,6 @@ const string _QUERY_REQUEST("_query_requ
const string BROKER("broker");
const string MEMBERS("members");
-bool isQMFv2(const Message& message) {
- const framing::MessageProperties* props = message.getProperties<framing::MessageProperties>();
- return props && props->getAppId() == QMF2;
-}
-
template <class T> bool match(Variant::Map& schema) {
return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
}
@@ -219,7 +216,7 @@ void BrokerReplicator::initializeBridge(
link->getRemoteAddress(primary);
string queueName = bridge.getQueueName();
- QPID_LOG(info, logPrefix << (initialized ? "Connecting" : "Failing over")
+ QPID_LOG(info, logPrefix << (initialized ? "Failing over" : "Connecting")
<< " to primary " << primary
<< " status:" << printable(haBroker.getStatus()));
initialized = true;
@@ -253,18 +250,15 @@ void BrokerReplicator::route(Deliverable
haBroker.setStatus(CATCHUP);
QPID_LOG(notice, logPrefix << "Connected to primary " << primary);
}
-
- const framing::FieldTable* headers = msg.getMessage().getApplicationHeaders();
- const MessageProperties* messageProperties = msg.getMessage().getProperties<MessageProperties>();
Variant::List list;
try {
- if (!isQMFv2(msg.getMessage()) || !headers || !messageProperties)
+ if (!qpid::broker::amqp_0_10::MessageTransfer::isQMFv2(msg.getMessage()))
throw Exception("Unexpected message, not QMF2 event or query response.");
// decode as list
- string content = msg.getMessage().getFrames().getContent();
- amqp_0_10::ListCodec::decode(content, list);
- QPID_LOG(trace, "Broker replicator received: " << *messageProperties);
- if (headers->getAsString(QMF_CONTENT) == EVENT) {
+ string content = msg.getMessage().getContent();
+ qpid::amqp_0_10::ListCodec::decode(content, list);
+
+ if (msg.getMessage().getPropertyAsString(QMF_CONTENT) == EVENT) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
Variant::Map& map = i->asMap();
QPID_LOG(trace, "Broker replicator event: " << map);
@@ -278,20 +272,20 @@ void BrokerReplicator::route(Deliverable
else if (match<EventUnbind>(schema)) doEventUnbind(values);
else if (match<EventMembersUpdate>(schema)) doEventMembersUpdate(values);
}
- } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
+ } else if (msg.getMessage().getPropertyAsString(QMF_OPCODE) == QUERY_RESPONSE) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
Variant::Map& map = i->asMap();
QPID_LOG(trace, "Broker replicator response: " << map);
string type = map[SCHEMA_ID].asMap()[CLASS_NAME].asString();
Variant::Map& values = map[VALUES].asMap();
framing::FieldTable args;
- amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
+ qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
if (type == QUEUE) doResponseQueue(values);
else if (type == EXCHANGE) doResponseExchange(values);
else if (type == BINDING) doResponseBind(values);
else if (type == HA_BROKER) doResponseHaBroker(values);
}
- if (messageProperties->getCorrelationId() == EXCHANGE && !headers->isSet(PARTIAL)) {
+ if (qpid::broker::amqp_0_10::MessageTransfer::isLastQMFResponse(msg.getMessage(), EXCHANGE)) {
// We have received all of the exchange response.
alternates.clear();
}
@@ -309,13 +303,11 @@ void BrokerReplicator::doEventQueueDecla
Variant::Map argsMap = asMapVoid(values[ARGS]);
bool autoDel = values[AUTODEL].asBool();
bool excl = values[EXCL].asBool();
- if (values[DISP] == CREATED &&
- replicationTest.isReplicated(CONFIGURATION, argsMap, autoDel, excl))
- {
+ if (values[DISP] == CREATED && replicationTest.isReplicated(CONFIGURATION, argsMap, autoDel, excl)) {
string name = values[QNAME].asString();
- QPID_LOG(debug, logPrefix << "Queue declare event: " << name);
+ QueueSettings settings(values[DURABLE].asBool(), values[AUTODEL].asBool());
framing::FieldTable args;
- amqp_0_10::translate(argsMap, args);
+ qpid::amqp_0_10::translate(argsMap, args);
// If we already have a queue with this name, replace it.
// The queue was definitely created on the primary.
if (broker.getQueues().find(name)) {
@@ -323,10 +315,17 @@ void BrokerReplicator::doEventQueueDecla
broker.getQueues().destroy(name);
stopQueueReplicator(name);
}
- boost::shared_ptr<Queue> queue = createQueue(
- name, values[DURABLE].asBool(), autoDel, args, values[ALTEX].asString());
- assert(queue); // Should be created since we destroed the previous queue above.
- if (queue) startQueueReplicator(queue);
+ settings.populate(args, settings.storeSettings);
+ std::pair<boost::shared_ptr<Queue>, bool> result =
+ broker.createQueue(
+ name,
+ settings,
+ 0 /*i.e. no owner regardless of exclusivity on master*/,
+ values[ALTEX].asString(),
+ userId,
+ remoteHost);
+ assert(result.second); // Should be true since we destroyed existing queue above
+ startQueueReplicator(result.first);
}
}
@@ -343,7 +342,7 @@ void BrokerReplicator::doEventQueueDelet
// sessions may be closed by a "queue deleted" exception.
string name = values[QNAME].asString();
boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
- if (queue && replicationTest.replicateLevel(queue->getSettings())) {
+ if (queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) {
QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
stopQueueReplicator(name);
broker.deleteQueue(name, userId, remoteHost);
@@ -357,7 +356,7 @@ void BrokerReplicator::doEventExchangeDe
string name = values[EXNAME].asString();
QPID_LOG(debug, logPrefix << "Exchange declare event: " << name);
framing::FieldTable args;
- amqp_0_10::translate(argsMap, args);
+ qpid::amqp_0_10::translate(argsMap, args);
// If we already have a exchange with this name, replace it.
// The exchange was definitely created on the primary.
if (broker.getExchanges().find(name)) {
@@ -391,10 +390,10 @@ void BrokerReplicator::doEventBind(Varia
// We only replicate binds for a replicated queue to replicated
// exchange that both exist locally.
if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue && replicationTest.replicateLevel(queue->getSettings()))
+ queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
{
framing::FieldTable args;
- amqp_0_10::translate(asMapVoid(values[ARGS]), args);
+ qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
@@ -411,10 +410,10 @@ void BrokerReplicator::doEventUnbind(Var
// We only replicate unbinds for a replicated queue to replicated
// exchange that both exist locally.
if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue && replicationTest.replicateLevel(queue->getSettings()))
+ queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
{
framing::FieldTable args;
- amqp_0_10::translate(asMapVoid(values[ARGS]), args);
+ qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
@@ -455,7 +454,7 @@ void BrokerReplicator::doResponseQueue(V
string name(values[NAME].asString());
QPID_LOG(debug, logPrefix << "Queue response: " << name);
framing::FieldTable args;
- amqp_0_10::translate(argsMap, args);
+ qpid::amqp_0_10::translate(argsMap, args);
boost::shared_ptr<Queue> queue =
createQueue(name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args,
getAltExchange(values[ALTEXCHANGE]));
@@ -470,11 +469,12 @@ void BrokerReplicator::doResponseExchang
string name = values[NAME].asString();
QPID_LOG(debug, logPrefix << "Exchange response: " << name);
framing::FieldTable args;
- amqp_0_10::translate(argsMap, args);
+ qpid::amqp_0_10::translate(argsMap, args);
boost::shared_ptr<Exchange> exchange = createExchange(
name, values[TYPE].asString(), values[DURABLE].asBool(), args,
getAltExchange(values[ALTEXCHANGE]));
- QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already exists: " << name);
+ // It is normal for the exchange to already exist if we are failing over.
+ QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already replicated: " << name);
}
namespace {
@@ -506,14 +506,14 @@ void BrokerReplicator::doResponseBind(Va
// Automatically replicate binding if queue and exchange exist and are replicated
if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue && replicationTest.replicateLevel(queue->getSettings()))
+ queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
{
string key = values[KEY].asString();
QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName
<< " queue:" << qName
<< " key:" << key);
framing::FieldTable args;
- amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
+ qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
exchange->bind(queue, key, &args);
}
}
@@ -543,7 +543,7 @@ void BrokerReplicator::doResponseHaBroke
void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue)
{
- if (replicationTest.replicateLevel(queue->getSettings()) == ALL) {
+ if (replicationTest.replicateLevel(queue->getSettings().storeSettings) == ALL) {
boost::shared_ptr<QueueReplicator> qr(
new QueueReplicator(haBroker, queue, link));
if (!broker.getExchanges().registerExchange(qr))
@@ -569,14 +569,14 @@ boost::shared_ptr<Queue> BrokerReplicato
const qpid::framing::FieldTable& arguments,
const std::string& alternateExchange)
{
+ QueueSettings settings(durable, autodelete);
+ settings.populate(arguments, settings.storeSettings);
std::pair<boost::shared_ptr<Queue>, bool> result =
broker.createQueue(
name,
- durable,
- autodelete,
- 0, // no owner regardless of exclusivity on primary
+ settings,
+ 0,// no owner regardless of exclusivity on primary
string(), // Set alternate exchange below
- arguments,
userId,
remoteHost);
if (result.second) {
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.cpp Mon Aug 27 15:40:33 2012
@@ -32,7 +32,7 @@ namespace ha {
ConnectionObserver::ConnectionObserver(HaBroker& hb, const types::Uuid& uuid)
: haBroker(hb), logPrefix("Connections: "), self(uuid) {}
-bool ConnectionObserver::getBrokerInfo(broker::Connection& connection, BrokerInfo& info) {
+bool ConnectionObserver::getBrokerInfo(const broker::Connection& connection, BrokerInfo& info) {
framing::FieldTable ft;
if (connection.getClientProperties().getTable(ConnectionObserver::BACKUP_TAG, ft)) {
info = BrokerInfo(ft);
@@ -51,21 +51,23 @@ ConnectionObserver::ObserverPtr Connecti
return observer;
}
+bool ConnectionObserver::isSelf(const broker::Connection& connection) {
+ BrokerInfo info;
+ return getBrokerInfo(connection, info) && info.getSystemId() == self;
+}
+
void ConnectionObserver::opened(broker::Connection& connection) {
try {
if (connection.isLink()) return; // Allow outgoing links.
if (connection.getClientProperties().isSet(ADMIN_TAG)) {
- QPID_LOG(debug, logPrefix << "Allowing admin connection: "
+ QPID_LOG(debug, logPrefix << "Accepted admin connection: "
<< connection.getMgmtId());
return; // No need to call observer, always allow admins.
}
- BrokerInfo info; // Avoid self connections.
- if (getBrokerInfo(connection, info)) {
- if (info.getSystemId() == self) {
- QPID_LOG(debug, "HA broker rejected self connection "+connection.getMgmtId());
- connection.abort();
- }
-
+ if (isSelf(connection)) { // Reject self connections
+ QPID_LOG(debug, logPrefix << "Rejected self connection "+connection.getMgmtId());
+ connection.abort();
+ return;
}
ObserverPtr o(getObserver());
if (o) o->opened(connection);
@@ -77,8 +79,8 @@ void ConnectionObserver::opened(broker::
}
void ConnectionObserver::closed(broker::Connection& connection) {
+ if (isSelf(connection)) return; // Ignore closing of self connections.
try {
- BrokerInfo info;
ObserverPtr o(getObserver());
if (o) o->closed(connection);
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/ConnectionObserver.h Mon Aug 27 15:40:33 2012
@@ -51,7 +51,7 @@ class ConnectionObserver : public broker
static const std::string ADMIN_TAG;
static const std::string BACKUP_TAG;
- static bool getBrokerInfo(broker::Connection& connection, BrokerInfo& info);
+ static bool getBrokerInfo(const broker::Connection& connection, BrokerInfo& info);
ConnectionObserver(HaBroker& haBroker, const types::Uuid& self);
@@ -62,6 +62,8 @@ class ConnectionObserver : public broker
void closed(broker::Connection& connection);
private:
+ bool isSelf(const broker::Connection&);
+
sys::Mutex lock;
HaBroker& haBroker;
std::string logPrefix;
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.cpp Mon Aug 27 15:40:33 2012
@@ -83,7 +83,11 @@ void HaBroker::initialize() {
// FIXME aconway 2012-07-19: assumes there's a TCP transport with a meaningful port.
brokerInfo = BrokerInfo(
- broker.getSystem()->getNodeName(), broker.getPort(broker::Broker::TCP_TRANSPORT), systemId);
+ broker.getSystem()->getNodeName(),
+ broker.getPort(broker::Broker::TCP_TRANSPORT),
+ systemId);
+
+ QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo);
// Set up the management object.
ManagementAgent* ma = broker.getManagementAgent();
@@ -111,8 +115,6 @@ void HaBroker::initialize() {
if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl));
- QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo);
-
// NOTE: lock is not needed in a constructor, but create one
// to pass to functions that have a ScopedLock parameter.
Mutex::ScopedLock l(lock);
@@ -226,6 +228,7 @@ void HaBroker::setBrokerUrl(const Url& u
if (url.empty()) throw Url::Invalid("HA broker URL is empty");
brokerUrl = url;
mgmtObject->set_brokersUrl(brokerUrl.str());
+ QPID_LOG(info, logPrefix << "Broker URL set to: " << url);
if (backup.get()) backup->setBrokerUrl(brokerUrl);
// Updating broker URL also updates defaulted client URL:
if (clientUrl.empty()) updateClientUrl(l);
@@ -292,6 +295,7 @@ void HaBroker::statusChanged(Mutex::Scop
}
void HaBroker::membershipUpdated(Mutex::ScopedLock&) {
+ QPID_LOG(info, logPrefix << "Membership changed: " << membership);
Variant::List brokers = membership.asList();
mgmtObject->set_members(brokers);
broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
@@ -321,14 +325,14 @@ void HaBroker::resetMembership(const Bro
void HaBroker::addBroker(const BrokerInfo& b) {
Mutex::ScopedLock l(lock);
membership.add(b);
- QPID_LOG(debug, logPrefix << "Membership add: " << b << " now: " << membership);
+ QPID_LOG(debug, logPrefix << "Membership add: " << b);
membershipUpdated(l);
}
void HaBroker::removeBroker(const Uuid& id) {
Mutex::ScopedLock l(lock);
membership.remove(id);
- QPID_LOG(debug, logPrefix << "Membership remove: " << id << " now: " << membership);
+ QPID_LOG(debug, logPrefix << "Membership remove: " << id);
membershipUpdated(l);
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/HaBroker.h Mon Aug 27 15:40:33 2012
@@ -97,6 +97,8 @@ class HaBroker : public management::Mana
void addBroker(const BrokerInfo& b); // Add a broker to the membership.
void removeBroker(const types::Uuid& id); // Remove a broker from membership.
+ types::Uuid getSystemId() const { return systemId; }
+
private:
void setClientUrl(const Url&);
void setBrokerUrl(const Url&);
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.cpp Mon Aug 27 15:40:33 2012
@@ -66,7 +66,7 @@ types::Variant::List Membership::asList(
BrokerInfo::Set Membership::otherBackups() const {
BrokerInfo::Set result;
for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
- if (isBackup(i->second.getStatus()) && i->second.getSystemId() != self)
+ if (i->second.getStatus() == READY && i->second.getSystemId() != self)
result.insert(i->second);
return result;
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/Membership.h Mon Aug 27 15:40:33 2012
@@ -47,7 +47,7 @@ class Membership
void add(const BrokerInfo& b);
void remove(const types::Uuid& id);
bool contains(const types::Uuid& id);
- /** Return IDs of all backups other than self */
+ /** Return IDs of all READY backups other than self */
BrokerInfo::Set otherBackups() const;
void assign(const types::Variant::List&);
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/Primary.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/Primary.cpp Mon Aug 27 15:40:33 2012
@@ -180,7 +180,7 @@ void Primary::readyReplica(const Replica
void Primary::queueCreate(const QueuePtr& q) {
// Throw if there is an invalid replication level in the queue settings.
- haBroker.getReplicationTest().replicateLevel(q->getSettings());
+ haBroker.getReplicationTest().replicateLevel(q->getSettings().storeSettings);
Mutex::ScopedLock l(lock);
for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) {
i->second->queueCreate(q);
@@ -201,6 +201,7 @@ void Primary::opened(broker::Connection&
Mutex::ScopedLock l(lock);
BackupMap::iterator i = backups.find(info.getSystemId());
if (i == backups.end()) {
+ QPID_LOG(debug, logPrefix << "New backup connected: " << info);
boost::shared_ptr<RemoteBackup> backup(
new RemoteBackup(info, haBroker.getReplicationTest(), true));
{
@@ -209,7 +210,6 @@ void Primary::opened(broker::Connection&
backup->setInitialQueues(haBroker.getBroker().getQueues(), false);
}
backups[info.getSystemId()] = backup;
- QPID_LOG(debug, logPrefix << "New backup connected: " << info);
}
else {
QPID_LOG(debug, logPrefix << "Known backup connected: " << info);
@@ -225,6 +225,12 @@ void Primary::opened(broker::Connection&
}
void Primary::closed(broker::Connection& connection) {
+ // NOTE: It is possible for a backup connection to be rejected while we are
+ // a backup, but closed() is called after we have become primary.
+ //
+ // For this reason we do not remove from the backups map here, the backups
+ // map holds all the backups we know about whether connected or not.
+ //
Mutex::ScopedLock l(lock);
BrokerInfo info;
if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
@@ -233,12 +239,6 @@ void Primary::closed(broker::Connection&
BackupMap::iterator i = backups.find(info.getSystemId());
if (i != backups.end()) i->second->setConnected(false);
}
- // NOTE: we do not remove from the backups map here, the backups map holds
- // all the backups we know about whether connected or not.
- //
- // It is possible for a backup connection to be rejected while we are a backup,
- // but the closed is seen after we have become primary. Removing the entry
- // from backups in this case would be incorrect.
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/QueueGuard.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/QueueGuard.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/QueueGuard.cpp Mon Aug 27 15:40:33 2012
@@ -39,10 +39,10 @@ class QueueGuard::QueueObserver : public
{
public:
QueueObserver(QueueGuard& g) : guard(g) {}
- void enqueued(const broker::QueuedMessage& qm) { guard.enqueued(qm); }
- void dequeued(const broker::QueuedMessage& qm) { guard.dequeued(qm); }
- void acquired(const broker::QueuedMessage&) {}
- void requeued(const broker::QueuedMessage&) {}
+ void enqueued(const broker::Message& m) { guard.enqueued(m); }
+ void dequeued(const broker::Message& m) { guard.dequeued(m); }
+ void acquired(const broker::Message&) {}
+ void requeued(const broker::Message&) {}
private:
QueueGuard& guard;
};
@@ -64,39 +64,47 @@ QueueGuard::QueueGuard(broker::Queue& q,
QueueGuard::~QueueGuard() { cancel(); }
// NOTE: Called with message lock held.
-void QueueGuard::enqueued(const QueuedMessage& qm) {
- assert(qm.queue == &queue);
+void QueueGuard::enqueued(const Message& m) {
// Delay completion
- QPID_LOG(trace, logPrefix << "Delayed completion of " << qm);
- qm.payload->getIngressCompletion().startCompleter();
+ QPID_LOG(trace, logPrefix << "Delayed completion of " << m);
+ m.getIngressCompletion()->startCompleter();
{
Mutex::ScopedLock l(lock);
- assert(!delayed.contains(qm.position));
- delayed += qm.position;
+ if (!delayed.insert(Delayed::value_type(m.getSequence(), m.getIngressCompletion())).second) {
+ QPID_LOG(critical, logPrefix << "Second enqueue for message with sequence " << m.getSequence());
+ assert(false);
+ }
}
}
// NOTE: Called with message lock held.
-void QueueGuard::dequeued(const QueuedMessage& qm) {
- assert(qm.queue == &queue);
- QPID_LOG(trace, logPrefix << "Dequeued " << qm);
+void QueueGuard::dequeued(const Message& m) {
+ QPID_LOG(trace, logPrefix << "Dequeued " << m);
ReplicatingSubscription* rs=0;
{
Mutex::ScopedLock l(lock);
rs = subscription;
}
- if (rs) rs->dequeued(qm);
- complete(qm);
+ if (rs) rs->dequeued(m);
+ complete(m.getSequence());
+}
+
+void QueueGuard::completeRange(Delayed::iterator begin, Delayed::iterator end) {
+ for (Delayed::iterator i = begin; i != end; ++i) {
+ QPID_LOG(trace, logPrefix << "Completed " << i->first);
+ i->second->finishCompleter();
+ }
}
void QueueGuard::cancel() {
queue.removeObserver(observer);
+ Delayed removed;
{
Mutex::ScopedLock l(lock);
if (delayed.empty()) return; // No need if no delayed messages.
+ delayed.swap(removed);
}
- // FIXME aconway 2012-06-15: optimize, only messages in delayed set.
- queue.eachMessage(boost::bind(&QueueGuard::complete, this, _1));
+ completeRange(removed.begin(), removed.end());
}
void QueueGuard::attach(ReplicatingSubscription& rs) {
@@ -104,36 +112,39 @@ void QueueGuard::attach(ReplicatingSubsc
subscription = &rs;
}
-namespace {
-void completeBefore(QueueGuard* guard, SequenceNumber position, const QueuedMessage& qm) {
- if (qm.position <= position) guard->complete(qm);
-}
-}
-
bool QueueGuard::subscriptionStart(SequenceNumber position) {
- // Complete any messages before or at the ReplicatingSubscription start position.
- // Those messages are already on the backup.
- if (!delayed.empty() && delayed.front() <= position) {
- // FIXME aconway 2012-06-15: queue iteration, only messages in delayed
- queue.eachMessage(boost::bind(&completeBefore, this, position, _1));
+ Delayed removed;
+ {
+ Mutex::ScopedLock l(lock);
+ // Complete any messages before or at the ReplicatingSubscription start position.
+ // Those messages are already on the backup.
+ for (Delayed::iterator i = delayed.begin(); i != delayed.end() && i->first <= position;) {
+ removed.insert(*i);
+ delayed.erase(i++);
+ }
}
+ completeRange(removed.begin(), removed.end());
return position >= range.back;
}
-void QueueGuard::complete(const QueuedMessage& qm) {
- assert(qm.queue == &queue);
+void QueueGuard::complete(SequenceNumber sequence) {
+ boost::intrusive_ptr<broker::AsyncCompletion> m;
{
Mutex::ScopedLock l(lock);
// The same message can be completed twice, by
// ReplicatingSubscription::acknowledged and dequeued. Remove it
- // from the set so we only call finishCompleter() once
- if (delayed.contains(qm.position))
- delayed -= qm.position;
- else
- return;
+ // from the map so we only call finishCompleter() once
+ Delayed::iterator i = delayed.find(sequence);
+ if (i != delayed.end()) {
+ m = i->second;
+ delayed.erase(i);
+ }
+
+ }
+ if (m) {
+ QPID_LOG(trace, logPrefix << "Completed " << sequence);
+ m->finishCompleter();
}
- QPID_LOG(trace, logPrefix << "Completed " << qm);
- qm.payload->getIngressCompletion().finishCompleter();
}
}} // namespaces qpid::ha
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/QueueGuard.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/QueueGuard.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/QueueGuard.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/QueueGuard.h Mon Aug 27 15:40:33 2012
@@ -63,15 +63,15 @@ class QueueGuard {
/** QueueObserver override. Delay completion of the message.
* NOTE: Called under the queues message lock.
*/
- void enqueued(const broker::QueuedMessage&);
+ void enqueued(const broker::Message&);
/** QueueObserver override: Complete a delayed message.
* NOTE: Called under the queues message lock.
*/
- void dequeued(const broker::QueuedMessage&);
+ void dequeued(const broker::Message&);
/** Complete a delayed message. */
- void complete(const broker::QueuedMessage&);
+ void complete(framing::SequenceNumber);
/** Complete all delayed messages. */
void cancel();
@@ -108,10 +108,13 @@ class QueueGuard {
sys::Mutex lock;
std::string logPrefix;
broker::Queue& queue;
- framing::SequenceSet delayed;
+ typedef std::map<framing::SequenceNumber, boost::intrusive_ptr<broker::AsyncCompletion> > Delayed;
+ Delayed delayed;
ReplicatingSubscription* subscription;
boost::shared_ptr<QueueObserver> observer;
QueueRange range;
+
+ void completeRange(Delayed::iterator begin, Delayed::iterator end);
};
}} // namespace qpid::ha
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/QueueReplicator.cpp Mon Aug 27 15:40:33 2012
@@ -120,8 +120,10 @@ void QueueReplicator::initializeBridge(B
settings.setTable(ReplicatingSubscription::QPID_BROKER_INFO,
brokerInfo.asFieldTable());
SequenceNumber front;
- if (ReplicatingSubscription::getFront(*queue, front))
+ if (ReplicatingSubscription::getFront(*queue, front)) {
settings.setInt(ReplicatingSubscription::QPID_FRONT, front);
+ QPID_LOG(debug, "QPID_FRONT for " << queue->getName() << " is " << front);
+ }
peer.getMessage().subscribe(
args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
false/*exclusive*/, "", 0, settings);
@@ -137,8 +139,7 @@ void QueueReplicator::initializeBridge(B
namespace {
template <class T> T decodeContent(Message& m) {
- std::string content;
- m.getFrames().getContent(content);
+ std::string content = m.getContent();
Buffer buffer(const_cast<char*>(content.c_str()), content.size());
T result;
result.decode(buffer);
@@ -148,9 +149,7 @@ template <class T> T decodeContent(Messa
void QueueReplicator::dequeue(SequenceNumber n, sys::Mutex::ScopedLock&) {
// Thread safe: only calls thread safe Queue functions.
- QueuedMessage message;
- if (queue->acquireMessageAt(n, message))
- queue->dequeue(0, message);
+ queue->dequeueMessageAt(n);
}
// Called in connection thread of the queues bridge to primary.
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/RemoteBackup.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/RemoteBackup.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/RemoteBackup.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/RemoteBackup.cpp Mon Aug 27 15:40:33 2012
@@ -23,6 +23,7 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
+#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
namespace qpid {
@@ -32,7 +33,7 @@ using sys::Mutex;
using boost::bind;
RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con) :
- logPrefix("Primary remote backup "+info.getLogId()+": "),
+ logPrefix("Primary: Remote backup "+info.getLogId()+": "),
brokerInfo(info), replicationTest(rt), connected(con), reportedReady(false)
{}
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.cpp Mon Aug 27 15:40:33 2012
@@ -27,6 +27,7 @@
#include "qpid/broker/Queue.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/ConnectionState.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
@@ -66,10 +67,10 @@ class DequeueScanner
at = front - 1;
}
- void operator()(const QueuedMessage& qm) {
- if (qm.position >= front && qm.position <= back) {
- if (qm.position > at+1) subscription.dequeued(at+1, qm.position-1);
- at = qm.position;
+ void operator()(const Message& m) {
+ if (m.getSequence() >= front && m.getSequence() <= back) {
+ if (m.getSequence() > at+1) subscription.dequeued(at+1, m.getSequence()-1);
+ at = m.getSequence();
}
}
@@ -90,37 +91,23 @@ string mask(const string& in)
return DOLLAR + in + INTERNAL;
}
-
-/** Dummy consumer used to get the front position on the queue */
-class GetPositionConsumer : public Consumer
+namespace {
+bool getSequence(const Message& message, SequenceNumber& result)
{
- public:
- GetPositionConsumer() :
- Consumer("ha.GetPositionConsumer."+types::Uuid(true).str(), false) {}
- bool deliver(broker::QueuedMessage& ) { return true; }
- void notify() {}
- bool filter(boost::intrusive_ptr<broker::Message>) { return true; }
- bool accept(boost::intrusive_ptr<broker::Message>) { return true; }
- void cancel() {}
- void acknowledged(const broker::QueuedMessage&) {}
- bool browseAcquired() const { return true; }
- broker::OwnershipToken* getSession() { return 0; }
-};
-
-
+ result = message.getSequence();
+ return true;
+}
+}
bool ReplicatingSubscription::getNext(
broker::Queue& q, SequenceNumber from, SequenceNumber& result)
{
- boost::shared_ptr<Consumer> c(new GetPositionConsumer);
- c->setPosition(from);
- if (!q.dispatch(c)) return false;
- result = c->getPosition();
- return true;
+ QueueCursor cursor(REPLICATOR);
+ return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(result)), from);
}
bool ReplicatingSubscription::getFront(broker::Queue& q, SequenceNumber& front) {
- // FIXME aconway 2012-05-23: won't wrap, assumes 0 is < all messages in queue.
- return getNext(q, 0, front);
+ QueueCursor cursor(REPLICATOR);
+ return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(front)));
}
/* Called by SemanticState::consume to create a consumer */
@@ -152,15 +139,14 @@ ReplicatingSubscription::ReplicatingSubs
const string& name,
Queue::shared_ptr queue,
bool ack,
- bool acquire,
+ bool /*acquire*/,
bool exclusive,
const string& tag,
const string& resumeId,
uint64_t resumeTtl,
const framing::FieldTable& arguments
-) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag,
+) : ConsumerImpl(parent, name, queue, ack, REPLICATOR, exclusive, tag,
resumeId, resumeTtl, arguments),
- dummy(new Queue(mask(name))),
ready(false)
{
try {
@@ -213,6 +199,8 @@ ReplicatingSubscription::ReplicatingSubs
queue->eachMessage(boost::ref(scan)); // Remove missing messages in between.
scan.finish();
position = backup.back;
+ //move cursor to position
+ queue->seek(*this, position);
}
// NOTE: we are assuming that the messages that are on the backup are
// consistent with those on the primary. If the backup is a replica
@@ -260,32 +248,31 @@ void ReplicatingSubscription::initialize
}
// Message is delivered in the subscription's connection thread.
-bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
+bool ReplicatingSubscription::deliver(const qpid::broker::QueueCursor& c, const qpid::broker::Message& m) {
+ position = m.getSequence();
try {
- // Add position events for the subscribed queue, not the internal event queue.
- if (qm.queue == getQueue().get()) {
- QPID_LOG(trace, logPrefix << "Replicating " << qm);
- {
- Mutex::ScopedLock l(lock);
- assert(position == qm.position);
- // qm.position is the position of the newly enqueued qm on local queue.
- // backupPosition is latest position on backup queue before enqueueing
- if (qm.position <= backupPosition)
- throw Exception(
- QPID_MSG("Expected position > " << backupPosition
- << " but got " << qm.position));
- if (qm.position - backupPosition > 1) {
- // Position has advanced because of messages dequeued ahead of us.
- // Send the position before qm was enqueued.
- sendPositionEvent(qm.position-1, l);
- }
- // Backup will automatically advance by 1 on delivery of message.
- backupPosition = qm.position;
+ QPID_LOG(trace, logPrefix << "Replicating " << getQueue()->getName() << "[" << m.getSequence() << "]");
+ {
+ Mutex::ScopedLock l(lock);
+ //FIXME GRS: position is no longer set//assert(position == m.getSequence());
+
+ // m.getSequence() is the position of the newly enqueued message on local queue.
+ // backupPosition is latest position on backup queue before enqueueing
+ if (m.getSequence() <= backupPosition)
+ throw Exception(
+ QPID_MSG("Expected position > " << backupPosition
+ << " but got " << m.getSequence()));
+ if (m.getSequence() - backupPosition > 1) {
+ // Position has advanced because of messages dequeued ahead of us.
+ // Send the position before message was enqueued.
+ sendPositionEvent(m.getSequence()-1, l);
}
+ // Backup will automatically advance by 1 on delivery of message.
+ backupPosition = m.getSequence();
}
- return ConsumerImpl::deliver(qm);
+ return ConsumerImpl::deliver(c, m);
} catch (const std::exception& e) {
- QPID_LOG(critical, logPrefix << "Error replicating " << qm
+ QPID_LOG(critical, logPrefix << "Error replicating " << getQueue()->getName() << "[" << m.getSequence() << "]"
<< ": " << e.what());
throw;
}
@@ -310,15 +297,13 @@ void ReplicatingSubscription::cancel()
}
// Consumer override, called on primary in the backup's IO thread.
-void ReplicatingSubscription::acknowledged(const QueuedMessage& qm) {
- if (qm.queue == getQueue().get()) { // Don't complete messages on the internal queue
- // Finish completion of message, it has been acknowledged by the backup.
- QPID_LOG(trace, logPrefix << "Acknowledged " << qm);
- guard->complete(qm);
- // If next message is protected by the guard then we are ready
- if (qm.position >= guard->getRange().back) setReady();
- }
- ConsumerImpl::acknowledged(qm);
+void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) {
+ // Finish completion of message, it has been acknowledged by the backup.
+ QPID_LOG(trace, logPrefix << "Acknowledged " << getQueue()->getName() << "[" << r.getMessageId() << "]");
+ guard->complete(r.getMessageId());
+ // If next message is protected by the guard then we are ready
+ if (r.getMessageId() >= guard->getRange().back) setReady();
+ ConsumerImpl::acknowledged(r);
}
// Called with lock held. Called in subscription's connection thread.
@@ -341,13 +326,12 @@ void ReplicatingSubscription::sendDequeu
// Called after the message has been removed
// from the deque and under the messageLock in the queue. Called in
// arbitrary connection threads.
-void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
+void ReplicatingSubscription::dequeued(const Message& m)
{
- assert (qm.queue == getQueue().get());
- QPID_LOG(trace, logPrefix << "Dequeued " << qm);
+ QPID_LOG(trace, logPrefix << "Dequeued " << getQueue()->getName() << "[" << m.getSequence() << "]");
{
Mutex::ScopedLock l(lock);
- dequeues.add(qm.position);
+ dequeues.add(m.getSequence());
}
notify(); // Ensure a call to doDispatch
}
@@ -379,7 +363,7 @@ void ReplicatingSubscription::sendPositi
void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer)
{
//generate event message
- boost::intrusive_ptr<Message> event = new Message();
+ boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> event(new qpid::broker::amqp_0_10::MessageTransfer());
AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0)));
AMQFrame header((AMQHeaderBody()));
AMQFrame content((AMQContentBody()));
@@ -400,10 +384,8 @@ void ReplicatingSubscription::sendEvent(
event->getFrames().getHeaders()->get<DeliveryProperties>(true);
props->setRoutingKey(key);
// Send the event directly to the base consumer implementation.
- // We don't really need a queue here but we pass a dummy queue
- // to conform to the consumer API.
- QueuedMessage qm(dummy.get(), event);
- ConsumerImpl::deliver(qm);
+ //dummy consumer prevents acknowledgements being handled, which is what we want for events
+ ConsumerImpl::deliver(QueueCursor(), Message(event, 0), boost::shared_ptr<Consumer>());
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicatingSubscription.h Mon Aug 27 15:40:33 2012
@@ -101,15 +101,15 @@ class ReplicatingSubscription : public b
// Called via QueueGuard::dequeued.
//@return true if the message requires completion.
- void dequeued(const broker::QueuedMessage& qm);
+ void dequeued(const broker::Message&);
// Called during initial scan for dequeues.
void dequeued(framing::SequenceNumber first, framing::SequenceNumber last);
// Consumer overrides.
- bool deliver(broker::QueuedMessage& msg);
+ bool deliver(const broker::QueueCursor& cursor, const broker::Message& msg);
void cancel();
- void acknowledged(const broker::QueuedMessage&);
+ void acknowledged(const broker::DeliveryRecord&);
bool browseAcquired() const { return true; }
// Hide the "queue deleted" error for a ReplicatingSubscription when a
// queue is deleted, this is normal and not an error.
@@ -127,8 +127,8 @@ class ReplicatingSubscription : public b
private:
std::string logPrefix;
- boost::shared_ptr<broker::Queue> dummy; // Used to send event messages
framing::SequenceSet dequeues;
+ framing::SequenceNumber position;
framing::SequenceNumber backupPosition;
bool ready;
BrokerInfo info;
Modified: qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicationTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicationTest.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicationTest.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/ha/ReplicationTest.cpp Mon Aug 27 15:40:33 2012
@@ -68,7 +68,7 @@ bool ReplicationTest::isReplicated(
bool ReplicationTest::isReplicated(ReplicateLevel level, const broker::Queue& q)
{
- return isReplicated(level, q.getSettings(), q.isAutoDelete(), q.hasExclusiveOwner());
+ return isReplicated(level, q.getSettings().storeSettings, q.isAutoDelete(), q.hasExclusiveOwner());
}
Modified: qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.cpp?rev=1377715&r1=1377714&r2=1377715&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.cpp Mon Aug 27 15:40:33 2012
@@ -31,6 +31,7 @@
#include <qpid/broker/Message.h>
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/FieldValue.h"
+#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/sys/Time.h"
#include "qpid/sys/Thread.h"
#include "qpid/broker/ConnectionState.h"
@@ -535,7 +536,7 @@ void ManagementAgent::sendBufferLH(Buffe
}
if (exchange.get() == 0) return;
- intrusive_ptr<Message> msg(new Message());
+ intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer());
AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0)));
AMQFrame header((AMQHeaderBody()));
AMQFrame content((AMQContentBody()));
@@ -547,24 +548,26 @@ void ManagementAgent::sendBufferLH(Buffe
header.setEof(false);
content.setBof(false);
- msg->getFrames().append(method);
- msg->getFrames().append(header);
+ transfer->getFrames().append(method);
+ transfer->getFrames().append(header);
MessageProperties* props =
- msg->getFrames().getHeaders()->get<MessageProperties>(true);
+ transfer->getFrames().getHeaders()->get<MessageProperties>(true);
props->setContentLength(length);
DeliveryProperties* dp =
- msg->getFrames().getHeaders()->get<DeliveryProperties>(true);
+ transfer->getFrames().getHeaders()->get<DeliveryProperties>(true);
dp->setRoutingKey(routingKey);
- msg->getFrames().append(content);
- msg->setIsManagementMessage(true);
+ transfer->getFrames().append(content);
+
+ Message msg(transfer, transfer);
+ msg.setIsManagementMessage(true);
{
sys::Mutex::ScopedUnlock u(userLock);
- DeliverableMessage deliverable (msg);
+ DeliverableMessage deliverable (msg, 0);
try {
exchange->route(deliverable);
} catch(exception&) {}
@@ -602,7 +605,7 @@ void ManagementAgent::sendBufferLH(const
}
if (exchange.get() == 0) return;
- intrusive_ptr<Message> msg(new Message());
+ intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer);
AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0)));
AMQFrame header((AMQHeaderBody()));
AMQFrame content((AMQContentBody(data)));
@@ -612,11 +615,11 @@ void ManagementAgent::sendBufferLH(const
header.setEof(false);
content.setBof(false);
- msg->getFrames().append(method);
- msg->getFrames().append(header);
+ transfer->getFrames().append(method);
+ transfer->getFrames().append(header);
MessageProperties* props =
- msg->getFrames().getHeaders()->get<MessageProperties>(true);
+ transfer->getFrames().getHeaders()->get<MessageProperties>(true);
props->setContentLength(data.length());
if (!cid.empty()) {
props->setCorrelationId(cid);
@@ -625,23 +628,25 @@ void ManagementAgent::sendBufferLH(const
props->setAppId("qmf2");
for (i = headers.begin(); i != headers.end(); ++i) {
- msg->insertCustomProperty(i->first, i->second.asString());
+ props->getApplicationHeaders().setString(i->first, i->second.asString());
}
DeliveryProperties* dp =
- msg->getFrames().getHeaders()->get<DeliveryProperties>(true);
+ transfer->getFrames().getHeaders()->get<DeliveryProperties>(true);
dp->setRoutingKey(routingKey);
if (ttl_msec) {
dp->setTtl(ttl_msec);
- msg->computeExpiration(broker->getExpiryPolicy());
}
- msg->getFrames().append(content);
- msg->setIsManagementMessage(true);
+ transfer->getFrames().append(content);
+ transfer->computeRequiredCredit();
+ Message msg(transfer, transfer);
+ msg.setIsManagementMessage(true);
+ msg.computeExpiration(broker->getExpiryPolicy());
{
sys::Mutex::ScopedUnlock u(userLock);
- DeliverableMessage deliverable (msg);
+ DeliverableMessage deliverable (msg, 0);
try {
exchange->route(deliverable);
} catch(exception&) {}
@@ -2135,19 +2140,20 @@ bool ManagementAgent::authorizeAgentMess
// authorized or not. In this case, return true (authorized) if there is no ACL in place,
// otherwise return false;
//
- if (msg.encodedSize() > MA_BUFFER_SIZE)
+ if (msg.getContentSize() > MA_BUFFER_SIZE)
return broker->getAcl() == 0;
- msg.encodeContent(inBuffer);
+ inBuffer.putRawData(msg.getContent());
uint32_t bufferLen = inBuffer.getPosition();
inBuffer.reset();
+ qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
const framing::MessageProperties* p =
- msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+ transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
- const framing::FieldTable *headers = msg.getApplicationHeaders();
+ const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0;
- if (headers && msg.getAppId() == "qmf2")
+ if (headers && p->getAppId() == "qmf2")
{
mapMsg = true;
@@ -2238,8 +2244,9 @@ bool ManagementAgent::authorizeAgentMess
// authorization failed, send reply if replyTo present
+ qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
const framing::MessageProperties* p =
- msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+ transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
if (p && p->hasReplyTo()) {
const framing::ReplyTo& rt = p->getReplyTo();
string rte = rt.getExchange();
@@ -2277,8 +2284,9 @@ void ManagementAgent::dispatchAgentComma
{
string rte;
string rtk;
+ qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
const framing::MessageProperties* p =
- msg.getFrames().getHeaders()->get<framing::MessageProperties>();
+ transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
if (p && p->hasReplyTo()) {
const framing::ReplyTo& rt = p->getReplyTo();
rte = rt.getExchange();
@@ -2290,19 +2298,19 @@ void ManagementAgent::dispatchAgentComma
Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE);
uint8_t opcode;
- if (msg.encodedSize() > MA_BUFFER_SIZE) {
+ if (msg.getContentSize() > MA_BUFFER_SIZE) {
QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " <<
- msg.encodedSize());
+ msg.getContentSize());
return;
}
- msg.encodeContent(inBuffer);
+ inBuffer.putRawData(msg.getContent());
uint32_t bufferLen = inBuffer.getPosition();
inBuffer.reset();
ScopedManagementContext context((const qpid::broker::ConnectionState*) msg.getPublisher());
- const framing::FieldTable *headers = msg.getApplicationHeaders();
- if (headers && msg.getAppId() == "qmf2")
+ const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0;
+ if (headers && p->getAppId() == "qmf2")
{
string opcode = headers->getAsString("qmf.opcode");
string contentType = headers->getAsString("qmf.content");
Propchange: qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.cpp
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp:r1368652-1375508
Propchange: qpid/branches/asyncstore/cpp/src/qpid/management/ManagementAgent.h
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h:r1368652-1375508
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org