You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2009/03/11 00:11:10 UTC
svn commit: r752300 [3/12] - in /qpid/branches/qpid-1673/qpid: cpp/
cpp/examples/ cpp/examples/direct/ cpp/examples/failover/
cpp/examples/fanout/ cpp/examples/pub-sub/ cpp/examples/qmf-console/
cpp/examples/request-response/ cpp/examples/tradedemo/ cp...
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Connection.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Connection.h Tue Mar 10 23:10:57 2009
@@ -27,14 +27,15 @@
#include "OutputInterceptor.h"
#include "NoOpConnectionOutputHandler.h"
#include "EventFrame.h"
+#include "McastFrameHandler.h"
#include "qpid/broker/Connection.h"
#include "qpid/amqp_0_10/Connection.h"
#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/ConnectionOutputHandler.h"
-#include "qpid/framing/FrameDecoder.h"
#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/FrameDecoder.h"
#include <iosfwd>
@@ -63,10 +64,10 @@
public:
typedef sys::PollableQueue<EventFrame> PollableFrameQueue;
- /** Local connection, use this in ConnectionId */
- Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId, bool catchUp, bool isLink);
- /** Shadow connection */
- Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, ConnectionId);
+ /** Local connection. */
+ Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, MemberId, bool catchUp, bool isLink);
+ /** Shadow connection. */
+ Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id);
~Connection();
ConnectionId getId() const { return self; }
@@ -99,7 +100,7 @@
/** Called if the connectors member has left the cluster */
void left();
- // ConnectionCodec methods
+ // ConnectionCodec methods - called by IO layer with a read buffer.
size_t decode(const char* buffer, size_t size);
// Called for data delivered from the cluster.
@@ -117,9 +118,9 @@
const framing::SequenceNumber& received,
const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete);
- void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username);
+ void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment);
- void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId);
+ void membership(const framing::FieldTable&, const framing::FieldTable&);
void deliveryRecord(const std::string& queue,
const framing::SequenceNumber& position,
@@ -134,6 +135,7 @@
uint32_t credit);
void queuePosition(const std::string&, const framing::SequenceNumber&);
+ void expiryId(uint64_t);
void txStart();
void txAccept(const framing::SequenceSet&);
@@ -148,8 +150,12 @@
void exchange(const std::string& encoded);
void giveReadCredit(int credit);
-
+
private:
+ struct NullFrameHandler : public framing::FrameHandler {
+ void handle(framing::AMQFrame&) {}
+ };
+
void init();
bool checkUnsupported(const framing::AMQBody& body);
void deliverClose();
@@ -174,6 +180,8 @@
framing::ChannelId currentChannel;
boost::shared_ptr<broker::TxBuffer> txBuffer;
bool expectProtocolHeader;
+ McastFrameHandler mcastFrameHandler;
+ NullFrameHandler nullFrameHandler;
static qpid::sys::AtomicValue<uint64_t> catchUpId;
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp Tue Mar 10 23:10:57 2009
@@ -44,18 +44,15 @@
return 0;
}
-// Used for outgoing Link connections, we don't care.
+// Used for outgoing Link connections
sys::ConnectionCodec*
-ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) {
- return new ConnectionCodec(out, id, cluster, false, true);
- //return next->create(out, id);
+ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& logId) {
+ return new ConnectionCodec(out, logId, cluster, false, true);
}
-ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster, bool catchUp, bool isLink)
- : codec(out, id, isLink),
- interceptor(new Connection(cluster, codec, id, cluster.getId(), catchUp, isLink)),
- id(interceptor->getId()),
- localId(id)
+ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& logId, Cluster& cluster, bool catchUp, bool isLink)
+ : codec(out, logId, isLink),
+ interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink))
{
std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor));
codec.setInputHandler(ih);
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ConnectionCodec.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ConnectionCodec.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ConnectionCodec.h Tue Mar 10 23:10:57 2009
@@ -56,7 +56,7 @@
sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id);
};
- ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& c, bool catchUp, bool isLink);
+ ConnectionCodec(sys::OutputControl& out, const std::string& logId, Cluster& c, bool catchUp, bool isLink);
~ConnectionCodec();
// ConnectionCodec functions.
@@ -71,8 +71,6 @@
private:
amqp_0_10::Connection codec;
boost::intrusive_ptr<cluster::Connection> interceptor;
- cluster::ConnectionId id;
- std::string localId;
};
}} // namespace qpid::cluster
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cpg.cpp Tue Mar 10 23:10:57 2009
@@ -107,17 +107,16 @@
check(cpg_leave(handle, &group), cantLeaveMsg(group));
}
-bool Cpg::isFlowControlEnabled() {
- cpg_flow_control_state_t flowState;
- check(cpg_flow_control_state_get(handle, &flowState), "Cannot get CPG flow control status.");
- return flowState == CPG_FLOW_CONTROL_ENABLED;
-}
+
+
bool Cpg::mcast(const iovec* iov, int iovLen) {
- if (isFlowControlEnabled()) {
- QPID_LOG(debug, "CPG flow control enabled")
+ // Check for flow control
+ cpg_flow_control_state_t flowState;
+ check(cpg_flow_control_state_get(handle, &flowState), "Cannot get CPG flow control status.");
+ if (flowState == CPG_FLOW_CONTROL_ENABLED)
return false;
- }
+
cpg_error_t result;
do {
result = cpg_mcast_joined(handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen);
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cpg.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cpg.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cpg.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cpg.h Tue Mar 10 23:10:57 2009
@@ -114,8 +114,6 @@
int getFd();
- bool isFlowControlEnabled();
-
private:
static std::string errorStr(cpg_error_t err, const std::string& msg);
static std::string cantJoinMsg(const Name&);
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Event.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Event.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Event.cpp Tue Mar 10 23:10:57 2009
@@ -23,6 +23,7 @@
#include "Cpg.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/assert.h"
#include <ostream>
#include <iterator>
#include <algorithm>
@@ -31,6 +32,7 @@
namespace cluster {
using framing::Buffer;
+using framing::AMQFrame;
const size_t EventHeader::HEADER_SIZE =
sizeof(uint8_t) + // type
@@ -42,7 +44,7 @@
;
EventHeader::EventHeader(EventType t, const ConnectionId& c, size_t s)
- : type(t), connectionId(c), size(s), sequence(0) {}
+ : type(t), connectionId(c), size(s) {}
Event::Event() {}
@@ -57,7 +59,7 @@
type = (EventType)buf.getOctet();
if(type != DATA && type != CONTROL)
throw Exception("Invalid multicast event type");
- connectionId = ConnectionId(m, reinterpret_cast<Connection*>(buf.getLongLong()));
+ connectionId = ConnectionId(m, buf.getLongLong());
size = buf.getLong();
#ifdef QPID_LATENCY_METRIC
latency_metric_timestamp = buf.getLongLong();
@@ -74,14 +76,17 @@
return e;
}
-Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) {
- framing::AMQFrame f(body);
+Event Event::control(const framing::AMQFrame& f, const ConnectionId& cid) {
Event e(CONTROL, cid, f.encodedSize());
Buffer buf(e);
f.encode(buf);
return e;
}
+Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) {
+ return control(framing::AMQFrame(body), cid);
+}
+
iovec Event::toIovec() {
encodeHeader();
iovec iov = { const_cast<char*>(getStore()), getStoreSize() };
@@ -90,7 +95,7 @@
void EventHeader::encode(Buffer& b) const {
b.putOctet(type);
- b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer()));
+ b.putLongLong(connectionId.getNumber());
b.putLong(size);
#ifdef QPID_LATENCY_METRIC
b.putLongLong(latency_metric_timestamp);
@@ -108,12 +113,22 @@
return Buffer(const_cast<char*>(getData()), getSize());
}
+AMQFrame Event::getFrame() const {
+ assert(type == CONTROL);
+ Buffer buf(*this);
+ AMQFrame frame;
+ QPID_ASSERT(frame.decode(buf));
+ return frame;
+}
+
static const char* EVENT_TYPE_NAMES[] = { "data", "control" };
+std::ostream& operator << (std::ostream& o, EventType t) {
+ return o << EVENT_TYPE_NAMES[t];
+}
+
std::ostream& operator << (std::ostream& o, const EventHeader& e) {
- o << "[event " << e.getConnectionId() << "/" << e.getSequence()
- << " " << EVENT_TYPE_NAMES[e.getType()]
- << " " << e.getSize() << " bytes]";
+ o << "Event[" << e.getConnectionId() << " " << e.getType() << " " << e.getSize() << " bytes]";
return o;
}
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Event.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Event.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Event.h Tue Mar 10 23:10:57 2009
@@ -24,6 +24,7 @@
#include "types.h"
#include "qpid/RefCountedBuffer.h"
+#include "qpid/framing/AMQFrame.h"
#include "qpid/sys/LatencyMetric.h"
#include <sys/uio.h> // For iovec
#include <iosfwd>
@@ -34,6 +35,7 @@
namespace framing {
class AMQBody;
+class AMQFrame;
class Buffer;
}
@@ -55,11 +57,9 @@
/** Size of header + payload. */
size_t getStoreSize() { return size + HEADER_SIZE; }
- uint64_t getSequence() const { return sequence; }
- void setSequence(uint64_t n) { sequence = n; }
-
- bool isCluster() const { return connectionId.getPointer() == 0; }
- bool isConnection() const { return connectionId.getPointer() != 0; }
+ bool isCluster() const { return connectionId.getNumber() == 0; }
+ bool isConnection() const { return connectionId.getNumber() != 0; }
+ bool isControl() const { return type == CONTROL; }
protected:
static const size_t HEADER_SIZE;
@@ -67,7 +67,6 @@
EventType type;
ConnectionId connectionId;
size_t size;
- uint64_t sequence;
};
/**
@@ -83,8 +82,11 @@
/** Create an event copied from delivered data. */
static Event decodeCopy(const MemberId& m, framing::Buffer&);
- /** Create an event containing a control */
+ /** Create a control event. */
static Event control(const framing::AMQBody&, const ConnectionId&);
+
+ /** Create a control event. */
+ static Event control(const framing::AMQFrame&, const ConnectionId&);
// Data excluding header.
char* getData() { return store + HEADER_SIZE; }
@@ -93,6 +95,8 @@
// Store including header
char* getStore() { return store; }
const char* getStore() const { return store; }
+
+ framing::AMQFrame getFrame() const;
operator framing::Buffer() const;
@@ -105,6 +109,7 @@
};
std::ostream& operator << (std::ostream&, const EventHeader&);
+
}} // namespace qpid::cluster
#endif /*!QPID_CLUSTER_EVENT_H*/
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/EventFrame.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/EventFrame.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/EventFrame.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/EventFrame.cpp Tue Mar 10 23:10:57 2009
@@ -24,16 +24,18 @@
namespace qpid {
namespace cluster {
-EventFrame::EventFrame() : sequence(0) {}
+EventFrame::EventFrame() {}
EventFrame::EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc)
- : connectionId(e.getConnectionId()), frame(f), sequence(e.getSequence()), readCredit(rc)
+ : connectionId(e.getConnectionId()), frame(f), readCredit(rc), type(e.getType())
{
QPID_LATENCY_INIT(frame);
}
std::ostream& operator<<(std::ostream& o, const EventFrame& e) {
- return o << e.connectionId << "/" << e.sequence << " " << e.frame << " rc=" << e.readCredit;
+ return o << e.frame << " " << e.type << " " << e.connectionId;
+ if (e.readCredit) o << " read-credit=" << e.readCredit;
+ return o;
}
}} // namespace qpid::cluster
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/EventFrame.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/EventFrame.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/EventFrame.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/EventFrame.h Tue Mar 10 23:10:57 2009
@@ -42,22 +42,15 @@
EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc=0);
- bool isCluster() const { return !connectionId.getPointer(); }
- bool isConnection() const { return connectionId.getPointer(); }
+ bool isCluster() const { return connectionId.getNumber() == 0; }
+ bool isConnection() const { return connectionId.getNumber() != 0; }
bool isLastInEvent() const { return readCredit; }
- // True if this frame follows immediately after frame e.
- bool follows(const EventFrame& e) const {
- return sequence == e.sequence || (sequence == e.sequence+1 && e.readCredit);
- }
-
- bool operator<(const EventFrame& e) const { return sequence < e.sequence; }
-
ConnectionId connectionId;
framing::AMQFrame frame;
- uint64_t sequence;
- int readCredit; // last frame in an event, give credit when processed.
+ int readCredit; ///< last frame in an event, give credit when processed.
+ EventType type;
};
std::ostream& operator<<(std::ostream& o, const EventFrame& e);
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp Tue Mar 10 23:10:57 2009
@@ -30,48 +30,46 @@
namespace qpid {
namespace cluster {
-ExpiryPolicy::ExpiryPolicy(const boost::function<bool()> & f, Multicaster& m, const MemberId& id, broker::Timer& t)
- : expiredPolicy(new Expired), isLeader(f), mcast(m), memberId(id), timer(t) {}
-
-namespace {
-uint64_t clusterId(const broker::Message& m) {
- assert(m.getFrames().begin() != m.getFrames().end());
- return m.getFrames().begin()->getClusterId();
-}
+ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, broker::Timer& t)
+ : expiryId(0), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {}
struct ExpiryTask : public broker::TimerTask {
ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when)
- : TimerTask(when), expiryPolicy(policy), messageId(id) {}
- void fire() { expiryPolicy->sendExpire(messageId); }
+ : TimerTask(when), expiryPolicy(policy), expiryId(id) {}
+ void fire() { expiryPolicy->sendExpire(expiryId); }
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
- const uint64_t messageId;
+ const uint64_t expiryId;
};
-}
void ExpiryPolicy::willExpire(broker::Message& m) {
- timer.add(new ExpiryTask(this, clusterId(m), m.getExpiration()));
+ uint64_t id = expiryId++;
+ assert(unexpiredById.find(id) == unexpiredById.end());
+ assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end());
+ unexpiredById[id] = &m;
+ unexpiredByMessage[&m] = id;
+ timer.add(new ExpiryTask(this, id, m.getExpiration()));
}
bool ExpiryPolicy::hasExpired(broker::Message& m) {
- sys::Mutex::ScopedLock l(lock);
- IdSet::iterator i = expired.find(clusterId(m));
- if (i != expired.end()) {
- expired.erase(i);
- const_cast<broker::Message&>(m).setExpiryPolicy(expiredPolicy); // hasExpired() == true;
- return true;
- }
- return false;
+ return unexpiredByMessage.find(&m) == unexpiredByMessage.end();
}
void ExpiryPolicy::sendExpire(uint64_t id) {
- sys::Mutex::ScopedLock l(lock);
- if (isLeader())
- mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId);
+ mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId);
}
void ExpiryPolicy::deliverExpire(uint64_t id) {
- sys::Mutex::ScopedLock l(lock);
- expired.insert(id);
+ IdMessageMap::iterator i = unexpiredById.find(id);
+ if (i != unexpiredById.end()) {
+ i->second->setExpiryPolicy(expiredPolicy); // hasExpired() == true;
+ unexpiredByMessage.erase(i->second);
+ unexpiredById.erase(i);
+ }
+}
+
+boost::optional<uint64_t> ExpiryPolicy::getId(broker::Message& m) {
+ MessageIdMap::iterator i = unexpiredByMessage.find(&m);
+ return i == unexpiredByMessage.end() ? boost::optional<uint64_t>() : i->second;
}
bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; }
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h Tue Mar 10 23:10:57 2009
@@ -27,11 +27,15 @@
#include "qpid/sys/Mutex.h"
#include <boost/function.hpp>
#include <boost/intrusive_ptr.hpp>
-#include <set>
+#include <boost/optional.hpp>
+#include <map>
namespace qpid {
-namespace broker { class Timer; }
+namespace broker {
+class Timer;
+class Message;
+}
namespace cluster {
class Multicaster;
@@ -42,7 +46,7 @@
class ExpiryPolicy : public broker::ExpiryPolicy
{
public:
- ExpiryPolicy(const boost::function<bool()> & isLeader, Multicaster&, const MemberId&, broker::Timer&);
+ ExpiryPolicy(Multicaster&, const MemberId&, broker::Timer&);
void willExpire(broker::Message&);
@@ -54,18 +58,24 @@
// Cluster delivers expiry notice.
void deliverExpire(uint64_t);
+ void setId(uint64_t id) { expiryId = id; }
+ uint64_t getId() const { return expiryId; }
+
+ boost::optional<uint64_t> getId(broker::Message&);
+
private:
- sys::Mutex lock;
- typedef std::set<uint64_t> IdSet;
+ typedef std::map<broker::Message*, uint64_t> MessageIdMap;
+ typedef std::map<uint64_t, broker::Message*> IdMessageMap;
struct Expired : public broker::ExpiryPolicy {
bool hasExpired(broker::Message&);
void willExpire(broker::Message&);
};
- IdSet expired;
+ MessageIdMap unexpiredByMessage;
+ IdMessageMap unexpiredById;
+ uint64_t expiryId;
boost::intrusive_ptr<Expired> expiredPolicy;
- boost::function<bool()> isLeader;
Multicaster& mcast;
MemberId memberId;
broker::Timer& timer;
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Multicaster.cpp Tue Mar 10 23:10:57 2009
@@ -24,6 +24,7 @@
#include "qpid/log/Statement.h"
#include "qpid/sys/LatencyMetric.h"
#include "qpid/framing/AMQBody.h"
+#include "qpid/framing/AMQFrame.h"
namespace qpid {
namespace cluster {
@@ -43,6 +44,11 @@
mcast(Event::control(body, id));
}
+void Multicaster::mcastControl(const framing::AMQFrame& frame, const ConnectionId& id) {
+ QPID_LOG(trace, "MCAST " << id << ": " << frame);
+ mcast(Event::control(frame, id));
+}
+
void Multicaster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) {
Event e(DATA, id, size);
memcpy(e.getData(), data, size);
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Multicaster.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Multicaster.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Multicaster.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Multicaster.h Tue Mar 10 23:10:57 2009
@@ -50,6 +50,7 @@
boost::function<void()> onError
);
void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&);
+ void mcastControl(const framing::AMQFrame& controlFrame, const ConnectionId&);
void mcastBuffer(const char*, size_t, const ConnectionId&);
void mcast(const Event& e);
/** End holding mode, held events are mcast */
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Tue Mar 10 23:10:57 2009
@@ -70,17 +70,12 @@
// Called in write thread when the IO layer has no more data to write.
// We do nothing in the write thread, we run doOutput only on delivery
// of doOutput requests.
-bool OutputInterceptor::doOutput() {
- QPID_LOG(trace, parent << " write idle.");
- return false;
-}
+bool OutputInterceptor::doOutput() { return false; }
// Delivery of doOutput allows us to run the real connection doOutput()
// which tranfers frames to the codec for writing.
//
void OutputInterceptor::deliverDoOutput(size_t requested) {
- QPID_LATENCY_RECORD("deliver do-output", *this);
- QPID_LATENCY_CLEAR(*this);
size_t buf = getBuffered();
if (parent.isLocal())
writeEstimate.delivered(requested, sent, buf); // Update the estimate.
@@ -91,9 +86,7 @@
moreOutput = parent.getBrokerConnection().doOutput();
} while (sent < requested && moreOutput);
sent += buf; // Include buffered data in the sent total.
-
- QPID_LOG(trace, "Delivered doOutput: requested=" << requested << " output=" << sent << " more=" << moreOutput);
-
+ QPID_LOG(trace, parent << " delivereDoOutput: requested=" << requested << " sent=" << sent << " more=" << moreOutput);
if (parent.isLocal() && moreOutput) {
QPID_LOG(trace, parent << " deliverDoOutput - sending doOutput, more output available.");
sendDoOutput();
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/PollableQueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/PollableQueue.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/PollableQueue.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/PollableQueue.h Tue Mar 10 23:10:57 2009
@@ -52,6 +52,8 @@
}
catch (const std::exception& e) {
QPID_LOG(error, message << ": " << e.what());
+ values.clear();
+ this->stop();
error();
}
}
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Tue Mar 10 23:10:57 2009
@@ -22,6 +22,8 @@
#include "Cluster.h"
#include "ClusterMap.h"
#include "Connection.h"
+#include "Decoder.h"
+#include "ExpiryPolicy.h"
#include "qpid/client/SessionBase_0_10Access.h"
#include "qpid/client/ConnectionAccess.h"
#include "qpid/broker/Broker.h"
@@ -86,33 +88,40 @@
// TODO aconway 2008-09-24: optimization: update connections/sessions in parallel.
UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url,
- broker::Broker& broker, const ClusterMap& m, uint64_t frameId_,
- const Cluster::Connections& cons,
+ broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_,
+ const Cluster::ConnectionVector& cons, Decoder& decoder_,
const boost::function<void()>& ok,
const boost::function<void(const std::exception&)>& fail,
const client::ConnectionSettings& cs
)
: updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m),
- frameId(frameId_), connections(cons),
+ expiry(expiry_), connections(cons), decoder(decoder_),
connection(catchUpConnection()), shadowConnection(catchUpConnection()),
- done(ok), failed(fail)
+ done(ok), failed(fail), connectionSettings(cs)
{
connection.open(url, cs);
- session = connection.newSession("update_shared");
+ session = connection.newSession(UPDATE);
}
UpdateClient::~UpdateClient() {}
// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
-const std::string UpdateClient::UPDATE("qpid.qpid-update");
+const std::string UpdateClient::UPDATE("qpid.cluster-update");
+
+void UpdateClient::run() {
+ try {
+ update();
+ done();
+ } catch (const std::exception& e) {
+ failed(e);
+ }
+ delete this;
+}
void UpdateClient::update() {
QPID_LOG(debug, updaterId << " updating state to " << updateeId << " at " << updateeUrl);
Broker& b = updaterBroker;
b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
-
- // Update exchange is used to route messages to the proper queue without modifying routing key.
- session.exchangeDeclare(arg::exchange=UPDATE, arg::type="fanout", arg::autoDelete=true);
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueue, this, _1));
// Update queue is used to transfer acquired messages that are no longer on their original queue.
session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
@@ -121,25 +130,15 @@
std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1));
+ ClusterConnectionProxy(session).expiryId(expiry.getId());
ClusterConnectionMembershipBody membership;
map.toMethodBody(membership);
- membership.setFrameId(frameId);
AMQFrame frame(membership);
client::ConnectionAccess::getImpl(connection)->handle(frame);
connection.close();
QPID_LOG(debug, updaterId << " updated state to " << updateeId << " at " << updateeUrl);
}
-void UpdateClient::run() {
- try {
- update();
- done();
- } catch (const std::exception& e) {
- failed(e);
- }
- delete this;
-}
-
namespace {
template <class T> std::string encode(const T& t) {
std::string encoded;
@@ -152,8 +151,7 @@
void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) {
QPID_LOG(debug, updaterId << " updating exchange " << ex->getName());
- ClusterConnectionProxy proxy(session);
- proxy.exchange(encode(*ex));
+ ClusterConnectionProxy(session).exchange(encode(*ex));
}
/** Bind a queue to the update exchange and update messges to it
@@ -164,24 +162,40 @@
bool haveLastPos;
framing::SequenceNumber lastPos;
client::AsyncSession session;
-
+ ExpiryPolicy& expiry;
+
public:
- MessageUpdater(const string& q, const client::AsyncSession s) : queue(q), haveLastPos(false), session(s) {
+ MessageUpdater(const string& q, const client::AsyncSession s, ExpiryPolicy& expiry_) : queue(q), haveLastPos(false), session(s), expiry(expiry_) {
session.exchangeBind(queue, UpdateClient::UPDATE);
}
~MessageUpdater() {
- session.exchangeUnbind(queue, UpdateClient::UPDATE);
+ try {
+ session.exchangeUnbind(queue, UpdateClient::UPDATE);
+ }
+ catch (const std::exception& e) {
+ // Don't throw in a destructor.
+ QPID_LOG(error, "Unbinding update queue " << queue << ": " << e.what());
+ }
}
void updateQueuedMessage(const broker::QueuedMessage& message) {
+ // Send the queue position if necessary.
if (!haveLastPos || message.position - lastPos != 1) {
ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1);
haveLastPos = true;
}
lastPos = message.position;
+
+ // Send the expiry ID if necessary.
+ if (message.payload->getProperties<DeliveryProperties>()->getTtl()) {
+ boost::optional<uint64_t> expiryId = expiry.getId(*message.payload);
+ if (!expiryId) return; // Message already expired, don't replicate.
+ ClusterConnectionProxy(session).expiryId(*expiryId);
+ }
+
SessionBase_0_10Access sb(session);
framing::MessageTransferBody transfer(
framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
@@ -204,16 +218,13 @@
void updateMessage(const boost::intrusive_ptr<broker::Message>& message) {
updateQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1));
}
-
-
};
-
void UpdateClient::updateQueue(const boost::shared_ptr<Queue>& q) {
QPID_LOG(debug, updaterId << " updating queue " << q->getName());
ClusterConnectionProxy proxy(session);
proxy.queue(encode(*q));
- MessageUpdater updater(q->getName(), session);
+ MessageUpdater updater(q->getName(), session, expiry);
q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1));
q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, q->getName(), _1));
}
@@ -228,13 +239,16 @@
shadowConnection = catchUpConnection();
broker::Connection& bc = updateConnection->getBrokerConnection();
- // FIXME aconway 2008-10-20: What authentication info to use on reconnect?
- shadowConnection.open(updateeUrl, bc.getUserId(), ""/*password*/, "/"/*vhost*/, bc.getFrameMax());
+ connectionSettings.maxFrameSize = bc.getFrameMax();
+ shadowConnection.open(updateeUrl, connectionSettings);
bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1));
+ // Safe to use decoder here because we are stalled for update.
+ std::pair<const char*, size_t> fragment = decoder.get(updateConnection->getId()).getFragment();
ClusterConnectionProxy(shadowConnection).shadowReady(
updateConnection->getId().getMember(),
- reinterpret_cast<uint64_t>(updateConnection->getId().getPointer()),
- updateConnection->getBrokerConnection().getUserId()
+ updateConnection->getId().getNumber(),
+ bc.getUserId(),
+ string(fragment.first, fragment.second)
);
shadowConnection.close();
QPID_LOG(debug, updaterId << " updated connection " << *updateConnection);
@@ -269,7 +283,7 @@
SequenceNumber received = ss->receiverGetReceived().command;
if (inProgress)
--received;
-
+
// Reset command-sequence state.
proxy.sessionState(
ss->senderGetReplayPoint().command,
@@ -285,9 +299,6 @@
if (inProgress) {
inProgress->getFrames().map(simpl->out);
}
-
- // FIXME aconway 2008-09-23: update session replay list.
-
QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId());
}
@@ -322,7 +333,7 @@
// If the message is acquired then it is no longer on the
// updatees queue, put it on the update queue for updatee to pick up.
//
- MessageUpdater(UPDATE, shadowSession).updateQueuedMessage(dr.getMessage());
+ MessageUpdater(UPDATE, shadowSession, expiry).updateQueuedMessage(dr.getMessage());
}
ClusterConnectionProxy(shadowSession).deliveryRecord(
dr.getQueue()->getName(),
@@ -341,8 +352,8 @@
class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
public:
- TxOpUpdater(UpdateClient& dc, client::AsyncSession s)
- : MessageUpdater(UpdateClient::UPDATE, s), parent(dc), session(s), proxy(s) {}
+ TxOpUpdater(UpdateClient& dc, client::AsyncSession s, ExpiryPolicy& expiry)
+ : MessageUpdater(UpdateClient::UPDATE, s, expiry), parent(dc), session(s), proxy(s) {}
void operator()(const broker::DtxAck& ) {
throw InternalErrorException("DTX transactions not currently supported by cluster.");
@@ -385,7 +396,7 @@
broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
if (txBuffer) {
proxy.txStart();
- TxOpUpdater updater(*this, shadowSession);
+ TxOpUpdater updater(*this, shadowSession, expiry);
txBuffer->accept(updater);
proxy.txEnd();
}
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/UpdateClient.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/UpdateClient.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/UpdateClient.h Tue Mar 10 23:10:57 2009
@@ -46,6 +46,7 @@
class DeliveryRecord;
class SessionState;
class SemanticState;
+class Decoder;
} // namespace broker
@@ -54,6 +55,8 @@
class Cluster;
class Connection;
class ClusterMap;
+class Decoder;
+class ExpiryPolicy;
/**
* A client that updates the contents of a local broker to a remote one using AMQP.
@@ -63,8 +66,8 @@
static const std::string UPDATE; // Name for special update queue and exchange.
UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&,
- broker::Broker& donor, const ClusterMap& map, uint64_t sequence,
- const std::vector<boost::intrusive_ptr<Connection> >& ,
+ broker::Broker& donor, const ClusterMap& map, ExpiryPolicy& expiry,
+ const std::vector<boost::intrusive_ptr<Connection> >&, Decoder&,
const boost::function<void()>& done,
const boost::function<void(const std::exception&)>& fail,
const client::ConnectionSettings&
@@ -92,12 +95,14 @@
Url updateeUrl;
broker::Broker& updaterBroker;
ClusterMap map;
- uint64_t frameId;
+ ExpiryPolicy& expiry;
std::vector<boost::intrusive_ptr<Connection> > connections;
+ Decoder& decoder;
client::Connection connection, shadowConnection;
client::AsyncSession session, shadowSession;
boost::function<void()> done;
boost::function<void(const std::exception& e)> failed;
+ client::ConnectionSettings connectionSettings;
};
}} // namespace qpid::cluster
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/types.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/types.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/types.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/types.h Tue Mar 10 23:10:57 2009
@@ -68,16 +68,17 @@
std::ostream& operator<<(std::ostream&, const MemberId&);
-struct ConnectionId : public std::pair<MemberId, Connection*> {
- ConnectionId(const MemberId& m=MemberId(), Connection* c=0) : std::pair<MemberId, Connection*> (m,c) {}
- ConnectionId(uint64_t m, uint64_t c)
- : std::pair<MemberId, Connection*>(MemberId(m), reinterpret_cast<Connection*>(c)) {}
+struct ConnectionId : public std::pair<MemberId, uint64_t> {
+ ConnectionId(const MemberId& m=MemberId(), uint64_t c=0) : std::pair<MemberId, uint64_t> (m,c) {}
+ ConnectionId(uint64_t m, uint64_t c) : std::pair<MemberId, uint64_t>(MemberId(m), c) {}
MemberId getMember() const { return first; }
- Connection* getPointer() const { return second; }
+ uint64_t getNumber() const { return second; }
};
std::ostream& operator<<(std::ostream&, const ConnectionId&);
+std::ostream& operator<<(std::ostream&, EventType);
+
}} // namespace qpid::cluster
#endif /*!QPID_CLUSTER_TYPES_H*/
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/console/ClassKey.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/console/ClassKey.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/console/ClassKey.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/console/ClassKey.cpp Tue Mar 10 23:10:57 2009
@@ -21,6 +21,7 @@
#include "ClassKey.h"
#include <string.h>
+#include <cstdio>
using namespace std;
using namespace qpid::console;
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/AMQFrame.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/AMQFrame.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/AMQFrame.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/AMQFrame.cpp Tue Mar 10 23:10:57 2009
@@ -41,7 +41,7 @@
AMQFrame::AMQFrame(const AMQBody& b) : body(b.clone()) { init(); }
-AMQFrame::~AMQFrame() {}
+AMQFrame::~AMQFrame() { init(); }
AMQBody* AMQFrame::getBody() {
// Non-const AMQBody* may be used to modify the body.
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/AMQFrame.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/AMQFrame.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/AMQFrame.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/AMQFrame.h Tue Mar 10 23:10:57 2009
@@ -93,9 +93,6 @@
/** Must point to at least DECODE_SIZE_MIN bytes of data */
static uint16_t decodeSize(char* data);
- uint64_t getClusterId() const { return clusterId; }
- void setClusterId(uint64_t id) { clusterId = id; }
-
private:
void init();
@@ -107,7 +104,6 @@
bool bos : 1;
bool eos : 1;
mutable uint32_t encodedSizeCache;
- uint64_t clusterId; // Used to identify frames in a clustered broekr.
};
QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream&, const AMQFrame&);
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/FrameDecoder.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/FrameDecoder.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/FrameDecoder.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/FrameDecoder.cpp Tue Mar 10 23:10:57 2009
@@ -21,8 +21,9 @@
#include "FrameDecoder.h"
#include "Buffer.h"
#include "qpid/log/Statement.h"
-#include <algorithm>
#include "qpid/framing/reply_exceptions.h"
+#include <algorithm>
+#include <string.h>
namespace qpid {
namespace framing {
@@ -67,4 +68,13 @@
return false;
}
+void FrameDecoder::setFragment(const char* data, size_t size) {
+ fragment.resize(size);
+ ::memcpy(&fragment[0], data, size);
+}
+
+std::pair<const char*, size_t> FrameDecoder::getFragment() const {
+ return std::pair<const char*, size_t>(&fragment[0], fragment.size());
+}
+
}} // namespace qpid::framing
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/FrameDecoder.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/FrameDecoder.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/FrameDecoder.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/FrameDecoder.h Tue Mar 10 23:10:57 2009
@@ -35,9 +35,16 @@
{
public:
bool decode(Buffer& buffer);
- AMQFrame frame;
+ const AMQFrame& getFrame() const { return frame; }
+ AMQFrame& getFrame() { return frame; }
+
+ void setFragment(const char*, size_t);
+ std::pair<const char*, size_t> getFragment() const;
+
private:
std::vector<char> fragment;
+ AMQFrame frame;
+
};
}} // namespace qpid::framing
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/Uuid.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/Uuid.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/Uuid.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/Uuid.h Tue Mar 10 23:10:57 2009
@@ -19,8 +19,9 @@
*
*/
-#include "qpid/sys/uuid.h"
#include "qpid/CommonImportExport.h"
+#include "qpid/sys/uuid.h"
+#include "qpid/sys/IntegerTypes.h"
#include <boost/array.hpp>
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/log/Selector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/log/Selector.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/log/Selector.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/log/Selector.cpp Tue Mar 10 23:10:57 2009
@@ -20,6 +20,7 @@
#include "Options.h"
#include <boost/bind.hpp>
#include <algorithm>
+#include <string.h>
namespace qpid {
namespace log {
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp Tue Mar 10 23:10:57 2009
@@ -42,10 +42,14 @@
struct NameValue { const char* name; int value; };
NameValue nameValue[] = {
{ "AUTH", LOG_AUTH },
+#ifdef HAVE_LOG_AUTHPRIV
{ "AUTHPRIV", LOG_AUTHPRIV },
+#endif
{ "CRON", LOG_CRON },
{ "DAEMON", LOG_DAEMON },
+#ifdef HAVE_LOG_FTP
{ "FTP", LOG_FTP },
+#endif
{ "KERN", LOG_KERN },
{ "LOCAL0", LOG_LOCAL0 },
{ "LOCAL1", LOG_LOCAL1 },
@@ -72,7 +76,7 @@
int value(const string& name) const {
string key(name);
- transform(key.begin(), key.end(), key.begin(), ::toupper);
+ std::transform(key.begin(), key.end(), key.begin(), ::toupper);
ByName::const_iterator i = byName.find(key);
if (i == byName.end())
throw Exception("Not a valid syslog facility: " + name);
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/management/ManagementBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/management/ManagementBroker.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/management/ManagementBroker.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/management/ManagementBroker.cpp Tue Mar 10 23:10:57 2009
@@ -80,7 +80,7 @@
}
ManagementBroker::ManagementBroker () :
- threadPoolSize(1), interval(10), broker(0)
+ threadPoolSize(1), interval(10), broker(0), startTime(uint64_t(Duration(now())))
{
nextObjectId = 1;
brokerBank = 1;
@@ -346,6 +346,9 @@
string routingKey;
list<pair<ObjectId, ManagementObject*> > deleteList;
+ uint64_t uptime = uint64_t(Duration(now())) - startTime;
+ static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime);
+
moveNewObjectsLH();
if (clientWasAdded) {
@@ -844,6 +847,9 @@
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
encodeHeader(outBuffer, 'g', sequence);
object->writeProperties(outBuffer);
object->writeStatistics(outBuffer, true);
@@ -865,6 +871,9 @@
Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
encodeHeader(outBuffer, 'g', sequence);
object->writeProperties(outBuffer);
object->writeStatistics(outBuffer, true);
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/management/ManagementBroker.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/management/ManagementBroker.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/management/ManagementBroker.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/management/ManagementBroker.h Tue Mar 10 23:10:57 2009
@@ -182,6 +182,7 @@
uint32_t nextRemoteBank;
uint32_t nextRequestSequence;
bool clientWasAdded;
+ const uint64_t startTime;
std::auto_ptr<IdAllocator> allocator;
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp Tue Mar 10 23:10:57 2009
@@ -57,7 +57,6 @@
{
FieldTable headers;
headers.setString(REPLICATION_TARGET_QUEUE, dequeued.queue->getName());
- headers.setInt(REPLICATION_EVENT_SEQNO, ++sequence);
headers.setInt(REPLICATION_EVENT_TYPE, DEQUEUE);
headers.setInt(DEQUEUED_MESSAGE_POSITION, dequeued.position);
boost::intrusive_ptr<Message> msg(createMessage(headers));
@@ -69,7 +68,6 @@
boost::intrusive_ptr<Message> msg(cloneMessage(*(enqueued.queue), enqueued.payload));
FieldTable& headers = msg->getProperties<MessageProperties>()->getApplicationHeaders();
headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName());
- headers.setInt(REPLICATION_EVENT_SEQNO, ++sequence);
headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE);
queue->deliver(msg);
}
@@ -131,12 +129,14 @@
{
Broker* broker = dynamic_cast<broker::Broker*>(&target);
if (broker && !options.queue.empty()) {
+ broker->addFinalizer(boost::bind(&ReplicatingEventListener::shutdown, this));
if (options.createQueue) {
queue = broker->getQueues().declare(options.queue).first;
} else {
queue = broker->getQueues().find(options.queue);
}
if (queue) {
+ queue->insertSequenceNumbers(REPLICATION_EVENT_SEQNO);
QueueEvents::EventListener callback = boost::bind(&ReplicatingEventListener::handle, this, _1);
broker->getQueueEvents().registerListener(options.name, callback);
QPID_LOG(info, "Registered replicating queue event listener");
@@ -147,6 +147,7 @@
}
void ReplicatingEventListener::earlyInitialize(Target&) {}
+void ReplicatingEventListener::shutdown() { queue.reset(); }
ReplicatingEventListener::PluginOptions::PluginOptions() : Options("Queue Replication Options"),
name("replicator"),
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h Tue Mar 10 23:10:57 2009
@@ -58,10 +58,10 @@
PluginOptions options;
qpid::broker::Queue::shared_ptr queue;
- qpid::framing::SequenceNumber sequence;
void deliverDequeueMessage(const qpid::broker::QueuedMessage& enqueued);
void deliverEnqueueMessage(const qpid::broker::QueuedMessage& enqueued);
+ void shutdown();
boost::intrusive_ptr<qpid::broker::Message> createMessage(const qpid::framing::FieldTable& headers);
boost::intrusive_ptr<qpid::broker::Message> cloneMessage(qpid::broker::Queue& queue,
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp Tue Mar 10 23:10:57 2009
@@ -34,11 +34,13 @@
using namespace qpid::framing;
using namespace qpid::replication::constants;
+const std::string SEQUENCE_VALUE("qpid.replication-event.sequence");
ReplicationExchange::ReplicationExchange(const std::string& name, bool durable,
const FieldTable& args,
QueueRegistry& qr,
Manageable* parent)
- : Exchange(name, durable, args, parent), queues(qr), init(false) {}
+ : Exchange(name, durable, args, parent), queues(qr), sequence(args.getAsInt64(SEQUENCE_VALUE)), init(false)
+ {}
std::string ReplicationExchange::getType() const { return typeName; }
@@ -68,26 +70,33 @@
{
std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE);
Queue::shared_ptr queue = queues.find(queueName);
- FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders();
- headers.erase(REPLICATION_TARGET_QUEUE);
- headers.erase(REPLICATION_EVENT_SEQNO);
- headers.erase(REPLICATION_EVENT_TYPE);
- msg.deliverTo(queue);
- QPID_LOG(debug, "Enqueued replicated message onto " << queue);
+ if (queue) {
+ FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders();
+ headers.erase(REPLICATION_TARGET_QUEUE);
+ headers.erase(REPLICATION_EVENT_SEQNO);
+ headers.erase(REPLICATION_EVENT_TYPE);
+ msg.deliverTo(queue);
+ QPID_LOG(debug, "Enqueued replicated message onto " << queueName);
+ } else {
+ QPID_LOG(error, "Cannot enqueue replicated message. Queue " << queueName << " does not exist");
+ }
}
void ReplicationExchange::handleDequeueEvent(const FieldTable* args)
{
std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE);
Queue::shared_ptr queue = queues.find(queueName);
- SequenceNumber position(args->getAsInt(DEQUEUED_MESSAGE_POSITION));
-
- QueuedMessage dequeued;
- if (queue->acquireMessageAt(position, dequeued)) {
- queue->dequeue(0, dequeued);
- QPID_LOG(debug, "Processed replicated 'dequeue' event from " << queueName << " at position " << position);
+ if (queue) {
+ SequenceNumber position(args->getAsInt(DEQUEUED_MESSAGE_POSITION));
+ QueuedMessage dequeued;
+ if (queue->acquireMessageAt(position, dequeued)) {
+ queue->dequeue(0, dequeued);
+ QPID_LOG(debug, "Processed replicated 'dequeue' event from " << queueName << " at position " << position);
+ } else {
+ QPID_LOG(warning, "Could not acquire message " << position << " from " << queueName);
+ }
} else {
- QPID_LOG(warning, "Could not acquire message " << position << " from " << queueName);
+ QPID_LOG(error, "Cannot process replicated 'dequeue' event. Queue " << queueName << " does not exist");
}
}
@@ -128,6 +137,13 @@
const std::string ReplicationExchange::typeName("replication");
+void ReplicationExchange::encode(Buffer& buffer) const
+{
+ args.setInt64(std::string(SEQUENCE_VALUE), sequence);
+ Exchange::encode(buffer);
+}
+
+
struct ReplicationExchangePlugin : Plugin
{
Broker* broker;
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicationExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicationExchange.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicationExchange.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicationExchange.h Tue Mar 10 23:10:57 2009
@@ -22,6 +22,7 @@
*
*/
#include "qpid/broker/Exchange.h"
+#include "qpid/framing/Buffer.h"
#include "qpid/framing/SequenceNumber.h"
namespace qpid {
@@ -58,6 +59,7 @@
bool isDuplicate(const qpid::framing::FieldTable* args);
void handleEnqueueEvent(const qpid::framing::FieldTable* args, qpid::broker::Deliverable& msg);
void handleDequeueEvent(const qpid::framing::FieldTable* args);
+ void encode(framing::Buffer& buffer) const;
};
}} // namespace qpid::replication
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Codec.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Codec.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Codec.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Codec.h Tue Mar 10 23:10:57 2009
@@ -38,11 +38,11 @@
* @return may be less than size if there was incomplete
* data at the end of the buffer.
*/
- virtual size_t decode(const char* buffer, size_t size) = 0;
+ virtual std::size_t decode(const char* buffer, std::size_t size) = 0;
/** Encode into buffer, return number of bytes encoded */
- virtual size_t encode(const char* buffer, size_t size) = 0;
+ virtual std::size_t encode(const char* buffer, std::size_t size) = 0;
/** Return true if we have data to encode */
virtual bool canEncode() = 0;
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h Tue Mar 10 23:10:57 2009
@@ -30,7 +30,7 @@
/**
* A ConnectionOutputHandler that delegates to another
* ConnectionOutputHandler. Allows the "real" ConnectionOutputHandler
- * to be changed modified without updating all the pointers/references
+ * to be changed without updating all the pointers/references
* using the ConnectionOutputHandlerPtr
*/
class ConnectionOutputHandlerPtr : public ConnectionOutputHandler
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h Tue Mar 10 23:10:57 2009
@@ -80,7 +80,7 @@
bool add_unless(T& t, F f)
{
Mutex::ScopedLock l(lock);
- if (array && find_if(array->begin(), array->end(), f) != array->end()) {
+ if (array && std::find_if(array->begin(), array->end(), f) != array->end()) {
return false;
} else {
ArrayPtr copy(array ? new std::vector<T>(*array) : new std::vector<T>());
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/DispatchHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/DispatchHandle.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/DispatchHandle.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/DispatchHandle.cpp Tue Mar 10 23:10:57 2009
@@ -21,6 +21,8 @@
#include "DispatchHandle.h"
+#include <algorithm>
+
#include <boost/cast.hpp>
#include <assert.h>
@@ -29,7 +31,6 @@
namespace sys {
DispatchHandle::~DispatchHandle() {
- stopWatch();
}
void DispatchHandle::startWatch(Poller::shared_ptr poller0) {
@@ -37,13 +38,21 @@
bool w = writableCallback;
ScopedLock<Mutex> lock(stateLock);
- assert(state == IDLE);
+ assert(state == IDLE || state == DELAYED_IDLE);
// If no callbacks set then do nothing (that is what we were asked to do!)
// TODO: Maybe this should be an assert instead
if (!r && !w) {
- state = INACTIVE;
- return;
+ switch (state) {
+ case IDLE:
+ state = INACTIVE;
+ return;
+ case DELAYED_IDLE:
+ state = DELAYED_INACTIVE;
+ return;
+ default:
+ assert(state == IDLE || state == DELAYED_IDLE);
+ }
}
Poller::Direction d = r ?
@@ -53,9 +62,20 @@
poller = poller0;
poller->addFd(*this, d);
- state = r ?
- (w ? ACTIVE_RW : ACTIVE_R) :
- ACTIVE_W;
+ switch (state) {
+ case IDLE:
+ state = r ?
+ (w ? ACTIVE_RW : ACTIVE_R) :
+ ACTIVE_W;
+ return;
+ case DELAYED_IDLE:
+ state = r ?
+ (w ? DELAYED_RW : DELAYED_R) :
+ DELAYED_W;
+ return;
+ default:
+ assert(state == IDLE || state == DELAYED_IDLE);
+ }
}
void DispatchHandle::rewatch() {
@@ -93,6 +113,8 @@
case ACTIVE_RW:
// Don't need to do anything already waiting for readable/writable
break;
+ case ACTIVE_DELETE:
+ assert(state != ACTIVE_DELETE);
}
}
@@ -130,6 +152,8 @@
poller->modFd(*this, Poller::INOUT);
state = ACTIVE_RW;
break;
+ case ACTIVE_DELETE:
+ assert(state != ACTIVE_DELETE);
}
}
@@ -167,6 +191,8 @@
case ACTIVE_RW:
// Nothing to do: already waiting for writable
break;
+ case ACTIVE_DELETE:
+ assert(state != ACTIVE_DELETE);
}
}
@@ -203,6 +229,8 @@
case ACTIVE_W:
case INACTIVE:
break;
+ case ACTIVE_DELETE:
+ assert(state != ACTIVE_DELETE);
}
}
@@ -239,6 +267,8 @@
case ACTIVE_R:
case INACTIVE:
break;
+ case ACTIVE_DELETE:
+ assert(state != ACTIVE_DELETE);
}
}
@@ -261,6 +291,8 @@
poller->modFd(*this, Poller::NONE);
state = INACTIVE;
break;
+ case ACTIVE_DELETE:
+ assert(state != ACTIVE_DELETE);
}
}
@@ -280,47 +312,72 @@
default:
state = IDLE;
break;
+ case ACTIVE_DELETE:
+ assert(state != ACTIVE_DELETE);
}
assert(poller);
poller->delFd(*this);
poller.reset();
}
+// If we are already in the IDLE state we can't do the callback as we might
+// race to delete and callback at the same time
+// TODO: might be able to fix this by adding a new state, but would make
+// the state machine even more complex
void DispatchHandle::call(Callback iCb) {
assert(iCb);
ScopedLock<Mutex> lock(stateLock);
- interruptedCallbacks.push(iCb);
-
- (void) poller->interrupt(*this);
+ switch (state) {
+ case IDLE:
+ case ACTIVE_DELETE:
+ assert(false);
+ return;
+ default:
+ interruptedCallbacks.push(iCb);
+ assert(poller);
+ (void) poller->interrupt(*this);
+ }
}
// The slightly strange switch structure
// is to ensure that the lock is released before
// we do the delete
void DispatchHandle::doDelete() {
- // Ensure that we're no longer watching anything
- stopWatch();
-
- // If we're in the middle of a callback defer the delete
{
ScopedLock<Mutex> lock(stateLock);
+ // Ensure that we're no longer watching anything
switch (state) {
+ case DELAYED_R:
+ case DELAYED_W:
+ case DELAYED_RW:
+ case DELAYED_INACTIVE:
+ assert(poller);
+ poller->delFd(*this);
+ poller.reset();
+ // Fallthrough
case DELAYED_IDLE:
- case DELAYED_DELETE:
state = DELAYED_DELETE;
+ // Fallthrough
+ case DELAYED_DELETE:
+ case ACTIVE_DELETE:
return;
case IDLE:
break;
default:
- // Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states
- assert(false);
+ state = ACTIVE_DELETE;
+ assert(poller);
+ (void) poller->interrupt(*this);
+ poller->delFd(*this);
+ return;
}
}
- // If we're not then do it right away
+ // If we're IDLE we can do this right away
delete this;
}
void DispatchHandle::processEvent(Poller::EventType type) {
+ CallbackQueue callbacks;
+
// Note that we are now doing the callbacks
{
ScopedLock<Mutex> lock(stateLock);
@@ -336,6 +393,16 @@
case ACTIVE_RW:
state = DELAYED_RW;
break;
+ case ACTIVE_DELETE:
+ // Need to make sure we clean up any pending callbacks in this case
+ std::swap(callbacks, interruptedCallbacks);
+ goto saybyebye;
+ // Can get here in idle if we are stopped in a different thread
+ // just after we return with this handle in Poller::wait
+ case IDLE:
+ // Can get here in INACTIVE if a non connection thread unwatches
+ // whilst we were stuck in the above lock
+ case INACTIVE:
// Can only get here in a DELAYED_* state in the rare case
// that we're already here for reading and we get activated for
// writing and we can write (it might be possible the other way
@@ -348,9 +415,9 @@
case DELAYED_IDLE:
case DELAYED_DELETE:
return;
- default:
- assert(false);
}
+
+ std::swap(callbacks, interruptedCallbacks);
}
// Do callbacks - whilst we are doing the callbacks we are prevented from processing
@@ -378,8 +445,8 @@
break;
case Poller::INTERRUPTED:
{
- ScopedLock<Mutex> lock(stateLock);
- assert(interruptedCallbacks.size() > 0);
+ // We could only be interrupted if we also had a callback to do
+ assert(callbacks.size() > 0);
// We'll actually do the interrupt below
}
break;
@@ -387,16 +454,18 @@
assert(false);
}
- {
- ScopedLock<Mutex> lock(stateLock);
- // If we've got a pending interrupt do it now
- while (interruptedCallbacks.size() > 0) {
- Callback cb = interruptedCallbacks.front();
+ // If we have any callbacks do them now -
+ // (because we use a copy from before the previous callbacks we won't
+ // do anything yet that was just added)
+ while (callbacks.size() > 0) {
+ Callback cb = callbacks.front();
assert(cb);
cb(*this);
- interruptedCallbacks.pop();
+ callbacks.pop();
}
+ {
+ ScopedLock<Mutex> lock(stateLock);
// If any of the callbacks re-enabled reading/writing then actually
// do it now
switch (state) {
@@ -425,7 +494,9 @@
case DELAYED_DELETE:
break;
}
- }
+ }
+
+saybyebye:
delete this;
}
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/DispatchHandle.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/DispatchHandle.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/DispatchHandle.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/DispatchHandle.h Tue Mar 10 23:10:57 2009
@@ -65,6 +65,7 @@
Mutex stateLock;
enum {
IDLE, INACTIVE, ACTIVE_R, ACTIVE_W, ACTIVE_RW,
+ ACTIVE_DELETE,
DELAYED_IDLE, DELAYED_INACTIVE, DELAYED_R, DELAYED_W, DELAYED_RW,
DELAYED_DELETE
} state;
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/PollableQueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/PollableQueue.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/PollableQueue.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/PollableQueue.h Tue Mar 10 23:10:57 2009
@@ -53,7 +53,7 @@
* @param values Queue of values to process. Any items remaining
* on return from Callback are put back on the queue.
*/
- typedef boost::function<void (Queue& values)> Callback;
+ typedef boost::function<void (Queue&)> Callback;
/**
* Constructor; sets necessary parameters.
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Poller.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Poller.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Poller.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Poller.h Tue Mar 10 23:10:57 2009
@@ -86,8 +86,9 @@
// with the handle and the INTERRUPTED event type
// if it returns false then the handle is not being monitored by the poller
// - This can either be because it has just received an event which has been
- // reported and has not been reenabled since. Or because it was removed
- // from the monitoring set
+ // reported and has not been reenabled since.
+ // - Because it was removed from the monitoring set
+ // - Or because it is already being interrupted
QPID_COMMON_EXTERN bool interrupt(PollerHandle& handle);
// Poller run loop
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Socket.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Socket.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Socket.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Socket.h Tue Mar 10 23:10:57 2009
@@ -27,8 +27,6 @@
#include "qpid/CommonImportExport.h"
#include <string>
-struct sockaddr;
-
namespace qpid {
namespace sys {
@@ -93,7 +91,7 @@
/** Accept a connection from a socket that is already listening
* and has an incoming connection
*/
- QPID_COMMON_EXTERN Socket* accept(struct sockaddr *addr, socklen_t *addrlen) const;
+ QPID_COMMON_EXTERN Socket* accept() const;
// TODO The following are raw operations, maybe they need better wrapping?
QPID_COMMON_EXTERN int read(void *buf, size_t count) const;
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Thread.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Thread.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Thread.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Thread.h Tue Mar 10 23:10:57 2009
@@ -28,6 +28,8 @@
# define QPID_TSS __declspec(thread)
#elif defined (__GNUC__)
# define QPID_TSS __thread
+#elif defined (__SUNPRO_CC)
+# define QPID_TSS __thread
#else
# error "Dont know how to define QPID_TSS for this platform"
#endif
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Tue Mar 10 23:10:57 2009
@@ -54,17 +54,20 @@
INACTIVE,
HUNGUP,
MONITORED_HUNGUP,
+ INTERRUPTED,
DELETED
};
int fd;
::__uint32_t events;
+ PollerHandle* pollerHandle;
FDStat stat;
Mutex lock;
- PollerHandlePrivate(int f) :
+ PollerHandlePrivate(int f, PollerHandle* p) :
fd(f),
events(0),
+ pollerHandle(p),
stat(ABSENT) {
}
@@ -101,6 +104,14 @@
stat = HUNGUP;
}
+ bool isInterrupted() const {
+ return stat == INTERRUPTED;
+ }
+
+ void setInterrupted() {
+ stat = INTERRUPTED;
+ }
+
bool isDeleted() const {
return stat == DELETED;
}
@@ -111,7 +122,7 @@
};
PollerHandle::PollerHandle(const IOHandle& h) :
- impl(new PollerHandlePrivate(toFd(h.impl)))
+ impl(new PollerHandlePrivate(toFd(h.impl), this))
{}
PollerHandle::~PollerHandle() {
@@ -120,6 +131,10 @@
if (impl->isDeleted()) {
return;
}
+ if (impl->isInterrupted()) {
+ impl->setDeleted();
+ return;
+ }
if (impl->isActive()) {
impl->setDeleted();
}
@@ -243,23 +258,21 @@
::close(epollFd);
}
- void interrupt(bool all=false) {
+ void interrupt() {
::epoll_event epe;
- if (all) {
- // Not EPOLLONESHOT, so we eventually get all threads
- epe.events = ::EPOLLIN;
- epe.data.u64 = 0; // Keep valgrind happy
- } else {
- // Use EPOLLONESHOT so we only wake a single thread
- epe.events = ::EPOLLIN | ::EPOLLONESHOT;
- epe.data.u64 = 0; // Keep valgrind happy
- epe.data.ptr = &static_cast<PollerHandle&>(interruptHandle);
- }
+ // Use EPOLLONESHOT so we only wake a single thread
+ epe.events = ::EPOLLIN | ::EPOLLONESHOT;
+ epe.data.u64 = 0; // Keep valgrind happy
+ epe.data.ptr = &static_cast<PollerHandle&>(interruptHandle);
QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, alwaysReadableFd, &epe));
}
void interruptAll() {
- interrupt(true);
+ ::epoll_event epe;
+ // Not EPOLLONESHOT, so we eventually get all threads
+ epe.events = ::EPOLLIN;
+ epe.data.u64 = 0; // Keep valgrind happy
+ QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, alwaysReadableFd, &epe));
}
};
@@ -281,7 +294,7 @@
epe.events = eh.events | PollerPrivate::directionToEpollEvent(dir);
}
epe.data.u64 = 0; // Keep valgrind happy
- epe.data.ptr = &handle;
+ epe.data.ptr = &eh;
QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, eh.fd, &epe));
@@ -312,7 +325,7 @@
::epoll_event epe;
epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
epe.data.u64 = 0; // Keep valgrind happy
- epe.data.ptr = &handle;
+ epe.data.ptr = &eh;
QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
@@ -329,7 +342,7 @@
::epoll_event epe;
epe.events = eh.events;
epe.data.u64 = 0; // Keep valgrind happy
- epe.data.ptr = &handle;
+ epe.data.ptr = &eh;
QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
@@ -355,15 +368,14 @@
{
PollerHandlePrivate& eh = *handle.impl;
ScopedLock<Mutex> l(eh.lock);
- if (eh.isInactive()) {
+ if (!eh.isActive()) {
return false;
}
::epoll_event epe;
epe.events = 0;
epe.data.u64 = 0; // Keep valgrind happy
- epe.data.ptr = &eh;
QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
- eh.setInactive();
+ eh.setInterrupted();
}
PollerPrivate::InterruptHandle& ih = impl->interruptHandle;
@@ -422,37 +434,54 @@
#else
int rc = ::epoll_pwait(impl->epollFd, &epe, 1, timeoutMs, &impl->sigMask);
#endif
- // Check for shutdown
- if (impl->isShutdown) {
- PollerHandleDeletionManager.markAllUnusedInThisThread();
- return Event(0, SHUTDOWN);
- }
if (rc ==-1 && errno != EINTR) {
QPID_POSIX_CHECK(rc);
} else if (rc > 0) {
assert(rc == 1);
- PollerHandle* handle = static_cast<PollerHandle*>(epe.data.ptr);
+ void* dataPtr = epe.data.ptr;
- PollerHandlePrivate& eh = *handle->impl;
+ // Check if this is an interrupt
+ PollerPrivate::InterruptHandle& interruptHandle = impl->interruptHandle;
+ if (dataPtr == &interruptHandle) {
+ PollerHandle* wrappedHandle = 0;
+ {
+ ScopedLock<Mutex> l(interruptHandle.impl->lock);
+ if (interruptHandle.impl->isActive()) {
+ wrappedHandle = interruptHandle.getHandle();
+ // If there is an interrupt queued behind this one we need to arm it
+ // We do it this way so that another thread can pick it up
+ if (interruptHandle.queuedHandles()) {
+ impl->interrupt();
+ interruptHandle.impl->setActive();
+ } else {
+ interruptHandle.impl->setInactive();
+ }
+ }
+ }
+ if (wrappedHandle) {
+ ScopedLock<Mutex> l(wrappedHandle->impl->lock);
+ if (!wrappedHandle->impl->isDeleted()) {
+ wrappedHandle->impl->setInactive();
+ return Event(wrappedHandle, INTERRUPTED);
+ }
+ PollerHandleDeletionManager.markForDeletion(wrappedHandle->impl);
+ }
+ continue;
+ }
+
+ // Check for shutdown
+ if (impl->isShutdown) {
+ PollerHandleDeletionManager.markAllUnusedInThisThread();
+ return Event(0, SHUTDOWN);
+ }
+
+ PollerHandlePrivate& eh = *static_cast<PollerHandlePrivate*>(dataPtr);
ScopedLock<Mutex> l(eh.lock);
// the handle could have gone inactive since we left the epoll_wait
if (eh.isActive()) {
-
- // Check if this is an interrupt
- if (handle == &impl->interruptHandle) {
- PollerHandle* wrappedHandle = impl->interruptHandle.getHandle();
- // If there is an interrupt queued behind this one we need to arm it
- // We do it this way so that another thread can pick it up
- if (impl->interruptHandle.queuedHandles()) {
- impl->interrupt();
- eh.setActive();
- } else {
- eh.setInactive();
- }
- return Event(wrappedHandle, INTERRUPTED);
- }
+ PollerHandle* handle = eh.pollerHandle;
// If the connection has been hungup we could still be readable
// (just not writable), allow us to readable until we get here again
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Tue Mar 10 23:10:57 2009
@@ -123,7 +123,7 @@
// TODO: Currently we ignore the peers address, perhaps we should
// log it or use it for connection acceptance.
try {
- s = socket.accept(0, 0);
+ s = socket.accept();
if (s) {
acceptedCallback(*s);
} else {
@@ -474,7 +474,7 @@
break;
} else {
// Report error then just treat as a socket disconnect
- QPID_LOG(error, "Error reading socket: " << qpid::sys::strError(rc) << "(" << rc << ")" );
+ QPID_LOG(error, "Error reading socket: " << qpid::sys::strError(errno) << "(" << errno << ")" );
eofCallback(*this);
h.unwatchRead();
break;
Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/posix/Socket.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/posix/Socket.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/posix/Socket.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/posix/Socket.cpp Tue Mar 10 23:10:57 2009
@@ -108,7 +108,7 @@
{
int& socket = impl->fd;
if (socket != -1) Socket::close();
- int s = ::socket (PF_INET, SOCK_STREAM, 0);
+ int s = ::socket (AF_INET, SOCK_STREAM, 0);
if (s < 0) throw QPID_POSIX_ERROR(errno);
socket = s;
}
@@ -138,25 +138,30 @@
}
}
-void Socket::connect(const std::string& host, uint16_t port) const
+void Socket::connect(const std::string& host, uint16_t p) const
{
- std::stringstream namestream;
- namestream << host << ":" << port;
- connectname = namestream.str();
+ std::stringstream portstream;
+ portstream << p;
+ std::string port = portstream.str();
+ connectname = host + ":" + port;
const int& socket = impl->fd;
- struct sockaddr_in name;
- name.sin_family = AF_INET;
- name.sin_port = htons(port);
- // TODO: Be good to make this work for IPv6 as well as IPv4
- // Use more modern lookup functions
- struct hostent* hp = gethostbyname ( host.c_str() );
- if (hp == 0)
- throw Exception(QPID_MSG("Cannot resolve " << host << ": " << h_errstr(h_errno)));
- ::memcpy(&name.sin_addr.s_addr, hp->h_addr_list[0], hp->h_length);
- if ((::connect(socket, (struct sockaddr*)(&name), sizeof(name)) < 0) &&
- (errno != EINPROGRESS))
+
+ ::addrinfo *res;
+ ::addrinfo hints;
+ ::memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_INET; // In order to allow AF_INET6 we'd have to change createTcp() as well
+ hints.ai_socktype = SOCK_STREAM;
+ int n = ::getaddrinfo(host.c_str(), port.c_str(), &hints, &res);
+ if (n != 0)
+ throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n)));
+ // TODO the correct thing to do here is loop on failure until you've used all the returned addresses
+ if ((::connect(socket, res->ai_addr, res->ai_addrlen) < 0) &&
+ (errno != EINPROGRESS)) {
+ ::freeaddrinfo(res);
throw qpid::Exception(QPID_MSG(strError(errno) << ": " << host << ":" << port));
+ }
+ ::freeaddrinfo(res);
}
void
@@ -189,9 +194,9 @@
return ntohs(name.sin_port);
}
-Socket* Socket::accept(struct sockaddr *addr, socklen_t *addrlen) const
+Socket* Socket::accept() const
{
- int afd = ::accept(impl->fd, addr, addrlen);
+ int afd = ::accept(impl->fd, 0, 0);
if ( afd >= 0)
return new Socket(new IOHandlePrivate(afd));
else if (errno == EAGAIN)
@@ -238,7 +243,7 @@
uint16_t Socket::getRemotePort() const
{
- return atoi(getService(impl->fd, true).c_str());
+ return std::atoi(getService(impl->fd, true).c_str());
}
int Socket::getError() const
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org