You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2009/03/11 00:11:10 UTC

svn commit: r752300 [3/12] - in /qpid/branches/qpid-1673/qpid: cpp/ cpp/examples/ cpp/examples/direct/ cpp/examples/failover/ cpp/examples/fanout/ cpp/examples/pub-sub/ cpp/examples/qmf-console/ cpp/examples/request-response/ cpp/examples/tradedemo/ cp...

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Connection.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Connection.h Tue Mar 10 23:10:57 2009
@@ -27,14 +27,15 @@
 #include "OutputInterceptor.h"
 #include "NoOpConnectionOutputHandler.h"
 #include "EventFrame.h"
+#include "McastFrameHandler.h"
 
 #include "qpid/broker/Connection.h"
 #include "qpid/amqp_0_10/Connection.h"
 #include "qpid/sys/AtomicValue.h"
 #include "qpid/sys/ConnectionInputHandler.h"
 #include "qpid/sys/ConnectionOutputHandler.h"
-#include "qpid/framing/FrameDecoder.h"
 #include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/FrameDecoder.h"
 
 #include <iosfwd>
 
@@ -63,10 +64,10 @@
   public:
     typedef sys::PollableQueue<EventFrame> PollableFrameQueue;
 
-    /** Local connection, use this in ConnectionId */
-    Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId, bool catchUp, bool isLink);
-    /** Shadow connection */
-    Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, ConnectionId);
+    /** Local connection. */
+    Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, MemberId, bool catchUp, bool isLink);
+    /** Shadow connection. */
+    Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id);
     ~Connection();
     
     ConnectionId getId() const { return self; }
@@ -99,7 +100,7 @@
     /** Called if the connector's member has left the cluster */
     void left();
     
-    // ConnectionCodec methods
+    // ConnectionCodec methods - called by IO layer with a read buffer.
     size_t decode(const char* buffer, size_t size);
 
     // Called for data delivered from the cluster.
@@ -117,9 +118,9 @@
                       const framing::SequenceNumber& received,
                       const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete);
     
-    void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username);
+    void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment);
 
-    void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId);
+    void membership(const framing::FieldTable&, const framing::FieldTable&);
 
     void deliveryRecord(const std::string& queue,
                         const framing::SequenceNumber& position,
@@ -134,6 +135,7 @@
                         uint32_t credit);
 
     void queuePosition(const std::string&, const framing::SequenceNumber&);
+    void expiryId(uint64_t);
 
     void txStart();
     void txAccept(const framing::SequenceSet&);
@@ -148,8 +150,12 @@
     void exchange(const std::string& encoded);
 
     void giveReadCredit(int credit);
-    
+
   private:
+    struct NullFrameHandler : public framing::FrameHandler {
+        void handle(framing::AMQFrame&) {}
+    };
+    
     void init();
     bool checkUnsupported(const framing::AMQBody& body);
     void deliverClose();
@@ -174,6 +180,8 @@
     framing::ChannelId currentChannel;
     boost::shared_ptr<broker::TxBuffer> txBuffer;
     bool expectProtocolHeader;
+    McastFrameHandler mcastFrameHandler;
+    NullFrameHandler nullFrameHandler;
 
     static qpid::sys::AtomicValue<uint64_t> catchUpId;
     

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp Tue Mar 10 23:10:57 2009
@@ -44,18 +44,15 @@
     return 0;
 }
 
-// Used for outgoing Link connections, we don't care.
+// Used for outgoing Link connections
 sys::ConnectionCodec*
-ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) {
-    return new ConnectionCodec(out, id, cluster, false, true);
-    //return next->create(out, id);
+ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& logId) {
+    return new ConnectionCodec(out, logId, cluster, false, true);
 }
 
-ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster, bool catchUp, bool isLink)
-    : codec(out, id, isLink),
-      interceptor(new Connection(cluster, codec, id, cluster.getId(), catchUp, isLink)),
-      id(interceptor->getId()),
-      localId(id)
+ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& logId, Cluster& cluster, bool catchUp, bool isLink)
+    : codec(out, logId, isLink),
+      interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink))
 {
     std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor));
     codec.setInputHandler(ih);

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ConnectionCodec.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ConnectionCodec.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ConnectionCodec.h Tue Mar 10 23:10:57 2009
@@ -56,7 +56,7 @@
         sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id);
     };
 
-    ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& c, bool catchUp, bool isLink);
+    ConnectionCodec(sys::OutputControl& out, const std::string& logId, Cluster& c, bool catchUp, bool isLink);
     ~ConnectionCodec();
 
     // ConnectionCodec functions.
@@ -71,8 +71,6 @@
   private:
     amqp_0_10::Connection codec;
     boost::intrusive_ptr<cluster::Connection> interceptor;
-    cluster::ConnectionId id;
-    std::string localId;
 };
 
 }} // namespace qpid::cluster

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cpg.cpp Tue Mar 10 23:10:57 2009
@@ -107,17 +107,16 @@
     check(cpg_leave(handle, &group), cantLeaveMsg(group));
 }
 
-bool Cpg::isFlowControlEnabled() {
-    cpg_flow_control_state_t flowState;
-    check(cpg_flow_control_state_get(handle, &flowState), "Cannot get CPG flow control status.");
-    return flowState == CPG_FLOW_CONTROL_ENABLED;
-}
+
+
 
 bool Cpg::mcast(const iovec* iov, int iovLen) {
-    if (isFlowControlEnabled()) {
-        QPID_LOG(debug, "CPG flow control enabled")
+    // Check for flow control
+    cpg_flow_control_state_t flowState;
+    check(cpg_flow_control_state_get(handle, &flowState), "Cannot get CPG flow control status.");
+    if (flowState == CPG_FLOW_CONTROL_ENABLED)
         return false;
-    }
+
     cpg_error_t result;
     do {
         result = cpg_mcast_joined(handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen);

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cpg.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cpg.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cpg.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Cpg.h Tue Mar 10 23:10:57 2009
@@ -114,8 +114,6 @@
 
     int getFd();
     
-    bool isFlowControlEnabled();
-    
   private:
     static std::string errorStr(cpg_error_t err, const std::string& msg);
     static std::string cantJoinMsg(const Name&);

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Event.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Event.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Event.cpp Tue Mar 10 23:10:57 2009
@@ -23,6 +23,7 @@
 #include "Cpg.h"
 #include "qpid/framing/Buffer.h"
 #include "qpid/framing/AMQFrame.h"
+#include "qpid/assert.h"
 #include <ostream>
 #include <iterator>
 #include <algorithm>
@@ -31,6 +32,7 @@
 namespace cluster {
 
 using framing::Buffer;
+using framing::AMQFrame;
 
 const size_t EventHeader::HEADER_SIZE =
     sizeof(uint8_t) +  // type
@@ -42,7 +44,7 @@
     ;
 
 EventHeader::EventHeader(EventType t, const ConnectionId& c,  size_t s)
-    : type(t), connectionId(c), size(s), sequence(0) {}
+    : type(t), connectionId(c), size(s) {}
 
 
 Event::Event() {}
@@ -57,7 +59,7 @@
     type = (EventType)buf.getOctet();
     if(type != DATA && type != CONTROL)
         throw Exception("Invalid multicast event type");
-    connectionId = ConnectionId(m, reinterpret_cast<Connection*>(buf.getLongLong()));
+    connectionId = ConnectionId(m, buf.getLongLong());
     size = buf.getLong();
 #ifdef QPID_LATENCY_METRIC
     latency_metric_timestamp = buf.getLongLong();
@@ -74,14 +76,17 @@
     return e;
 }
 
-Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) {
-    framing::AMQFrame f(body);
+Event Event::control(const framing::AMQFrame& f, const ConnectionId& cid) {
     Event e(CONTROL, cid, f.encodedSize());
     Buffer buf(e);
     f.encode(buf);
     return e;
 }
 
+Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) {
+    return control(framing::AMQFrame(body), cid);
+}
+
 iovec Event::toIovec() {
     encodeHeader();
     iovec iov = { const_cast<char*>(getStore()), getStoreSize() };
@@ -90,7 +95,7 @@
 
 void EventHeader::encode(Buffer& b) const {
     b.putOctet(type);
-    b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer()));
+    b.putLongLong(connectionId.getNumber());
     b.putLong(size);
 #ifdef QPID_LATENCY_METRIC
     b.putLongLong(latency_metric_timestamp);
@@ -108,12 +113,22 @@
     return Buffer(const_cast<char*>(getData()), getSize());
 }
 
+AMQFrame Event::getFrame() const {
+    assert(type == CONTROL);
+    Buffer buf(*this);
+    AMQFrame frame;
+    QPID_ASSERT(frame.decode(buf));
+    return frame;
+}
+
 static const char* EVENT_TYPE_NAMES[] = { "data", "control" };
 
+std::ostream& operator << (std::ostream& o, EventType t) {
+    return o << EVENT_TYPE_NAMES[t];
+}
+
 std::ostream& operator << (std::ostream& o, const EventHeader& e) {
-    o << "[event " << e.getConnectionId()  << "/" << e.getSequence()
-      << " " << EVENT_TYPE_NAMES[e.getType()]
-      << " " << e.getSize() << " bytes]";
+    o << "Event[" << e.getConnectionId() << " " << e.getType() << " " << e.getSize() << " bytes]";
     return o;
 }
 

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Event.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Event.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Event.h Tue Mar 10 23:10:57 2009
@@ -24,6 +24,7 @@
 
 #include "types.h"
 #include "qpid/RefCountedBuffer.h"
+#include "qpid/framing/AMQFrame.h"
 #include "qpid/sys/LatencyMetric.h"
 #include <sys/uio.h>            // For iovec
 #include <iosfwd>
@@ -34,6 +35,7 @@
 
 namespace framing {
 class AMQBody;
+class AMQFrame;
 class Buffer;
 }
 
@@ -55,11 +57,9 @@
     /** Size of header + payload. */ 
     size_t getStoreSize() { return size + HEADER_SIZE; }
 
-    uint64_t getSequence() const { return sequence; }
-    void setSequence(uint64_t n) { sequence = n; }
-
-    bool isCluster() const { return connectionId.getPointer() == 0; }
-    bool isConnection() const { return connectionId.getPointer() != 0; }
+    bool isCluster() const { return connectionId.getNumber() == 0; }
+    bool isConnection() const { return connectionId.getNumber() != 0; }
+    bool isControl() const { return type == CONTROL; }
 
   protected:
     static const size_t HEADER_SIZE;
@@ -67,7 +67,6 @@
     EventType type;
     ConnectionId connectionId;
     size_t size;
-    uint64_t sequence;
 };
 
 /**
@@ -83,8 +82,11 @@
     /** Create an event copied from delivered data. */
     static Event decodeCopy(const MemberId& m, framing::Buffer&);
 
-    /** Create an event containing a control */
+    /** Create a control event. */
     static Event control(const framing::AMQBody&, const ConnectionId&);
+
+    /** Create a control event. */
+    static Event control(const framing::AMQFrame&, const ConnectionId&);
     
     // Data excluding header.
     char* getData() { return store + HEADER_SIZE; }
@@ -93,6 +95,8 @@
     // Store including header
     char* getStore() { return store; }
     const char* getStore() const { return store; }
+
+    framing::AMQFrame getFrame() const;        
     
     operator framing::Buffer() const;
 
@@ -105,6 +109,7 @@
 };
 
 std::ostream& operator << (std::ostream&, const EventHeader&);
+
 }} // namespace qpid::cluster
 
 #endif  /*!QPID_CLUSTER_EVENT_H*/

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/EventFrame.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/EventFrame.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/EventFrame.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/EventFrame.cpp Tue Mar 10 23:10:57 2009
@@ -24,16 +24,18 @@
 namespace qpid {
 namespace cluster {
 
-EventFrame::EventFrame() : sequence(0) {}
+EventFrame::EventFrame() {}
 
 EventFrame::EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc)
-    : connectionId(e.getConnectionId()), frame(f), sequence(e.getSequence()), readCredit(rc)
+    : connectionId(e.getConnectionId()), frame(f), readCredit(rc), type(e.getType())
 {
     QPID_LATENCY_INIT(frame);
 }
 
 std::ostream& operator<<(std::ostream& o, const EventFrame& e) {
-    return o << e.connectionId << "/" << e.sequence << " " << e.frame << " rc=" << e.readCredit;
+    o << e.frame  << " " << e.type << " " << e.connectionId;
+    if (e.readCredit) o << " read-credit=" << e.readCredit; // suffix only when credit is non-zero
+    return o;
 }
 
 }} // namespace qpid::cluster

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/EventFrame.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/EventFrame.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/EventFrame.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/EventFrame.h Tue Mar 10 23:10:57 2009
@@ -42,22 +42,15 @@
 
     EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc=0);
 
-    bool isCluster() const { return !connectionId.getPointer(); }
-    bool isConnection() const { return connectionId.getPointer(); }
+    bool isCluster() const { return connectionId.getNumber() == 0; }
+    bool isConnection() const { return connectionId.getNumber() != 0; }
     bool isLastInEvent() const { return readCredit; }
 
 
-    // True if this frame follows immediately after frame e. 
-    bool follows(const EventFrame& e) const {
-        return sequence == e.sequence || (sequence == e.sequence+1 && e.readCredit);
-    }
-
-    bool operator<(const EventFrame& e) const { return sequence < e.sequence; }
-    
     ConnectionId connectionId;
     framing::AMQFrame frame;   
-    uint64_t sequence;
-    int readCredit;             // last frame in an event, give credit when processed.
+    int readCredit; ///< last frame in an event, give credit when processed.
+    EventType type;
 };
 
 std::ostream& operator<<(std::ostream& o, const EventFrame& e);

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp Tue Mar 10 23:10:57 2009
@@ -30,48 +30,46 @@
 namespace qpid {
 namespace cluster {
 
-ExpiryPolicy::ExpiryPolicy(const boost::function<bool()> & f, Multicaster& m, const MemberId& id, broker::Timer& t)
-    : expiredPolicy(new Expired), isLeader(f), mcast(m), memberId(id), timer(t) {}
-
-namespace {
-uint64_t clusterId(const broker::Message& m) {
-    assert(m.getFrames().begin() != m.getFrames().end());
-    return m.getFrames().begin()->getClusterId();
-}
+ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, broker::Timer& t)
+    : expiryId(0), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {}
 
 struct ExpiryTask : public broker::TimerTask {
     ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when)
-        : TimerTask(when), expiryPolicy(policy), messageId(id) {}
-    void fire() { expiryPolicy->sendExpire(messageId); }
+        : TimerTask(when), expiryPolicy(policy), expiryId(id) {}
+    void fire() { expiryPolicy->sendExpire(expiryId); }
     boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
-    const uint64_t messageId;
+    const uint64_t expiryId;
 };
-}
 
 void ExpiryPolicy::willExpire(broker::Message& m) {
-    timer.add(new ExpiryTask(this, clusterId(m), m.getExpiration()));
+    uint64_t id = expiryId++;
+    assert(unexpiredById.find(id) == unexpiredById.end());
+    assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end());
+    unexpiredById[id] = &m;
+    unexpiredByMessage[&m] = id;
+    timer.add(new ExpiryTask(this, id, m.getExpiration()));
 }
 
 bool ExpiryPolicy::hasExpired(broker::Message& m) {
-    sys::Mutex::ScopedLock l(lock);
-    IdSet::iterator i = expired.find(clusterId(m));
-    if (i != expired.end()) {
-        expired.erase(i);
-        const_cast<broker::Message&>(m).setExpiryPolicy(expiredPolicy); // hasExpired() == true; 
-        return true;
-    }
-    return false;
+    return unexpiredByMessage.find(&m) == unexpiredByMessage.end();
 }
 
 void ExpiryPolicy::sendExpire(uint64_t id) {
-    sys::Mutex::ScopedLock l(lock);
-    if (isLeader()) 
-        mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId);
+    mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId);
 }
 
 void ExpiryPolicy::deliverExpire(uint64_t id) {
-    sys::Mutex::ScopedLock l(lock);
-    expired.insert(id);
+    IdMessageMap::iterator i = unexpiredById.find(id);
+    if (i != unexpiredById.end()) {
+        i->second->setExpiryPolicy(expiredPolicy); // hasExpired() == true; 
+        unexpiredByMessage.erase(i->second);
+        unexpiredById.erase(i);
+    }
+}
+
+boost::optional<uint64_t> ExpiryPolicy::getId(broker::Message& m) {
+    MessageIdMap::iterator i = unexpiredByMessage.find(&m);
+    return i == unexpiredByMessage.end() ? boost::optional<uint64_t>() : i->second;
 }
 
 bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; }

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h Tue Mar 10 23:10:57 2009
@@ -27,11 +27,15 @@
 #include "qpid/sys/Mutex.h"
 #include <boost/function.hpp>
 #include <boost/intrusive_ptr.hpp>
-#include <set>
+#include <boost/optional.hpp>
+#include <map>
 
 namespace qpid {
 
-namespace broker { class Timer; }
+namespace broker {
+class Timer;
+class Message;
+}
 
 namespace cluster {
 class Multicaster;
@@ -42,7 +46,7 @@
 class ExpiryPolicy : public broker::ExpiryPolicy
 {
   public:
-    ExpiryPolicy(const boost::function<bool()> & isLeader, Multicaster&, const MemberId&, broker::Timer&);
+    ExpiryPolicy(Multicaster&, const MemberId&, broker::Timer&);
 
     void willExpire(broker::Message&);
 
@@ -54,18 +58,24 @@
     // Cluster delivers expiry notice.
     void deliverExpire(uint64_t);
 
+    void setId(uint64_t id) { expiryId = id; }
+    uint64_t getId() const { return expiryId; }
+    
+    boost::optional<uint64_t> getId(broker::Message&);
+    
   private:
-    sys::Mutex lock;
-    typedef std::set<uint64_t> IdSet;
+    typedef std::map<broker::Message*,  uint64_t> MessageIdMap;
+    typedef std::map<uint64_t, broker::Message*> IdMessageMap;
 
     struct Expired : public broker::ExpiryPolicy {
         bool hasExpired(broker::Message&);
         void willExpire(broker::Message&);
     };
 
-    IdSet expired;
+    MessageIdMap unexpiredByMessage;
+    IdMessageMap unexpiredById;
+    uint64_t expiryId;
     boost::intrusive_ptr<Expired> expiredPolicy;
-    boost::function<bool()> isLeader;
     Multicaster& mcast;
     MemberId memberId;
     broker::Timer& timer;

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Multicaster.cpp Tue Mar 10 23:10:57 2009
@@ -24,6 +24,7 @@
 #include "qpid/log/Statement.h"
 #include "qpid/sys/LatencyMetric.h"
 #include "qpid/framing/AMQBody.h"
+#include "qpid/framing/AMQFrame.h"
 
 namespace qpid {
 namespace cluster {
@@ -43,6 +44,11 @@
     mcast(Event::control(body, id));
 }
 
+void Multicaster::mcastControl(const framing::AMQFrame& frame, const ConnectionId& id) {
+    QPID_LOG(trace, "MCAST " << id << ": " << frame);
+    mcast(Event::control(frame, id));
+}
+
 void Multicaster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) {
     Event e(DATA, id, size);
     memcpy(e.getData(), data, size);

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Multicaster.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Multicaster.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Multicaster.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/Multicaster.h Tue Mar 10 23:10:57 2009
@@ -50,6 +50,7 @@
                 boost::function<void()> onError
     );
     void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&);
+    void mcastControl(const framing::AMQFrame& controlFrame, const ConnectionId&);
     void mcastBuffer(const char*, size_t, const ConnectionId&);
     void mcast(const Event& e);
     /** End holding mode, held events are mcast */

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Tue Mar 10 23:10:57 2009
@@ -70,17 +70,12 @@
 // Called in write thread when the IO layer has no more data to write.
 // We do nothing in the write thread, we run doOutput only on delivery
 // of doOutput requests.
-bool  OutputInterceptor::doOutput() {
-    QPID_LOG(trace, parent << " write idle.");
-    return false;
-}
+bool  OutputInterceptor::doOutput() { return false; }
 
 // Delivery of doOutput allows us to run the real connection doOutput()
 // which tranfers frames to the codec for writing.
 // 
 void OutputInterceptor::deliverDoOutput(size_t requested) {
-    QPID_LATENCY_RECORD("deliver do-output", *this);
-    QPID_LATENCY_CLEAR(*this);
     size_t buf = getBuffered();
     if (parent.isLocal())
         writeEstimate.delivered(requested, sent, buf); // Update the estimate.
@@ -91,9 +86,7 @@
         moreOutput = parent.getBrokerConnection().doOutput();
     } while (sent < requested && moreOutput);
     sent += buf;                // Include buffered data in the sent total.
-
-    QPID_LOG(trace, "Delivered doOutput: requested=" << requested << " output=" << sent << " more=" << moreOutput);
-
+    QPID_LOG(trace, parent << " delivereDoOutput: requested=" << requested << " sent=" << sent << " more=" << moreOutput);
     if (parent.isLocal() && moreOutput)  {
         QPID_LOG(trace,  parent << " deliverDoOutput - sending doOutput, more output available.");
         sendDoOutput();

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/PollableQueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/PollableQueue.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/PollableQueue.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/PollableQueue.h Tue Mar 10 23:10:57 2009
@@ -52,6 +52,8 @@
         }
         catch (const std::exception& e) {
             QPID_LOG(error, message << ": " << e.what());
+            values.clear();
+            this->stop();
             error();
         }
     }

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Tue Mar 10 23:10:57 2009
@@ -22,6 +22,8 @@
 #include "Cluster.h"
 #include "ClusterMap.h"
 #include "Connection.h"
+#include "Decoder.h"
+#include "ExpiryPolicy.h"
 #include "qpid/client/SessionBase_0_10Access.h" 
 #include "qpid/client/ConnectionAccess.h" 
 #include "qpid/broker/Broker.h"
@@ -86,33 +88,40 @@
 // TODO aconway 2008-09-24: optimization: update connections/sessions in parallel.
 
 UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url,
-                           broker::Broker& broker, const ClusterMap& m, uint64_t frameId_,
-                           const Cluster::Connections& cons,
+                           broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_, 
+                           const Cluster::ConnectionVector& cons, Decoder& decoder_,
                            const boost::function<void()>& ok,
                            const boost::function<void(const std::exception&)>& fail,
                            const client::ConnectionSettings& cs
 )
     : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m),
-      frameId(frameId_), connections(cons), 
+      expiry(expiry_), connections(cons), decoder(decoder_),
       connection(catchUpConnection()), shadowConnection(catchUpConnection()),
-      done(ok), failed(fail) 
+      done(ok), failed(fail), connectionSettings(cs)
 {
     connection.open(url, cs);
-    session = connection.newSession("update_shared");
+    session = connection.newSession(UPDATE);
 }
 
 UpdateClient::~UpdateClient() {}
 
 // Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
-const std::string UpdateClient::UPDATE("qpid.qpid-update");
+const std::string UpdateClient::UPDATE("qpid.cluster-update");
+
+void UpdateClient::run() {
+    try {
+        update();
+        done();
+    } catch (const std::exception& e) {
+        failed(e);
+    }
+    delete this;
+}
 
 void UpdateClient::update() {
     QPID_LOG(debug, updaterId << " updating state to " << updateeId << " at " << updateeUrl);
     Broker& b = updaterBroker;
     b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
-
-    // Update exchange is used to route messages to the proper queue without modifying routing key.
-    session.exchangeDeclare(arg::exchange=UPDATE, arg::type="fanout", arg::autoDelete=true);
     b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueue, this, _1));
     // Update queue is used to transfer acquired messages that are no longer on their original queue.
     session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
@@ -121,25 +130,15 @@
 
     std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1));
 
+    ClusterConnectionProxy(session).expiryId(expiry.getId());
     ClusterConnectionMembershipBody membership;
     map.toMethodBody(membership);
-    membership.setFrameId(frameId);
     AMQFrame frame(membership);
     client::ConnectionAccess::getImpl(connection)->handle(frame);
     connection.close();
     QPID_LOG(debug,  updaterId << " updated state to " << updateeId << " at " << updateeUrl);
 }
 
-void UpdateClient::run() {
-    try {
-        update();
-        done();
-    } catch (const std::exception& e) {
-        failed(e);
-    }
-    delete this;
-}
-
 namespace {
 template <class T> std::string encode(const T& t) {
     std::string encoded;
@@ -152,8 +151,7 @@
 
 void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) {
     QPID_LOG(debug, updaterId << " updating exchange " << ex->getName());
-    ClusterConnectionProxy proxy(session);
-    proxy.exchange(encode(*ex));
+    ClusterConnectionProxy(session).exchange(encode(*ex));
 }
 
 /** Bind a queue to the update exchange and update messages to it
@@ -164,24 +162,40 @@
     bool haveLastPos;
     framing::SequenceNumber lastPos;
     client::AsyncSession session;
-
+    ExpiryPolicy& expiry;
+    
   public:
 
-    MessageUpdater(const string& q, const client::AsyncSession s) : queue(q), haveLastPos(false), session(s) {
+    MessageUpdater(const string& q, const client::AsyncSession s, ExpiryPolicy& expiry_) : queue(q), haveLastPos(false), session(s), expiry(expiry_) {
         session.exchangeBind(queue, UpdateClient::UPDATE);
     }
 
     ~MessageUpdater() {
-        session.exchangeUnbind(queue, UpdateClient::UPDATE);
+        try {
+            session.exchangeUnbind(queue, UpdateClient::UPDATE);
+        }
+        catch (const std::exception& e) {
+            // Don't throw in a destructor.
+            QPID_LOG(error, "Unbinding update queue " << queue << ": " << e.what());
+        }
     }
 
 
     void updateQueuedMessage(const broker::QueuedMessage& message) {
+        // Send the queue position if necessary.
         if (!haveLastPos || message.position - lastPos != 1)  {
             ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1);
             haveLastPos = true;
         }
         lastPos = message.position;
+
+        // Send the expiry ID if necessary.
+        if (message.payload->getProperties<DeliveryProperties>()->getTtl()) {
+            boost::optional<uint64_t> expiryId = expiry.getId(*message.payload);
+            if (!expiryId) return; // Message already expired, don't replicate.
+            ClusterConnectionProxy(session).expiryId(*expiryId);
+        }
+
         SessionBase_0_10Access sb(session);
         framing::MessageTransferBody transfer(
             framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
@@ -204,16 +218,13 @@
     void updateMessage(const boost::intrusive_ptr<broker::Message>& message) {
         updateQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1));
     }
-    
-   
 };
 
-
 void UpdateClient::updateQueue(const boost::shared_ptr<Queue>& q) {
     QPID_LOG(debug, updaterId << " updating queue " << q->getName());
     ClusterConnectionProxy proxy(session);
     proxy.queue(encode(*q));
-    MessageUpdater updater(q->getName(), session);
+    MessageUpdater updater(q->getName(), session, expiry);
     q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1));
     q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, q->getName(), _1));
 }
@@ -228,13 +239,16 @@
     shadowConnection = catchUpConnection();
 
     broker::Connection& bc = updateConnection->getBrokerConnection();
-    // FIXME aconway 2008-10-20: What authentication info to use on reconnect?
-    shadowConnection.open(updateeUrl, bc.getUserId(), ""/*password*/, "/"/*vhost*/, bc.getFrameMax());
+    connectionSettings.maxFrameSize = bc.getFrameMax();
+    shadowConnection.open(updateeUrl, connectionSettings);
     bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1));
+    // Safe to use decoder here because we are stalled for update.
+    std::pair<const char*, size_t> fragment = decoder.get(updateConnection->getId()).getFragment();
     ClusterConnectionProxy(shadowConnection).shadowReady(
         updateConnection->getId().getMember(),
-        reinterpret_cast<uint64_t>(updateConnection->getId().getPointer()),
-        updateConnection->getBrokerConnection().getUserId()
+        updateConnection->getId().getNumber(),
+        bc.getUserId(),
+        string(fragment.first, fragment.second)
     );
     shadowConnection.close();
     QPID_LOG(debug, updaterId << " updated connection " << *updateConnection);
@@ -269,7 +283,7 @@
     SequenceNumber received = ss->receiverGetReceived().command;
     if (inProgress)  
         --received;
-
+             
     // Reset command-sequence state.
     proxy.sessionState(
         ss->senderGetReplayPoint().command,
@@ -285,9 +299,6 @@
     if (inProgress) {
         inProgress->getFrames().map(simpl->out);
     }
-
-    // FIXME aconway 2008-09-23: update session replay list.
-
     QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId());
 }
 
@@ -322,7 +333,7 @@
         // If the message is acquired then it is no longer on the
         // updatees queue, put it on the update queue for updatee to pick up.
         //
-        MessageUpdater(UPDATE, shadowSession).updateQueuedMessage(dr.getMessage());
+        MessageUpdater(UPDATE, shadowSession, expiry).updateQueuedMessage(dr.getMessage());
     }
     ClusterConnectionProxy(shadowSession).deliveryRecord(
         dr.getQueue()->getName(),
@@ -341,8 +352,8 @@
 
 class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
   public:
-    TxOpUpdater(UpdateClient& dc, client::AsyncSession s)
-        : MessageUpdater(UpdateClient::UPDATE, s), parent(dc), session(s), proxy(s) {}
+    TxOpUpdater(UpdateClient& dc, client::AsyncSession s, ExpiryPolicy& expiry)
+        : MessageUpdater(UpdateClient::UPDATE, s, expiry), parent(dc), session(s), proxy(s) {}
 
     void operator()(const broker::DtxAck& ) {
         throw InternalErrorException("DTX transactions not currently supported by cluster.");
@@ -385,7 +396,7 @@
     broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
     if (txBuffer) {
         proxy.txStart();
-        TxOpUpdater updater(*this, shadowSession);
+        TxOpUpdater updater(*this, shadowSession, expiry);
         txBuffer->accept(updater);
         proxy.txEnd();
     }

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/UpdateClient.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/UpdateClient.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/UpdateClient.h Tue Mar 10 23:10:57 2009
@@ -46,6 +46,7 @@
 class DeliveryRecord;
 class SessionState;
 class SemanticState;
+class Decoder;
 
 } // namespace broker
 
@@ -54,6 +55,8 @@
 class Cluster;
 class Connection;
 class ClusterMap;
+class Decoder;
+class ExpiryPolicy;
 
 /**
  * A client that updates the contents of a local broker to a remote one using AMQP.
@@ -63,8 +66,8 @@
     static const std::string UPDATE; // Name for special update queue and exchange.
     
     UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&,
-                 broker::Broker& donor, const ClusterMap& map, uint64_t sequence,
-                 const std::vector<boost::intrusive_ptr<Connection> >& ,
+                 broker::Broker& donor, const ClusterMap& map, ExpiryPolicy& expiry,
+                 const std::vector<boost::intrusive_ptr<Connection> >&, Decoder&,
                  const boost::function<void()>& done,
                  const boost::function<void(const std::exception&)>& fail,
                  const client::ConnectionSettings& 
@@ -92,12 +95,14 @@
     Url updateeUrl;
     broker::Broker& updaterBroker;
     ClusterMap map;
-    uint64_t frameId;
+    ExpiryPolicy& expiry;
     std::vector<boost::intrusive_ptr<Connection> > connections;
+    Decoder& decoder;
     client::Connection connection, shadowConnection;
     client::AsyncSession session, shadowSession;
     boost::function<void()> done;
     boost::function<void(const std::exception& e)> failed;
+    client::ConnectionSettings connectionSettings;
 };
 
 }} // namespace qpid::cluster

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/types.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/types.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/types.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/cluster/types.h Tue Mar 10 23:10:57 2009
@@ -68,16 +68,17 @@
 
 std::ostream& operator<<(std::ostream&, const MemberId&);
 
-struct ConnectionId : public std::pair<MemberId, Connection*>  {
-    ConnectionId(const MemberId& m=MemberId(), Connection* c=0) :  std::pair<MemberId, Connection*> (m,c) {}
-    ConnectionId(uint64_t m, uint64_t c)
-        : std::pair<MemberId, Connection*>(MemberId(m), reinterpret_cast<Connection*>(c)) {}
+struct ConnectionId : public std::pair<MemberId, uint64_t>  {
+    ConnectionId(const MemberId& m=MemberId(), uint64_t c=0) :  std::pair<MemberId, uint64_t> (m,c) {}
+    ConnectionId(uint64_t m, uint64_t c) : std::pair<MemberId, uint64_t>(MemberId(m), c) {}
     MemberId getMember() const { return first; }
-    Connection* getPointer() const { return second; }
+    uint64_t getNumber() const { return second; }
 };
 
 std::ostream& operator<<(std::ostream&, const ConnectionId&);
 
+std::ostream& operator<<(std::ostream&, EventType);
+
 }} // namespace qpid::cluster
 
 #endif  /*!QPID_CLUSTER_TYPES_H*/

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/console/ClassKey.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/console/ClassKey.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/console/ClassKey.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/console/ClassKey.cpp Tue Mar 10 23:10:57 2009
@@ -21,6 +21,7 @@
 
 #include "ClassKey.h"
 #include <string.h>
+#include <cstdio>
 
 using namespace std;
 using namespace qpid::console;

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/AMQFrame.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/AMQFrame.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/AMQFrame.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/AMQFrame.cpp Tue Mar 10 23:10:57 2009
@@ -41,7 +41,7 @@
 
 AMQFrame::AMQFrame(const AMQBody& b) : body(b.clone()) { init(); }
 
-AMQFrame::~AMQFrame() {}
+AMQFrame::~AMQFrame() { init(); }
 
 AMQBody* AMQFrame::getBody() {
     // Non-const AMQBody* may be used to modify the body.

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/AMQFrame.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/AMQFrame.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/AMQFrame.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/AMQFrame.h Tue Mar 10 23:10:57 2009
@@ -93,9 +93,6 @@
     /** Must point to at least DECODE_SIZE_MIN bytes of data */
     static uint16_t decodeSize(char* data);
 
-    uint64_t getClusterId() const { return clusterId; }
-    void setClusterId(uint64_t id) { clusterId = id; }
-    
   private:
     void init();
 
@@ -107,7 +104,6 @@
     bool bos : 1;
     bool eos : 1;
     mutable uint32_t encodedSizeCache;
-    uint64_t clusterId;         // Used to identify frames in a clustered broekr.
 };
 
 QPID_COMMON_EXTERN std::ostream& operator<<(std::ostream&, const AMQFrame&);

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/FrameDecoder.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/FrameDecoder.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/FrameDecoder.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/FrameDecoder.cpp Tue Mar 10 23:10:57 2009
@@ -21,8 +21,9 @@
 #include "FrameDecoder.h"
 #include "Buffer.h"
 #include "qpid/log/Statement.h"
-#include <algorithm>
 #include "qpid/framing/reply_exceptions.h"
+#include <algorithm>
+#include <string.h>
 
 namespace qpid {
 namespace framing {
@@ -67,4 +68,13 @@
     return false;
 }
 
+void FrameDecoder::setFragment(const char* data, size_t size) {
+    fragment.resize(size);
+    ::memcpy(&fragment[0], data, size);
+}
+
+std::pair<const char*, size_t> FrameDecoder::getFragment() const {
+    return std::pair<const char*, size_t>(&fragment[0], fragment.size());
+}
+
 }} // namespace qpid::framing

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/FrameDecoder.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/FrameDecoder.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/FrameDecoder.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/FrameDecoder.h Tue Mar 10 23:10:57 2009
@@ -35,9 +35,16 @@
 {
   public:
     bool decode(Buffer& buffer);
-    AMQFrame frame;
+    const AMQFrame& getFrame() const { return frame; }
+    AMQFrame& getFrame() { return frame; }
+
+    void setFragment(const char*, size_t);
+    std::pair<const char*, size_t> getFragment() const;
+
   private:
     std::vector<char> fragment;
+    AMQFrame frame;
+
 };
 }} // namespace qpid::framing
 

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/Uuid.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/Uuid.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/Uuid.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/framing/Uuid.h Tue Mar 10 23:10:57 2009
@@ -19,8 +19,9 @@
  *
  */
 
-#include "qpid/sys/uuid.h"
 #include "qpid/CommonImportExport.h"
+#include "qpid/sys/uuid.h"
+#include "qpid/sys/IntegerTypes.h"
 
 #include <boost/array.hpp>
 

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/log/Selector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/log/Selector.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/log/Selector.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/log/Selector.cpp Tue Mar 10 23:10:57 2009
@@ -20,6 +20,7 @@
 #include "Options.h"
 #include <boost/bind.hpp>
 #include <algorithm>
+#include <string.h>
 
 namespace qpid {
 namespace log {

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp Tue Mar 10 23:10:57 2009
@@ -42,10 +42,14 @@
         struct NameValue { const char* name; int value; };
         NameValue nameValue[] = {
             { "AUTH", LOG_AUTH },
+#ifdef HAVE_LOG_AUTHPRIV            
             { "AUTHPRIV", LOG_AUTHPRIV },
+#endif
             { "CRON", LOG_CRON },
             { "DAEMON", LOG_DAEMON },
+#ifdef HAVE_LOG_FTP
             { "FTP", LOG_FTP },
+#endif
             { "KERN", LOG_KERN },
             { "LOCAL0", LOG_LOCAL0 },
             { "LOCAL1", LOG_LOCAL1 },
@@ -72,7 +76,7 @@
     
     int value(const string& name) const {
         string key(name);
-        transform(key.begin(), key.end(), key.begin(), ::toupper);        
+        std::transform(key.begin(), key.end(), key.begin(), ::toupper);        
         ByName::const_iterator i = byName.find(key);
         if (i == byName.end())
             throw Exception("Not a valid syslog facility: " + name);

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/management/ManagementBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/management/ManagementBroker.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/management/ManagementBroker.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/management/ManagementBroker.cpp Tue Mar 10 23:10:57 2009
@@ -80,7 +80,7 @@
 }
 
 ManagementBroker::ManagementBroker () :
-    threadPoolSize(1), interval(10), broker(0)
+    threadPoolSize(1), interval(10), broker(0), startTime(uint64_t(Duration(now())))
 {
     nextObjectId   = 1;
     brokerBank     = 1;
@@ -346,6 +346,9 @@
     string              routingKey;
     list<pair<ObjectId, ManagementObject*> > deleteList;
 
+    uint64_t uptime = uint64_t(Duration(now())) - startTime;
+    static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime);
+
     moveNewObjectsLH();
 
     if (clientWasAdded) {
@@ -844,6 +847,9 @@
             Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
             uint32_t outLen;
 
+            if (object->getConfigChanged() || object->getInstChanged())
+                object->setUpdateTime();
+
             encodeHeader(outBuffer, 'g', sequence);
             object->writeProperties(outBuffer);
             object->writeStatistics(outBuffer, true);
@@ -865,6 +871,9 @@
             Buffer   outBuffer (outputBuffer, MA_BUFFER_SIZE);
             uint32_t outLen;
 
+            if (object->getConfigChanged() || object->getInstChanged())
+                object->setUpdateTime();
+
             encodeHeader(outBuffer, 'g', sequence);
             object->writeProperties(outBuffer);
             object->writeStatistics(outBuffer, true);

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/management/ManagementBroker.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/management/ManagementBroker.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/management/ManagementBroker.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/management/ManagementBroker.h Tue Mar 10 23:10:57 2009
@@ -182,6 +182,7 @@
     uint32_t                     nextRemoteBank;
     uint32_t                     nextRequestSequence;
     bool                         clientWasAdded;
+    const uint64_t               startTime;
 
     std::auto_ptr<IdAllocator> allocator;
 

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp Tue Mar 10 23:10:57 2009
@@ -57,7 +57,6 @@
 {
     FieldTable headers;
     headers.setString(REPLICATION_TARGET_QUEUE, dequeued.queue->getName());
-    headers.setInt(REPLICATION_EVENT_SEQNO, ++sequence);
     headers.setInt(REPLICATION_EVENT_TYPE, DEQUEUE);
     headers.setInt(DEQUEUED_MESSAGE_POSITION, dequeued.position);
     boost::intrusive_ptr<Message> msg(createMessage(headers));
@@ -69,7 +68,6 @@
     boost::intrusive_ptr<Message> msg(cloneMessage(*(enqueued.queue), enqueued.payload));
     FieldTable& headers = msg->getProperties<MessageProperties>()->getApplicationHeaders();
     headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName());
-    headers.setInt(REPLICATION_EVENT_SEQNO, ++sequence);
     headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE);
     queue->deliver(msg);
 }
@@ -131,12 +129,14 @@
 {
       Broker* broker = dynamic_cast<broker::Broker*>(&target);
       if (broker && !options.queue.empty()) {
+          broker->addFinalizer(boost::bind(&ReplicatingEventListener::shutdown, this)); 
           if (options.createQueue) {
               queue = broker->getQueues().declare(options.queue).first;
           } else {
               queue = broker->getQueues().find(options.queue);
           }
           if (queue) {
+              queue->insertSequenceNumbers(REPLICATION_EVENT_SEQNO);
               QueueEvents::EventListener callback = boost::bind(&ReplicatingEventListener::handle, this, _1);
               broker->getQueueEvents().registerListener(options.name, callback);
               QPID_LOG(info, "Registered replicating queue event listener");
@@ -147,6 +147,7 @@
 }
 
 void ReplicatingEventListener::earlyInitialize(Target&) {}
+void ReplicatingEventListener::shutdown() { queue.reset(); }
 
 ReplicatingEventListener::PluginOptions::PluginOptions() : Options("Queue Replication Options"), 
                                                            name("replicator"), 

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h Tue Mar 10 23:10:57 2009
@@ -58,10 +58,10 @@
 
     PluginOptions options;    
     qpid::broker::Queue::shared_ptr queue;
-    qpid::framing::SequenceNumber sequence;
 
     void deliverDequeueMessage(const qpid::broker::QueuedMessage& enqueued);
     void deliverEnqueueMessage(const qpid::broker::QueuedMessage& enqueued);
+    void shutdown();
 
     boost::intrusive_ptr<qpid::broker::Message> createMessage(const qpid::framing::FieldTable& headers);
     boost::intrusive_ptr<qpid::broker::Message> cloneMessage(qpid::broker::Queue& queue, 

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp Tue Mar 10 23:10:57 2009
@@ -34,11 +34,13 @@
 using namespace qpid::framing;
 using namespace qpid::replication::constants;
 
+const std::string SEQUENCE_VALUE("qpid.replication-event.sequence");
 ReplicationExchange::ReplicationExchange(const std::string& name, bool durable, 
                                          const FieldTable& args,
                                          QueueRegistry& qr,
                                          Manageable* parent) 
-    : Exchange(name, durable, args, parent), queues(qr), init(false) {}
+    : Exchange(name, durable, args, parent), queues(qr), sequence(args.getAsInt64(SEQUENCE_VALUE)), init(false)
+ {}
 
 std::string ReplicationExchange::getType() const { return typeName; }            
 
@@ -68,26 +70,33 @@
 {
     std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE);
     Queue::shared_ptr queue = queues.find(queueName);
-    FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders();
-    headers.erase(REPLICATION_TARGET_QUEUE);
-    headers.erase(REPLICATION_EVENT_SEQNO);
-    headers.erase(REPLICATION_EVENT_TYPE);
-    msg.deliverTo(queue);
-    QPID_LOG(debug, "Enqueued replicated message onto " << queue);
+    if (queue) {
+        FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders();
+        headers.erase(REPLICATION_TARGET_QUEUE);
+        headers.erase(REPLICATION_EVENT_SEQNO);
+        headers.erase(REPLICATION_EVENT_TYPE);
+        msg.deliverTo(queue);
+        QPID_LOG(debug, "Enqueued replicated message onto " << queueName);
+    } else {
+        QPID_LOG(error, "Cannot enqueue replicated message. Queue " << queueName << " does not exist");
+    }
 }
 
 void ReplicationExchange::handleDequeueEvent(const FieldTable* args)
 {
     std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE);
     Queue::shared_ptr queue = queues.find(queueName);
-    SequenceNumber position(args->getAsInt(DEQUEUED_MESSAGE_POSITION));
-    
-    QueuedMessage dequeued;
-    if (queue->acquireMessageAt(position, dequeued)) {
-        queue->dequeue(0, dequeued);
-        QPID_LOG(debug, "Processed replicated 'dequeue' event from " << queueName << " at position " << position);
+    if (queue) {
+        SequenceNumber position(args->getAsInt(DEQUEUED_MESSAGE_POSITION));        
+        QueuedMessage dequeued;
+        if (queue->acquireMessageAt(position, dequeued)) {
+            queue->dequeue(0, dequeued);
+            QPID_LOG(debug, "Processed replicated 'dequeue' event from " << queueName << " at position " << position);
+        } else {
+            QPID_LOG(warning, "Could not acquire message " << position << " from " << queueName);
+        }
     } else {
-        QPID_LOG(warning, "Could not acquire message " << position << " from " << queueName);
+        QPID_LOG(error, "Cannot process replicated 'dequeue' event. Queue " << queueName << " does not exist");
     }
 }
 
@@ -128,6 +137,13 @@
 const std::string ReplicationExchange::typeName("replication");
 
 
+void ReplicationExchange::encode(Buffer& buffer) const
+{
+    args.setInt64(std::string(SEQUENCE_VALUE), sequence);
+    Exchange::encode(buffer);
+}
+
+
 struct ReplicationExchangePlugin : Plugin
 {
     Broker* broker;

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicationExchange.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicationExchange.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicationExchange.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/replication/ReplicationExchange.h Tue Mar 10 23:10:57 2009
@@ -22,6 +22,7 @@
  *
  */
 #include "qpid/broker/Exchange.h"
+#include "qpid/framing/Buffer.h"
 #include "qpid/framing/SequenceNumber.h"
 
 namespace qpid {
@@ -58,6 +59,7 @@
     bool isDuplicate(const qpid::framing::FieldTable* args);
     void handleEnqueueEvent(const qpid::framing::FieldTable* args, qpid::broker::Deliverable& msg);
     void handleDequeueEvent(const qpid::framing::FieldTable* args);
+    void encode(framing::Buffer& buffer) const;
 };
 }} // namespace qpid::replication
 

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Codec.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Codec.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Codec.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Codec.h Tue Mar 10 23:10:57 2009
@@ -38,11 +38,11 @@
      * @return may be less than size if there was incomplete
      * data at the end of the buffer.
      */
-    virtual size_t decode(const char* buffer, size_t size) = 0;
+    virtual std::size_t decode(const char* buffer, std::size_t size) = 0;
 
 
     /** Encode into buffer, return number of bytes encoded */
-    virtual size_t encode(const char* buffer, size_t size) = 0;
+    virtual std::size_t encode(const char* buffer, std::size_t size) = 0;
 
     /** Return true if we have data to encode */
     virtual bool canEncode() = 0;

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h Tue Mar 10 23:10:57 2009
@@ -30,7 +30,7 @@
 /**
  * A ConnectionOutputHandler that delegates to another
  * ConnectionOutputHandler.  Allows the "real" ConnectionOutputHandler
- * to be changed modified without updating all the pointers/references
+ * to be changed without updating all the pointers/references
  * using the ConnectionOutputHandlerPtr
  */
 class ConnectionOutputHandlerPtr : public ConnectionOutputHandler

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h Tue Mar 10 23:10:57 2009
@@ -80,7 +80,7 @@
     bool add_unless(T& t, F f)
     {
         Mutex::ScopedLock l(lock);
-        if (array && find_if(array->begin(), array->end(), f) != array->end()) {
+        if (array && std::find_if(array->begin(), array->end(), f) != array->end()) {
             return false;
         } else {
             ArrayPtr copy(array ? new std::vector<T>(*array) : new std::vector<T>());

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/DispatchHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/DispatchHandle.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/DispatchHandle.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/DispatchHandle.cpp Tue Mar 10 23:10:57 2009
@@ -21,6 +21,8 @@
 
 #include "DispatchHandle.h"
 
+#include <algorithm>
+
 #include <boost/cast.hpp>
 
 #include <assert.h>
@@ -29,7 +31,6 @@
 namespace sys {
 
 DispatchHandle::~DispatchHandle() {
-    stopWatch();
 }
 
 void DispatchHandle::startWatch(Poller::shared_ptr poller0) {
@@ -37,13 +38,21 @@
     bool w = writableCallback;
 
     ScopedLock<Mutex> lock(stateLock);
-    assert(state == IDLE);
+    assert(state == IDLE || state == DELAYED_IDLE);
 
     // If no callbacks set then do nothing (that is what we were asked to do!)
     // TODO: Maybe this should be an assert instead
     if (!r && !w) {
-        state = INACTIVE;
-        return;
+        switch (state) {
+        case IDLE:
+            state = INACTIVE;
+            return;
+        case DELAYED_IDLE:
+            state = DELAYED_INACTIVE;
+            return;
+        default:
+            assert(state == IDLE || state == DELAYED_IDLE);    
+        }        
     }
 
     Poller::Direction d = r ?
@@ -53,9 +62,20 @@
     poller = poller0;
     poller->addFd(*this, d);
     
-    state = r ?
-        (w ? ACTIVE_RW : ACTIVE_R) :
-        ACTIVE_W;
+    switch (state) {
+    case IDLE:
+        state = r ?
+            (w ? ACTIVE_RW : ACTIVE_R) :
+            ACTIVE_W;
+        return;
+    case DELAYED_IDLE:
+        state = r ?
+            (w ? DELAYED_RW : DELAYED_R) :
+            DELAYED_W;
+        return;
+        default:
+            assert(state == IDLE || state == DELAYED_IDLE);            
+    }   
 }
 
 void DispatchHandle::rewatch() {
@@ -93,6 +113,8 @@
     case ACTIVE_RW:
         // Don't need to do anything already waiting for readable/writable
         break;
+    case ACTIVE_DELETE:
+        assert(state != ACTIVE_DELETE);
     }
 }
 
@@ -130,6 +152,8 @@
         poller->modFd(*this, Poller::INOUT);
         state = ACTIVE_RW;
         break;
+    case ACTIVE_DELETE:
+        assert(state != ACTIVE_DELETE);
     }
 }
 
@@ -167,6 +191,8 @@
     case ACTIVE_RW:
         // Nothing to do: already waiting for writable
         break;
+    case ACTIVE_DELETE:
+        assert(state != ACTIVE_DELETE);
    }
 }
 
@@ -203,6 +229,8 @@
     case ACTIVE_W:
     case INACTIVE:
         break;
+    case ACTIVE_DELETE:
+        assert(state != ACTIVE_DELETE);
     }
 }
 
@@ -239,6 +267,8 @@
     case ACTIVE_R:
     case INACTIVE:
         break;
+    case ACTIVE_DELETE:
+        assert(state != ACTIVE_DELETE);
    }
 }
 
@@ -261,6 +291,8 @@
         poller->modFd(*this, Poller::NONE);
         state = INACTIVE;
         break;
+    case ACTIVE_DELETE:
+        assert(state != ACTIVE_DELETE);
     }            
 }
 
@@ -280,47 +312,72 @@
     default:
         state = IDLE;
         break;
+    case ACTIVE_DELETE:
+        assert(state != ACTIVE_DELETE);
     }
     assert(poller);
     poller->delFd(*this);
     poller.reset();
 }
 
+// If we are already in the IDLE state we can't do the callback, as the
+// delete and the callback could then race with each other.
+// TODO: might be able to fix this by adding a new state, but that would
+// make the state machine even more complex
 void DispatchHandle::call(Callback iCb) {
     assert(iCb);
     ScopedLock<Mutex> lock(stateLock);
-    interruptedCallbacks.push(iCb);
-    
-    (void) poller->interrupt(*this);
+    switch (state) {
+    case IDLE:
+    case ACTIVE_DELETE:
+        assert(false);
+        return;
+    default:
+        interruptedCallbacks.push(iCb);
+        assert(poller);
+        (void) poller->interrupt(*this);
+    }
 }
 
 // The slightly strange switch structure
 // is to ensure that the lock is released before
 // we do the delete
 void DispatchHandle::doDelete() {
-    // Ensure that we're no longer watching anything
-    stopWatch();
-
-    // If we're in the middle of a callback defer the delete
     {
     ScopedLock<Mutex> lock(stateLock);
+    // Ensure that we're no longer watching anything
     switch (state) {
+    case DELAYED_R:
+    case DELAYED_W:
+    case DELAYED_RW:
+    case DELAYED_INACTIVE:
+        assert(poller);
+        poller->delFd(*this);
+        poller.reset();
+        // Fallthrough
     case DELAYED_IDLE:
-    case DELAYED_DELETE:
         state = DELAYED_DELETE;
+        // Fallthrough
+    case DELAYED_DELETE:
+    case ACTIVE_DELETE:
         return;
     case IDLE:
         break;
     default:
-        // Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states
-        assert(false);
+        state = ACTIVE_DELETE;
+        assert(poller);
+        (void) poller->interrupt(*this);
+        poller->delFd(*this);
+        return;
     }
     }
-    // If we're not then do it right away
+    // If we're IDLE we can do this right away
     delete this;
 }
 
 void DispatchHandle::processEvent(Poller::EventType type) {
+    CallbackQueue callbacks;
+
     // Note that we are now doing the callbacks
     {
     ScopedLock<Mutex> lock(stateLock);
@@ -336,6 +393,16 @@
     case ACTIVE_RW:
         state = DELAYED_RW;
         break;
+    case ACTIVE_DELETE:
+        // Need to make sure we clean up any pending callbacks in this case
+        std::swap(callbacks, interruptedCallbacks);
+        goto saybyebye;
+    // Can get here in idle if we are stopped in a different thread
+    // just after we return with this handle in Poller::wait
+    case IDLE:
+    // Can get here in INACTIVE if a non-connection thread unwatches
+    // whilst we were waiting for the above lock
+    case INACTIVE:
     // Can only get here in a DELAYED_* state in the rare case
     // that we're already here for reading and we get activated for
     // writing and we can write (it might be possible the other way
@@ -348,9 +415,9 @@
     case DELAYED_IDLE:
     case DELAYED_DELETE:
         return;
-    default:
-        assert(false);
     }
+    
+    std::swap(callbacks, interruptedCallbacks);
     }
 
     // Do callbacks - whilst we are doing the callbacks we are prevented from processing
@@ -378,8 +445,8 @@
         break;
     case Poller::INTERRUPTED:
         {
-        ScopedLock<Mutex> lock(stateLock);
-        assert(interruptedCallbacks.size() > 0);
+        // We could only be interrupted if we also had a callback to do
+        assert(callbacks.size() > 0);
         // We'll actually do the interrupt below
         }
         break;
@@ -387,16 +454,18 @@
         assert(false);
     }
 
-    {
-    ScopedLock<Mutex> lock(stateLock);
-    // If we've got a pending interrupt do it now
-    while (interruptedCallbacks.size() > 0) {
-        Callback cb = interruptedCallbacks.front();
+    // If we have any callbacks do them now -
+    // (because we use a copy taken before the previous callbacks ran, we
+    //  won't yet do anything that has been added since)
+    while (callbacks.size() > 0) {
+        Callback cb = callbacks.front();
         assert(cb);
         cb(*this);
-        interruptedCallbacks.pop();
+        callbacks.pop();
     }
 
+    {
+    ScopedLock<Mutex> lock(stateLock);
     // If any of the callbacks re-enabled reading/writing then actually
     // do it now
     switch (state) {
@@ -425,7 +494,9 @@
     case DELAYED_DELETE:
         break;
     }
-    }      
+    }
+
+saybyebye:
     delete this;
 }
 

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/DispatchHandle.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/DispatchHandle.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/DispatchHandle.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/DispatchHandle.h Tue Mar 10 23:10:57 2009
@@ -65,6 +65,7 @@
     Mutex stateLock;
     enum {
         IDLE, INACTIVE, ACTIVE_R, ACTIVE_W, ACTIVE_RW,
+        ACTIVE_DELETE,
         DELAYED_IDLE, DELAYED_INACTIVE, DELAYED_R, DELAYED_W, DELAYED_RW,
         DELAYED_DELETE
     } state;

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/PollableQueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/PollableQueue.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/PollableQueue.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/PollableQueue.h Tue Mar 10 23:10:57 2009
@@ -53,7 +53,7 @@
      * @param values  Queue of values to process. Any items remaining
      *                on return from Callback are put back on the queue.
      */
-    typedef boost::function<void (Queue& values)> Callback;
+    typedef boost::function<void (Queue&)> Callback;
 
     /**
      * Constructor; sets necessary parameters.

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Poller.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Poller.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Poller.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Poller.h Tue Mar 10 23:10:57 2009
@@ -86,8 +86,9 @@
     //   with the handle and the INTERRUPTED event type
     // if it returns false then the handle is not being monitored by the poller
     // - This can either be because it has just received an event which has been
-    //   reported and has not been reenabled since. Or because it was removed
-    //   from the monitoring set
+    //   reported and has not been reenabled since,
+    // - or because it was removed from the monitoring set,
+    // - or because it is already being interrupted
     QPID_COMMON_EXTERN bool interrupt(PollerHandle& handle);
     
     // Poller run loop

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Socket.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Socket.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Socket.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Socket.h Tue Mar 10 23:10:57 2009
@@ -27,8 +27,6 @@
 #include "qpid/CommonImportExport.h"
 #include <string>
 
-struct sockaddr;
-
 namespace qpid {
 namespace sys {
 
@@ -93,7 +91,7 @@
     /** Accept a connection from a socket that is already listening
      * and has an incoming connection
      */
-    QPID_COMMON_EXTERN Socket* accept(struct sockaddr *addr, socklen_t *addrlen) const;
+    QPID_COMMON_EXTERN Socket* accept() const;
 
     // TODO The following are raw operations, maybe they need better wrapping? 
     QPID_COMMON_EXTERN int read(void *buf, size_t count) const;

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Thread.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Thread.h?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Thread.h (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/Thread.h Tue Mar 10 23:10:57 2009
@@ -28,6 +28,8 @@
 #  define QPID_TSS __declspec(thread)
 #elif defined (__GNUC__)
 #  define QPID_TSS __thread
+#elif defined (__SUNPRO_CC)
+#  define QPID_TSS __thread
 #else
 #  error "Dont know how to define QPID_TSS for this platform"
 #endif

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Tue Mar 10 23:10:57 2009
@@ -54,17 +54,20 @@
         INACTIVE,
         HUNGUP,
         MONITORED_HUNGUP,
+        INTERRUPTED,
         DELETED
     };
 
     int fd;
     ::__uint32_t events;
+    PollerHandle* pollerHandle;
     FDStat stat;
     Mutex lock;
 
-    PollerHandlePrivate(int f) :
+    PollerHandlePrivate(int f, PollerHandle* p) :
       fd(f),
       events(0),
+      pollerHandle(p),
       stat(ABSENT) {
     }
 
@@ -101,6 +104,14 @@
         stat = HUNGUP;
     }
 
+    bool isInterrupted() const {
+        return stat == INTERRUPTED;
+    }
+
+    void setInterrupted() {
+        stat = INTERRUPTED;
+    }
+
     bool isDeleted() const {
         return stat == DELETED;
     }
@@ -111,7 +122,7 @@
 };
 
 PollerHandle::PollerHandle(const IOHandle& h) :
-    impl(new PollerHandlePrivate(toFd(h.impl)))
+    impl(new PollerHandlePrivate(toFd(h.impl), this))
 {}
 
 PollerHandle::~PollerHandle() {
@@ -120,6 +131,10 @@
     if (impl->isDeleted()) {
     	return;
     }
+    if (impl->isInterrupted()) {
+        impl->setDeleted();
+        return;
+    }
     if (impl->isActive()) {
         impl->setDeleted();
     }
@@ -243,23 +258,21 @@
         ::close(epollFd);
     }
     
-    void interrupt(bool all=false) {
+    void interrupt() {
 	    ::epoll_event epe;
-	    if (all) {
-	        // Not EPOLLONESHOT, so we eventually get all threads
-		    epe.events = ::EPOLLIN;
-		    epe.data.u64 = 0; // Keep valgrind happy
-	    } else {
-	    	// Use EPOLLONESHOT so we only wake a single thread
-		    epe.events = ::EPOLLIN | ::EPOLLONESHOT;
-		    epe.data.u64 = 0; // Keep valgrind happy
-		    epe.data.ptr = &static_cast<PollerHandle&>(interruptHandle);	    
-	    }
+    	// Use EPOLLONESHOT so we only wake a single thread
+	    epe.events = ::EPOLLIN | ::EPOLLONESHOT;
+	    epe.data.u64 = 0; // Keep valgrind happy
+	    epe.data.ptr = &static_cast<PollerHandle&>(interruptHandle);
 	    QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, alwaysReadableFd, &epe));	
     }
     
     void interruptAll() {
-    	interrupt(true);
+        ::epoll_event epe;
+        // Not EPOLLONESHOT, so we eventually get all threads
+        epe.events = ::EPOLLIN;
+        epe.data.u64 = 0; // Keep valgrind happy
+        QPID_POSIX_CHECK(::epoll_ctl(epollFd, EPOLL_CTL_MOD, alwaysReadableFd, &epe));  
     }
 };
 
@@ -281,7 +294,7 @@
         epe.events = eh.events | PollerPrivate::directionToEpollEvent(dir);
     }
     epe.data.u64 = 0; // Keep valgrind happy
-    epe.data.ptr = &handle;
+    epe.data.ptr = &eh;
 
     QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, op, eh.fd, &epe));
 
@@ -312,7 +325,7 @@
     ::epoll_event epe;
     epe.events = PollerPrivate::directionToEpollEvent(dir) | ::EPOLLONESHOT;
     epe.data.u64 = 0; // Keep valgrind happy
-    epe.data.ptr = &handle;
+    epe.data.ptr = &eh;
 
     QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
 
@@ -329,7 +342,7 @@
     ::epoll_event epe;
     epe.events = eh.events;
     epe.data.u64 = 0; // Keep valgrind happy
-    epe.data.ptr = &handle;
+    epe.data.ptr = &eh;
 
     QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
 
@@ -355,15 +368,14 @@
 	{
 	    PollerHandlePrivate& eh = *handle.impl;
 	    ScopedLock<Mutex> l(eh.lock);
-	    if (eh.isInactive()) {
+	    if (!eh.isActive()) {
 	    	return false;
 	    }
 	    ::epoll_event epe;
 	    epe.events = 0;
 	    epe.data.u64 = 0; // Keep valgrind happy
-	    epe.data.ptr = &eh;
 	    QPID_POSIX_CHECK(::epoll_ctl(impl->epollFd, EPOLL_CTL_MOD, eh.fd, &epe));
-	    eh.setInactive();
+	    eh.setInterrupted();
 	}
 
 	PollerPrivate::InterruptHandle& ih = impl->interruptHandle;
@@ -422,37 +434,54 @@
 #else
         int rc = ::epoll_pwait(impl->epollFd, &epe, 1, timeoutMs, &impl->sigMask);
 #endif
-        // Check for shutdown
-        if (impl->isShutdown) {
-            PollerHandleDeletionManager.markAllUnusedInThisThread();
-            return Event(0, SHUTDOWN);
-        }
 
         if (rc ==-1 && errno != EINTR) {
             QPID_POSIX_CHECK(rc);
         } else if (rc > 0) {
             assert(rc == 1);
-            PollerHandle* handle = static_cast<PollerHandle*>(epe.data.ptr);
+            void* dataPtr = epe.data.ptr;
 
-            PollerHandlePrivate& eh = *handle->impl;
+            // Check if this is an interrupt
+            PollerPrivate::InterruptHandle& interruptHandle = impl->interruptHandle;
+            if (dataPtr == &interruptHandle) {
+                PollerHandle* wrappedHandle = 0;
+                {
+                ScopedLock<Mutex> l(interruptHandle.impl->lock);
+                if (interruptHandle.impl->isActive()) {
+                    wrappedHandle = interruptHandle.getHandle();
+                    // If there is an interrupt queued behind this one we need to arm it
+                    // We do it this way so that another thread can pick it up
+                    if (interruptHandle.queuedHandles()) {
+                        impl->interrupt();
+                        interruptHandle.impl->setActive();
+                    } else {
+                        interruptHandle.impl->setInactive();
+                    }
+                }
+                }
+                if (wrappedHandle) {
+                    ScopedLock<Mutex> l(wrappedHandle->impl->lock);
+                    if (!wrappedHandle->impl->isDeleted()) {
+                        wrappedHandle->impl->setInactive();
+                        return Event(wrappedHandle, INTERRUPTED);
+                    }
+                    PollerHandleDeletionManager.markForDeletion(wrappedHandle->impl);
+                }
+                continue;
+            }
+
+            // Check for shutdown
+            if (impl->isShutdown) {
+                PollerHandleDeletionManager.markAllUnusedInThisThread();
+                return Event(0, SHUTDOWN);
+            }
+
+            PollerHandlePrivate& eh = *static_cast<PollerHandlePrivate*>(dataPtr);
             ScopedLock<Mutex> l(eh.lock);
             
             // the handle could have gone inactive since we left the epoll_wait
             if (eh.isActive()) {
-
-                // Check if this is an interrupt
-                if (handle == &impl->interruptHandle) {
-                	PollerHandle* wrappedHandle = impl->interruptHandle.getHandle();
-                	// If there is an interrupt queued behind this one we need to arm it
-                	// We do it this way so that another thread can pick it up
-                	if (impl->interruptHandle.queuedHandles()) {
-                		impl->interrupt();
-                		eh.setActive();
-                	} else {
-                		eh.setInactive();
-                	}
-                	return Event(wrappedHandle, INTERRUPTED);
-                }
+                PollerHandle* handle = eh.pollerHandle;
 
                 // If the connection has been hungup we could still be readable
                 // (just not writable), allow us to readable until we get here again

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Tue Mar 10 23:10:57 2009
@@ -123,7 +123,7 @@
         // TODO: Currently we ignore the peers address, perhaps we should
         // log it or use it for connection acceptance.
         try {
-            s = socket.accept(0, 0);
+            s = socket.accept();
             if (s) {
                 acceptedCallback(*s);
             } else {
@@ -474,7 +474,7 @@
                     break;
                 } else {
                     // Report error then just treat as a socket disconnect
-                    QPID_LOG(error, "Error reading socket: " << qpid::sys::strError(rc) << "(" << rc << ")" );
+                    QPID_LOG(error, "Error reading socket: " << qpid::sys::strError(errno) << "(" << errno << ")" );
                     eofCallback(*this);
                     h.unwatchRead();
                     break;

Modified: qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/posix/Socket.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/posix/Socket.cpp?rev=752300&r1=752299&r2=752300&view=diff
==============================================================================
--- qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/posix/Socket.cpp (original)
+++ qpid/branches/qpid-1673/qpid/cpp/src/qpid/sys/posix/Socket.cpp Tue Mar 10 23:10:57 2009
@@ -108,7 +108,7 @@
 {
     int& socket = impl->fd;
     if (socket != -1) Socket::close();
-    int s = ::socket (PF_INET, SOCK_STREAM, 0);
+    int s = ::socket (AF_INET, SOCK_STREAM, 0);
     if (s < 0) throw QPID_POSIX_ERROR(errno);
     socket = s;
 }
@@ -138,25 +138,30 @@
 }
 }
 
-void Socket::connect(const std::string& host, uint16_t port) const
+void Socket::connect(const std::string& host, uint16_t p) const
 {
-    std::stringstream namestream;
-    namestream << host << ":" << port;
-    connectname = namestream.str();
+    std::stringstream portstream;
+    portstream << p;
+    std::string port = portstream.str();
+    connectname = host + ":" + port;
 
     const int& socket = impl->fd;
-    struct sockaddr_in name;
-    name.sin_family = AF_INET;
-    name.sin_port = htons(port);
-    // TODO: Be good to make this work for IPv6 as well as IPv4
-    // Use more modern lookup functions
-    struct hostent* hp = gethostbyname ( host.c_str() );
-    if (hp == 0)
-        throw Exception(QPID_MSG("Cannot resolve " << host << ": " << h_errstr(h_errno)));
-    ::memcpy(&name.sin_addr.s_addr, hp->h_addr_list[0], hp->h_length);
-    if ((::connect(socket, (struct sockaddr*)(&name), sizeof(name)) < 0) &&
-        (errno != EINPROGRESS))
+
+    ::addrinfo *res;
+    ::addrinfo hints;
+    ::memset(&hints, 0, sizeof(hints));
+    hints.ai_family = AF_INET; // In order to allow AF_INET6 we'd have to change createTcp() as well
+    hints.ai_socktype = SOCK_STREAM;
+    int n = ::getaddrinfo(host.c_str(), port.c_str(), &hints, &res);
+    if (n != 0)
+        throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n)));
+    // TODO the correct thing to do here is loop on failure until you've used all the returned addresses
+    if ((::connect(socket, res->ai_addr, res->ai_addrlen) < 0) &&
+        (errno != EINPROGRESS)) {
+        ::freeaddrinfo(res);
         throw qpid::Exception(QPID_MSG(strError(errno) << ": " << host << ":" << port));
+    }
+    ::freeaddrinfo(res);
 }
 
 void
@@ -189,9 +194,9 @@
     return ntohs(name.sin_port);
 }
 
-Socket* Socket::accept(struct sockaddr *addr, socklen_t *addrlen) const
+Socket* Socket::accept() const
 {
-    int afd = ::accept(impl->fd, addr, addrlen);
+    int afd = ::accept(impl->fd, 0, 0);
     if ( afd >= 0)
         return new Socket(new IOHandlePrivate(afd));
     else if (errno == EAGAIN)
@@ -238,7 +243,7 @@
 
 uint16_t Socket::getRemotePort() const
 {
-    return atoi(getService(impl->fd, true).c_str());
+    return std::atoi(getService(impl->fd, true).c_str());
 }
 
 int Socket::getError() const



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org