You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/02/11 13:11:49 UTC

svn commit: r620468 [2/4] - in /incubator/qpid/branches/thegreatmerge: ./ qpid/bin/ qpid/cpp/ qpid/cpp/examples/ qpid/cpp/examples/examples/direct/ qpid/cpp/examples/examples/fanout/ qpid/cpp/examples/examples/pub-sub/ qpid/cpp/examples/examples/reques...

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/ConnectionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/ConnectionHandler.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/ConnectionHandler.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/ConnectionHandler.h Mon Feb 11 04:11:03 2008
@@ -24,8 +24,10 @@
 #include <memory>
 #include "qpid/framing/amqp_types.h"
 #include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/AMQP_ClientOperations.h"
 #include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/AMQP_ServerProxy.h"
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/ProtocolInitiation.h"
 #include "qpid/framing/ProtocolVersion.h"
@@ -39,10 +41,13 @@
 // TODO aconway 2007-09-18: Rename to ConnectionHandler
 class ConnectionHandler : public framing::FrameHandler
 {
-    struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler
+    struct Handler : public framing::AMQP_ServerOperations::ConnectionHandler, 
+        public framing::AMQP_ClientOperations::ConnectionHandler
     {
         framing::AMQP_ClientProxy::Connection client;
+        framing::AMQP_ServerProxy::Connection server;
         Connection& connection;
+        bool serverMode;
     
         Handler(Connection& connection);
         void startOk(const qpid::framing::FieldTable& clientProperties,
@@ -55,6 +60,23 @@
         void close(uint16_t replyCode, const std::string& replyText,
                    uint16_t classId, uint16_t methodId); 
         void closeOk(); 
+
+
+        void start(uint8_t versionMajor,
+                   uint8_t versionMinor,
+                   const qpid::framing::FieldTable& serverProperties,
+                   const std::string& mechanisms,
+                   const std::string& locales);
+        
+        void secure(const std::string& challenge);
+        
+        void tune(uint16_t channelMax,
+                  uint32_t frameMax,
+                  uint16_t heartbeat);
+        
+        void openOk(const std::string& knownHosts);
+        
+        void redirect(const std::string& host, const std::string& knownHosts);        
     };
     std::auto_ptr<Handler> handler;
   public:

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionHandler.cpp Mon Feb 11 04:11:03 2008
@@ -23,6 +23,7 @@
 #include "Connection.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/constants.h"
+#include "qpid/framing/ClientInvoker.h"
 #include "qpid/framing/ServerInvoker.h"
 #include "qpid/log/Statement.h"
 
@@ -57,17 +58,19 @@
     //
     AMQMethodBody* m = f.getBody()->getMethod();
     try {
-        if (m && invoke(*this, *m))
+        if (m && invoke(static_cast<AMQP_ServerOperations::SessionHandler&>(*this), *m)) {
             return;
-        else if (session.get()) {
+        } else if (session.get()) {
             boost::optional<SequenceNumber> ack=session->received(f);
             session->in.handle(f);
             if (ack)
                 peerSession.ack(*ack, SequenceNumberSet());
-        }
-        else if (!ignoring)
+        } else if (m && invoke(static_cast<AMQP_ClientOperations::SessionHandler&>(*this), *m)) {
+            return;
+        } else if (!ignoring) {
             throw ChannelErrorException(
                 QPID_MSG("Channel " << channel.get() << " is not open"));
+        }
     } catch(const ChannelException& e) {
         ignoring=true;          // Ignore trailing frames sent by client.
         session->detach();
@@ -186,6 +189,19 @@
 void  SessionHandler::solicitAck() {
     assertAttached("solicit-ack");
     peerSession.ack(session->sendingAck(), SequenceNumberSet());    
+}
+
+void SessionHandler::attached(const Uuid& /*sessionId*/, uint32_t detachedLifetime)
+{
+    std::auto_ptr<SessionState> state(
+        connection.broker.getSessionManager().open(*this, detachedLifetime));
+    session.reset(state.release());
+}
+
+void SessionHandler::detached()
+{
+    connection.broker.getSessionManager().suspend(session);
+    session.reset();
 }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionHandler.h Mon Feb 11 04:11:03 2008
@@ -23,6 +23,7 @@
  */
 
 #include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/AMQP_ClientOperations.h"
 #include "qpid/framing/AMQP_ServerOperations.h"
 #include "qpid/framing/AMQP_ClientProxy.h"
 #include "qpid/framing/amqp_types.h"
@@ -43,6 +44,7 @@
  */
 class SessionHandler : public framing::FrameHandler::InOutHandler,
                        public framing::AMQP_ServerOperations::SessionHandler,
+                       public framing::AMQP_ClientOperations::SessionHandler,
                        private boost::noncopyable
 {
   public:
@@ -81,11 +83,16 @@
              const framing::SequenceNumberSet& seenFrameSet);
     void highWaterMark(uint32_t lastSentMark);
     void solicitAck();
+    
+    //extra methods required for assuming client role
+    void attached(const framing::Uuid& sessionId, uint32_t detachedLifetime);
+    void detached();
 
 
     void assertAttached(const char* method) const;
     void assertActive(const char* method) const;
     void assertClosed(const char* method) const;
+
 
     Connection& connection;
     framing::ChannelHandler channel;

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionManager.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionManager.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionManager.cpp Mon Feb 11 04:11:03 2008
@@ -43,13 +43,16 @@
 
 SessionManager::~SessionManager() {}
 
+// FIXME aconway 2008-02-01: pass handler*, allow open  unattached.
 std::auto_ptr<SessionState>  SessionManager::open(
     SessionHandler& h, uint32_t timeout_)
 {
     Mutex::ScopedLock l(lock);
     std::auto_ptr<SessionState> session(
-        new SessionState(*this, h, timeout_, ack));
+        new SessionState(this, &h, timeout_, ack));
     active.insert(session->getId());
+    for_each(observers.begin(), observers.end(),
+             boost::bind(&Observer::opened, _1,boost::ref(*session)));
     return session;
 }
 
@@ -100,6 +103,10 @@
             suspended.erase(suspended.begin(), keep);
         }
     }
+}
+
+void SessionManager::add(const intrusive_ptr<Observer>& o) {
+    observers.push_back(o);
 }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionManager.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionManager.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionManager.h Mon Feb 11 04:11:03 2008
@@ -25,6 +25,7 @@
 #include <qpid/framing/Uuid.h>
 #include <qpid/sys/Time.h>
 #include <qpid/sys/Mutex.h>
+#include <qpid/RefCounted.h>
 
 #include <boost/noncopyable.hpp>
 #include <boost/ptr_container/ptr_vector.hpp>
@@ -44,8 +45,17 @@
  */
 class SessionManager : private boost::noncopyable {
   public:
+    /**
+     * Observer notified of SessionManager events.
+     */
+    struct Observer : public RefCounted {
+        virtual void opened(SessionState&) {}
+    };
+    
     SessionManager(uint32_t ack);
+    
     ~SessionManager();
+    
     /** Open a new active session, caller takes ownership */
     std::auto_ptr<SessionState> open(SessionHandler& h, uint32_t timeout_);
     
@@ -59,9 +69,13 @@
      */
     std::auto_ptr<SessionState> resume(const framing::Uuid&);
 
+    /** Add an Observer. */
+    void add(const intrusive_ptr<Observer>&);
+    
   private:
     typedef boost::ptr_vector<SessionState> Suspended;
     typedef std::set<framing::Uuid> Active;
+    typedef std::vector<intrusive_ptr<Observer> > Observers;
 
     void erase(const framing::Uuid&);             
     void eraseExpired();             
@@ -70,6 +84,7 @@
     Suspended suspended;
     Active active;
     uint32_t ack;
+    Observers observers;
     
   friend class SessionState; // removes deleted sessions from active set.
 };

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionState.cpp Mon Feb 11 04:11:03 2008
@@ -36,23 +36,17 @@
 using qpid::management::Manageable;
 using qpid::management::Args;
 
-void SessionState::handleIn(AMQFrame& f) { semanticHandler->handle(f); }
-
-void SessionState::handleOut(AMQFrame& f) {
-    assert(handler);
-    handler->out.handle(f);
-}
-
 SessionState::SessionState(
-    SessionManager& f, SessionHandler& h, uint32_t timeout_, uint32_t ack) 
+    SessionManager* f, SessionHandler* h, uint32_t timeout_, uint32_t ack) 
     : framing::SessionState(ack, timeout_ > 0),
-      factory(f), handler(&h), id(true), timeout(timeout_),
-      broker(h.getConnection().broker),
-      version(h.getConnection().getVersion()),
+      factory(f), handler(h), id(true), timeout(timeout_),
+      broker(h->getConnection().broker),
+      version(h->getConnection().getVersion()),
       semanticHandler(new SemanticHandler(*this))
 {
-    // TODO aconway 2007-09-20: SessionManager may add plugin
-    // handlers to the chain.
+    in.next = semanticHandler.get();
+    out.next = &handler->out;
+
     getConnection().outputTasks.addOutputTask(&semanticHandler->getSemanticState());
 
     Manageable* parent = broker.GetVhostObject ();
@@ -66,8 +60,8 @@
             mgmtObject = management::Session::shared_ptr
                 (new management::Session (this, parent, id.str ()));
             mgmtObject->set_attached (1);
-            mgmtObject->set_clientRef (h.getConnection().GetManagementObject()->getObjectId());
-            mgmtObject->set_channelId (h.getChannel());
+            mgmtObject->set_clientRef (h->getConnection().GetManagementObject()->getObjectId());
+            mgmtObject->set_channelId (h->getChannel());
             mgmtObject->set_detachedLifespan (getTimeout());
             agent->addObject (mgmtObject);
         }
@@ -76,12 +70,10 @@
 
 SessionState::~SessionState() {
     // Remove ID from active session list.
-    factory.erase(getId());
-
+    if (factory)
+        factory->erase(getId());
     if (mgmtObject.get () != 0)
-    {
         mgmtObject->resourceDestroy ();
-    }
 }
 
 SessionHandler* SessionState::getHandler() {
@@ -101,7 +93,7 @@
 void SessionState::detach() {
     getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState());
     Mutex::ScopedLock l(lock);
-    handler = 0;
+    handler = 0; out.next = 0; 
     if (mgmtObject.get() != 0)
     {
         mgmtObject->set_attached  (0);
@@ -112,6 +104,7 @@
     {
         Mutex::ScopedLock l(lock);
         handler = &h;
+        out.next = &handler->out;
         if (mgmtObject.get() != 0)
         {
             mgmtObject->set_attached (1);

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionState.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/SessionState.h Mon Feb 11 04:11:03 2008
@@ -58,7 +58,7 @@
  * themselves have state. 
  */
 class SessionState : public framing::SessionState,
-    public framing::FrameHandler::InOutHandler,
+    public framing::FrameHandler::Chains,
     public sys::OutputControl,
     public management::Manageable
 {
@@ -90,18 +90,15 @@
     management::Manageable::status_t
         ManagementMethod (uint32_t methodId, management::Args& args);
 
-  protected:
-    void handleIn(framing::AMQFrame&);
-    void handleOut(framing::AMQFrame&);
-    
-  private:
-    // SessionManager creates sessions.
-    SessionState(SessionManager&,
-                 SessionHandler& out,
+    // Normally SessionManager creates sessions.
+    SessionState(SessionManager*,
+                 SessionHandler* out,
                  uint32_t timeout,
                  uint32_t ackInterval);
     
-    SessionManager& factory;
+
+  private:
+    SessionManager* factory;
     SessionHandler* handler;    
     framing::Uuid id;
     uint32_t timeout;

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/Timer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/Timer.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/Timer.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/Timer.cpp Mon Feb 11 04:11:03 2008
@@ -85,17 +85,13 @@
 
 void Timer::stop()
 {
-    signalStop();
-    runner.join();
-}
-
-void Timer::signalStop()
-{
-    Monitor::ScopedLock l(monitor);
-    if (active) {
+    {
+        Monitor::ScopedLock l(monitor);
+        if (!active) return;
         active = false;
         monitor.notifyAll();
     }
+    runner.join();
 }
 
 bool Later::operator()(const intrusive_ptr<TimerTask>& a,

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/Timer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/Timer.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/Timer.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/broker/Timer.h Mon Feb 11 04:11:03 2008
@@ -59,7 +59,6 @@
     bool active;
 
     virtual void run();
-    void signalStop();
 
   public:
     Timer();

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/Connector.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/Connector.cpp Mon Feb 11 04:11:03 2008
@@ -29,12 +29,15 @@
 #include "qpid/sys/Poller.h"
 #include "qpid/Msg.h"
 #include <boost/bind.hpp>
+#include <boost/format.hpp>
 
 namespace qpid {
 namespace client {
 
 using namespace qpid::sys;
 using namespace qpid::framing;
+using boost::format;
+using boost::str;
 
 Connector::Connector(
     ProtocolVersion ver, bool _debug, uint32_t buffer_size
@@ -59,6 +62,7 @@
     Mutex::ScopedLock l(closedLock);
     assert(closed);
     socket.connect(host, port);
+    identifier=str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
     closed = false;
     poller = Poller::shared_ptr(new Poller);
     aio = new AsynchIO(socket,
@@ -174,7 +178,9 @@
     ~Buff() { delete [] bytes;}
 };
 
-Connector::Writer::Writer() : aio(0), buffer(0), lastEof(0) {}
+Connector::Writer::Writer() : aio(0), buffer(0), lastEof(0) 
+{
+}
 
 Connector::Writer::~Writer() { delete buffer; }
 
@@ -182,6 +188,7 @@
     Mutex::ScopedLock l(lock);
     aio = a;
     newBuffer(l);
+    identifier = str(format("[%1% %2%]") % aio->getSocket().getLocalPort() % aio->getSocket().getPeerAddress());
 }
 
 void Connector::Writer::handle(framing::AMQFrame& frame) { 
@@ -191,7 +198,7 @@
         lastEof = frames.size();
         aio->notifyPendingWrite();
     }
-    QPID_LOG(trace, "SENT [" << this << "]: " << frame);
+    QPID_LOG(trace, "SENT " << identifier << ": " << frame);
 }
 
 void Connector::Writer::writeOne(const Mutex::ScopedLock& l) {
@@ -234,7 +241,7 @@
 
     AMQFrame frame;
     while(frame.decode(in)){
-        QPID_LOG(trace, "RECV [" << this << "]: " << frame);
+        QPID_LOG(trace, "RECV " << identifier << ": " << frame);
         input->received(frame);
     }
     // TODO: unreading needs to go away, and when we can cope

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/Connector.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/Connector.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/Connector.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/Connector.h Mon Feb 11 04:11:03 2008
@@ -59,6 +59,7 @@
         size_t lastEof; // Position after last EOF in frames
         framing::Buffer encode;
         size_t framesEncoded;
+        std::string identifier;
         
         void writeOne(const sys::Mutex::ScopedLock&);
         void newBuffer(const sys::Mutex::ScopedLock&);

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/LocalQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/LocalQueue.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/LocalQueue.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/LocalQueue.cpp Mon Feb 11 04:11:03 2008
@@ -47,11 +47,18 @@
 
 void LocalQueue::setAckPolicy(AckPolicy a) { autoAck=a; }
 
-bool LocalQueue::empty() 
+bool LocalQueue::empty() const
 { 
     if (!queue)
         throw ClosedException();
-    return queue->isEmpty(); 
+    return queue->empty(); 
+}
+
+size_t LocalQueue::size() const
+{ 
+    if (!queue)
+        throw ClosedException();
+    return queue->size(); 
 }
 
 }} // namespace qpid::client

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/LocalQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/LocalQueue.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/LocalQueue.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/client/LocalQueue.h Mon Feb 11 04:11:03 2008
@@ -44,8 +44,8 @@
      *@exception ClosedException if subscription has been closed.
      */
     Message pop();
-    bool empty();
-
+    bool empty() const;
+    size_t size() const;
     void setAckPolicy(AckPolicy);
 
   private:

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Feb 11 04:11:03 2008
@@ -17,10 +17,12 @@
  */
 
 #include "Cluster.h"
+#include "qpid/broker/SessionState.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/ClusterNotifyBody.h"
 #include "qpid/log/Statement.h"
 #include <boost/bind.hpp>
+#include <boost/scoped_array.hpp>
 #include <algorithm>
 #include <iterator>
 #include <map>
@@ -30,7 +32,70 @@
 using namespace qpid::framing;
 using namespace qpid::sys;
 using namespace std;
+using broker::SessionState;
 
+namespace {
+
+// Beginning of inbound chain: send to cluster.
+struct ClusterSendHandler : public FrameHandler {
+    SessionState& session;
+    Cluster& cluster;
+    bool busy;
+    Monitor lock;
+    
+    ClusterSendHandler(SessionState& s, Cluster& c) : session(s), cluster(c), busy(false) {}
+
+    void handle(AMQFrame& f) {
+        Mutex::ScopedLock l(lock);
+        assert(!busy);
+        // FIXME aconway 2008-01-29: refcount Sessions.
+        // session.addRef();             // Keep the session till the message is self delivered.
+        cluster.send(f, next);        // Indirectly send to next via cluster.
+
+        // FIXME aconway 2008-01-29: need to get this blocking out of the loop.
+        // But cluster needs to agree on order of side-effects on the shared model.
+        // OK for wiring to block, for messages use queue tokens?
+        // Both in & out transfers must be orderd per queue.
+        // May need out-of-order completion.
+        busy=true;
+        while (busy) lock.wait();
+    }
+};
+
+// Next in inbound chain, self delivered from cluster.
+struct ClusterDeliverHandler : public FrameHandler {
+    Cluster& cluster;
+    ClusterSendHandler& sender;
+
+    ClusterDeliverHandler(ClusterSendHandler& prev, Cluster& c) : cluster(c), sender(prev) {}
+    
+    void handle(AMQFrame& f) {
+        next->handle(f);
+        Mutex::ScopedLock l(sender.lock);
+        sender.busy=false;
+        sender.lock.notify();
+    }
+};
+
+// FIXME aconway 2008-01-29: IList
+void insert(FrameHandler::Chain& c, FrameHandler* h) {
+    h->next = c.next;
+    c.next = h;
+}
+
+struct SessionObserver : public broker::SessionManager::Observer {
+    Cluster& cluster;
+    SessionObserver(Cluster& c) : cluster(c) {}
+    
+    void opened(SessionState& s) {
+        // FIXME aconway 2008-01-29: IList for memory management.
+        ClusterSendHandler* sender=new ClusterSendHandler(s, cluster);
+        ClusterDeliverHandler* deliverer=new ClusterDeliverHandler(*sender, cluster);
+        insert(s.in, deliverer);
+        insert(s.in, sender);
+    }
+};
+}
 
 ostream& operator <<(ostream& out, const Cluster& cluster) {
     return out << "cluster[" << cluster.name.str() << " " << cluster.self << "]";
@@ -46,13 +111,11 @@
     return out;
 }
 
-Cluster::Cluster(const std::string& name_, const std::string& url_, broker::Broker& broker) :
-    FrameHandler(&sessions), 
+Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker&) :
     cpg(*this),
     name(name_),
-    url(url_), 
-    self(Id::self(cpg)),
-    sessions(broker, *this)
+    url(url_),
+    observer(new SessionObserver(*this))
 {
     QPID_LOG(trace, *this << " Joining cluster: " << name_);
     cpg.join(name);
@@ -78,18 +141,19 @@
     }
 }
 
-void Cluster::handle(AMQFrame& frame) {
+void Cluster::send(AMQFrame& frame, FrameHandler* next) {
     QPID_LOG(trace, *this << " SEND: " << frame);
-    Buffer buf(frame.size());
+    char data[65536]; // FIXME aconway 2008-01-29: Better buffer handling.
+    Buffer buf(data);
     frame.encode(buf);
-    buf.flip();
-    iovec iov = { buf.start(), frame.size() };
+    buf.putRawData((uint8_t*)&next, sizeof(next)); // Tag the frame with the next pointer.
+    iovec iov = { data, frame.size()+sizeof(next) };
     cpg.mcast(name, &iov, 1);
 }
 
 void Cluster::notify() {
-    AMQFrame frame(in_place<ClusterNotifyBody>(ProtocolVersion(), url));
-    handle(frame);
+    AMQFrame frame(in_place<ClusterNotifyBody>(ProtocolVersion(), url.str()));
+    send(frame, 0);
 }
 
 size_t Cluster::size() const {
@@ -113,15 +177,25 @@
     void* msg,
     int msg_len)
 {
-    Id from(nodeid, pid);
-    Buffer buf(static_cast<char*>(msg), msg_len);
-    AMQFrame frame;
-    frame.decode(buf);
-    QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from);
-    if (frame.getChannel() == 0)
-        handleClusterFrame(from, frame);
-    else
-        next->handle(frame);
+    try {
+        Id from(nodeid, pid);
+        Buffer buf(static_cast<char*>(msg), msg_len);
+        AMQFrame frame;
+        frame.decode(buf);
+        QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from);
+        if (frame.getChannel() == 0)
+            handleClusterFrame(from, frame);
+        else if (from == self) {
+            FrameHandler* next;
+            buf.getRawData((uint8_t*)&next, sizeof(next));
+            next->handle(frame);
+        }
+        // FIXME aconway 2008-01-30: apply frames from foreign sessions.
+    }
+    catch (const std::exception& e) {
+        // FIXME aconway 2008-01-30: exception handling.
+        QPID_LOG(error, "Error handling frame from cluster " << e.what());
+    }
 }
 
 bool Cluster::wait(boost::function<bool(const Cluster&)> predicate,
@@ -144,6 +218,8 @@
     {
         Mutex::ScopedLock l(lock);
         members[from].url=notifyIn->getUrl();
+        if (!self.id && notifyIn->getUrl() == url.str()) 
+            self=from;
         lock.notifyAll();
         QPID_LOG(trace, *this << ": members joined: " << members);
     }

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cluster.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cluster.h Mon Feb 11 04:11:03 2008
@@ -19,13 +19,15 @@
  *
  */
 
-#include "SessionManager.h"
 #include "Cpg.h"
 
+#include "qpid/broker/Broker.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Runnable.h"
 #include "qpid/sys/Thread.h"
 #include "qpid/log/Logger.h"
+#include "qpid/Url.h"
+
 
 #include <boost/optional.hpp>
 #include <boost/function.hpp>
@@ -36,21 +38,16 @@
 namespace qpid { namespace cluster {
 
 /**
- * Connection to the cluster. Maintains cluster membership
- * data.
- *
- * As FrameHandler, handles frames by sending them to the
- * cluster. Frames received from the cluster are sent to the next
- * FrameHandler in the chain.
+ * Connection to the cluster.
+ * Keeps cluster membership data.
  */
-class Cluster : public framing::FrameHandler,
-                private sys::Runnable, private Cpg::Handler
+class Cluster : private sys::Runnable, private Cpg::Handler
 {
   public:
     /** Details of a cluster member */
     struct Member {
-        Member(const std::string& url_=std::string()) : url(url_) {}
-        std::string url;        ///< Broker address.
+        Member(const Url& url_=Url()) : url(url_) {}
+        Url url;        ///< Broker address.
     };
     
     typedef std::vector<Member> MemberList;
@@ -60,11 +57,12 @@
      * @param name of the cluster.
      * @param url of this broker, sent to the cluster.
      */
-    Cluster(const std::string& name, const std::string& url, broker::Broker&);
+    Cluster(const std::string& name, const Url& url, broker::Broker&);
 
     virtual ~Cluster();
 
-    framing::HandlerUpdater& getHandlerUpdater() { return sessions; }
+    // FIXME aconway 2008-01-29: 
+    intrusive_ptr<broker::SessionManager::Observer> getObserver() { return observer; }
     
     /** Get the current cluster membership. */
     MemberList getMembers() const;
@@ -83,7 +81,7 @@
               sys::Duration timeout=sys::TIME_INFINITE) const;
 
     /** Send frame to the cluster */
-    void handle(framing::AMQFrame&);
+    void send(framing::AMQFrame&, framing::FrameHandler*);
     
   private:
     typedef Cpg::Id Id;
@@ -113,12 +111,12 @@
     mutable sys::Monitor lock;
     Cpg cpg;
     Cpg::Name name;
-    std::string url;
+    Url url;
     Id self;
     MemberMap members;
     sys::Thread dispatcher;
     boost::function<void()> callback;
-    SessionManager sessions;
+    intrusive_ptr<broker::SessionManager::Observer> observer;
 
   friend std::ostream& operator <<(std::ostream&, const Cluster&);
   friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Mon Feb 11 04:11:03 2008
@@ -15,9 +15,12 @@
  * limitations under the License.
  *
  */
+#include <boost/program_options/value_semantic.hpp>
+
+
+
 #include "qpid/broker/Broker.h"
 #include "qpid/cluster/Cluster.h"
-#include "qpid/cluster/SessionManager.h"
 #include "qpid/Plugin.h"
 #include "qpid/Options.h"
 #include "qpid/shared_ptr.h"
@@ -25,25 +28,34 @@
 #include <boost/optional.hpp>
 #include <boost/utility/in_place_factory.hpp>
 
+
 namespace qpid {
 namespace cluster {
 
 using namespace std;
 
-struct ClusterPlugin : public Plugin {
+struct ClusterOptions : public Options {
+    string name;
+    string url;
+
+    ClusterOptions() : Options("Cluster Options") {
+        addOptions()
+            ("cluster-name", optValue(name, "NAME"), "Name of cluster to join")
+            ("cluster-url", optValue(url,"URL"),
+             "URL of this broker, advertized to the cluster.\n"
+             "Defaults to a URL listing all the local IP addresses\n");
+    }
+
+    Url getUrl(uint16_t port) const {
+        if (url.empty()) return Url::getIpAddressesUrl(port);
+        return Url(url);
+    }
+};
 
-    struct ClusterOptions : public Options {
-        string clusterName;
-        ClusterOptions() : Options("Cluster Options") {
-            addOptions()
-                ("cluster", optValue(clusterName, "NAME"),
-                 "Joins the cluster named NAME");
-        }
-    };
+struct ClusterPlugin : public Plugin {
 
     ClusterOptions options;
     boost::optional<Cluster> cluster;
-    boost::optional<SessionManager> sessions;
 
     Options* getOptions() { return &options; }
 
@@ -52,10 +64,12 @@
     void initialize(Plugin::Target& target) {
         broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
         // Only provide to a Broker, and only if the --cluster config is set.
-        if (broker && !options.clusterName.empty()) {
+        if (broker && !options.name.empty()) {
             assert(!cluster); // A process can only belong to one cluster.
-            cluster = boost::in_place(options.clusterName, broker->getUrl(), boost::ref(*broker));
-            broker->add(make_shared_ptr(&cluster->getHandlerUpdater(), nullDeleter));
+            cluster = boost::in_place(options.name,
+                                      options.getUrl(broker->getPort()),
+                                      boost::ref(*broker));
+            broker->getSessionManager().add(cluster->getObserver());	
         }
     }
 };

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cpg.cpp Mon Feb 11 04:11:03 2008
@@ -145,13 +145,6 @@
     return "Cannot mcast to CPG group "+group.str();
 }
 
-uint32_t Cpg::getLocalNoideId() const {
-    unsigned int nodeid;
-    check(cpg_local_get(handle, &nodeid), "Cannot get local node ID");
-    assert(nodeid <= std::numeric_limits<uint32_t>::max());
-    return nodeid;
-}
-
 ostream& operator<<(ostream& o, std::pair<cpg_address*,int> a) {
     ostream_iterator<Cpg::Id> i(o, " ");
     std::copy(a.first, a.first+a.second, i);
@@ -176,10 +169,6 @@
     return out << string(name.value, name.length);
 }
 
-
-Cpg::Id Cpg::Id::self(Cpg& cpg) {
-    return Id(cpg.getLocalNoideId(), getpid());
-}
 
 }} // namespace qpid::cluster
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cpg.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cpg.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cpg.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/cluster/Cpg.h Mon Feb 11 04:11:03 2008
@@ -57,12 +57,10 @@
     
     struct Id {
         uint64_t id;
-        Id() : id(0) {}
+        Id(uint64_t n=0) : id(n) {}
         Id(uint32_t nodeid, uint32_t pid) { id=(uint64_t(nodeid)<<32)+ pid; }
         Id(const cpg_address& addr) : id(Id(addr.nodeid, addr.pid)) {}
 
-        static Id self(Cpg& cpg);
-
         operator uint64_t() const { return id; }
         uint32_t nodeId() const { return id >> 32; }
         pid_t pid() const { return id & 0xFFFF; }
@@ -132,8 +130,6 @@
 
     cpg_handle_t getHandle() const { return handle; }
 
-    uint32_t getLocalNoideId() const;
-    
   private:
     class Handles;
     struct ClearHandleOnExit;

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/framing/FrameDefaultVisitor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/framing/FrameDefaultVisitor.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/framing/FrameDefaultVisitor.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/framing/FrameDefaultVisitor.h Mon Feb 11 04:11:03 2008
@@ -24,14 +24,12 @@
 #include "qpid/framing/MethodBodyDefaultVisitor.h"
 #include "qpid/framing/AMQBody.h"
 #include "qpid/framing/AMQMethodBody.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/framing/AMQContentBody.h"
+#include "qpid/framing/AMQHeartbeatBody.h"
 
 namespace qpid {
 namespace framing {
-
-class AMQHeaderBody;
-class AMQContentBody;
-class AMQHeartbeatBody;
-
 /**
  * Visitor for all concrete frame body types, combines
  * AMQBodyConstVisitor and MethodBodyDefaultVisitor.
@@ -45,12 +43,12 @@
                              protected MethodBodyDefaultVisitor
 {
     virtual void defaultVisit(const AMQBody&) = 0;
+    void defaultVisit(const AMQMethodBody& method) { defaultVisit(static_cast<const AMQBody&>(method)); }
 
     void visit(const AMQHeaderBody& b) { defaultVisit(b); }
     void visit(const AMQContentBody& b) { defaultVisit(b); }
     void visit(const AMQHeartbeatBody& b) { defaultVisit(b); }
     void visit(const AMQMethodBody& b) { b.accept(static_cast<MethodBodyDefaultVisitor&>(*this)); }
-    void defaultVisit(const AMQMethodBody& method) { defaultVisit(static_cast<const AMQBody&>(method)); }
     
     using AMQBodyConstVisitor::visit;
     using MethodBodyDefaultVisitor::visit;

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/log/Statement.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/log/Statement.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/log/Statement.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/log/Statement.cpp Mon Feb 11 04:11:03 2008
@@ -18,14 +18,42 @@
 
 #include "Statement.h"
 #include "Logger.h"
+#include <boost/bind.hpp>
 #include <stdexcept>
+#include <algorithm>
 #include <syslog.h>
 
 namespace qpid {
 namespace log {
 
+namespace {
+using namespace std;
+
+struct IsControl { bool operator()(unsigned char c) { return c < 32; } };
+
+bool isClean(const std::string& str) {
+    return std::find_if(str.begin(), str.end(), IsControl()) == str.end();
+}
+
+std::string quote(const std::string& str) {
+    IsControl isControl;
+    size_t n = std::count_if(str.begin(), str.end(), isControl);
+    std::string ret;
+    ret.reserve(str.size()+n); // Avoid extra allocations.
+    for (string::const_iterator i = str.begin(); i != str.end(); ++i) {
+        if (isControl(*i)) {
+            ret.push_back('^');
+            ret.push_back((*i)+64);
+        }
+        else ret.push_back(*i);
+    }
+    return ret;
+}
+
+}
+
 void Statement::log(const std::string& message) {
-    Logger::instance().log(*this,message);
+    Logger::instance().log(*this, isClean(message) ? message : quote(message));
 }
 
 Statement::Initializer::Initializer(Statement& s) : statement(s) {

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/Acceptor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/Acceptor.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/Acceptor.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/Acceptor.h Mon Feb 11 04:11:03 2008
@@ -38,6 +38,9 @@
     virtual uint16_t getPort() const = 0;
     virtual std::string getHost() const = 0;
     virtual void run(ConnectionInputHandlerFactory* factory) = 0;
+    virtual void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory) = 0;
+
+    /** Note: this function is async-signal safe */
     virtual void shutdown() = 0;
 };
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/AsynchIOAcceptor.cpp Mon Feb 11 04:11:03 2008
@@ -30,6 +30,7 @@
 #include "qpid/sys/ConnectionInputHandler.h"
 #include "qpid/sys/ConnectionInputHandlerFactory.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/AMQDataBlock.h"
 #include "qpid/framing/Buffer.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/log/Statement.h"
@@ -53,6 +54,7 @@
     AsynchIOAcceptor(int16_t port, int backlog, int threads);
     ~AsynchIOAcceptor() {}
     void run(ConnectionInputHandlerFactory* factory);
+    void connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* factory);
     void shutdown();
         
     uint16_t getPort() const;
@@ -92,13 +94,17 @@
     bool initiated;
     bool readError;
     std::string identifier;
+    bool isClient;
+
+    void write(const framing::AMQDataBlock&);
 
   public:
     AsynchIOHandler() :
         inputHandler(0),
         frameQueueClosed(false),
         initiated(false),
-        readError(false)
+        readError(false),
+        isClient(false)
     {}
 	
     ~AsynchIOHandler() {
@@ -107,6 +113,8 @@
         delete inputHandler;
     }
 
+    void setClient() { isClient = true; }
+
     void init(AsynchIO* a, ConnectionInputHandler* h) {
         aio = a;
         inputHandler = h;
@@ -179,11 +187,50 @@
         t[i].join();
     }
 }
+    
+void AsynchIOAcceptor::connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* f)
+{
+    Socket* socket = new Socket();//Should be deleted by handle when socket closes
+    socket->connect(host, port);
+    AsynchIOHandler* async = new AsynchIOHandler; 
+    async->setClient();
+    ConnectionInputHandler* handler = f->create(async, *socket);
+    AsynchIO* aio = new AsynchIO(*socket,
+                                 boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+                                 boost::bind(&AsynchIOHandler::eof, async, _1),
+                                 boost::bind(&AsynchIOHandler::disconnect, async, _1),
+                                 boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+                                 boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+                                 boost::bind(&AsynchIOHandler::idle, async, _1));
+    async->init(aio, handler);
+
+    // Give connection some buffers to use
+    for (int i = 0; i < 4; i++) {
+        aio->queueReadBuffer(new Buff);
+    }
+    aio->start(poller);
+
+}
+
 
 void AsynchIOAcceptor::shutdown() {
+    // NB: this function must be async-signal safe, it must not
+    // call any function that is not async-signal safe.
     poller->shutdown();
 }
 
+
+void AsynchIOHandler::write(const framing::AMQDataBlock& data)
+{
+    AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
+    if (!buff)
+        buff = new Buff;
+    framing::Buffer out(buff->bytes, buff->byteCount);
+    data.encode(out);
+    buff->dataCount = data.size();
+    aio->queueWrite(buff);
+}
+
 // Output side
 void AsynchIOHandler::send(framing::AMQFrame& frame) {
     // TODO: Need to find out if we are in the callback context,
@@ -274,6 +321,12 @@
 }
 
 void AsynchIOHandler::idle(AsynchIO&){
+    if (isClient && !initiated) {
+        //get & write protocol header from upper layers
+        write(inputHandler->getInitiation());
+        initiated = true;
+        return;
+    }
     ScopedLock<Mutex> l(frameQueueLock);
 	
     if (frameQueue.empty()) {

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/BlockingQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/BlockingQueue.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/BlockingQueue.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/BlockingQueue.h Mon Feb 11 04:11:03 2008
@@ -103,9 +103,13 @@
         return closed;
     }
 
-    bool isEmpty() const {
+    bool empty() const {
         Waitable::ScopedLock l(lock);
         return queue.empty();
+    }    
+    size_t size() const {
+        Waitable::ScopedLock l(lock);
+        return queue.size();
     }    
 
   private:

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/ConnectionInputHandler.h Mon Feb 11 04:11:03 2008
@@ -36,6 +36,7 @@
         public TimeoutHandler, public OutputTask
     {
     public:
+        virtual qpid::framing::ProtocolInitiation getInitiation() = 0;
         virtual void closed() = 0;
     };
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/Poller.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/Poller.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/Poller.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/Poller.h Mon Feb 11 04:11:03 2008
@@ -96,6 +96,7 @@
     
     Poller();
     ~Poller();
+    /** Note: this function is async-signal safe */
     void shutdown();
 
     void addFd(PollerHandle& handle, Direction dir);

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/Socket.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/Socket.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/Socket.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/Socket.h Mon Feb 11 04:11:03 2008
@@ -87,7 +87,16 @@
      * socket
      */
     std::string getPeerAddress() const;
+    /** 
+     * Returns an address (host and port) for the local end of the
+     * socket
+     */
+    std::string getLocalAddress() const;
 
+    uint getLocalPort() const;
+    uint getRemotePort() const;
+
+    
     /** Accept a connection from a socket that is already listening
      * and has an incoming connection
      */

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp Mon Feb 11 04:11:03 2008
@@ -253,6 +253,9 @@
 }
 
 void Poller::shutdown() {
+    // NB: this function must be async-signal safe, it must not
+    // call any function that is not async-signal safe.
+
     // Allow sloppy code to shut us down more than once
     if (impl->isShutdown)
         return;

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/posix/Socket.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/posix/Socket.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/posix/Socket.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpid/sys/posix/Socket.cpp Mon Feb 11 04:11:03 2008
@@ -30,6 +30,7 @@
 #include <sys/errno.h>
 #include <netinet/in.h>
 #include <netdb.h>
+#include <cstdlib>
 
 #include <boost/format.hpp>
 
@@ -45,6 +46,7 @@
     int fd;
 
     std::string getName(bool local, bool includeService = false) const;   
+    std::string getService(bool local) const;   
 };
 
 std::string SocketPrivate::getName(bool local, bool includeService) const
@@ -77,6 +79,28 @@
     }
 }
 
+std::string SocketPrivate::getService(bool local) const
+{
+    ::sockaddr_storage name; // big enough for any socket address    
+    ::socklen_t namelen = sizeof(name);
+    
+    int result = -1;
+    if (local) {
+        result = ::getsockname(fd, (::sockaddr*)&name, &namelen);
+    } else {
+        result = ::getpeername(fd, (::sockaddr*)&name, &namelen);
+    }
+
+    QPID_POSIX_CHECK(result);
+
+    char servName[NI_MAXSERV];
+    if (int rc=::getnameinfo((::sockaddr*)&name, namelen, 0, 0, 
+                                 servName, sizeof(servName), 
+                                 NI_NUMERICHOST | NI_NUMERICSERV) != 0)
+        throw QPID_POSIX_ERROR(rc);
+    return servName;
+}
+
 Socket::Socket() :
 	impl(new SocketPrivate)
 {
@@ -229,6 +253,21 @@
 std::string Socket::getPeerAddress() const
 {
     return impl->getName(false, true);
+}
+
+std::string Socket::getLocalAddress() const
+{
+    return impl->getName(true, true);
+}
+
+uint Socket::getLocalPort() const
+{
+    return atoi(impl->getService(true).c_str());
+}
+
+uint Socket::getRemotePort() const
+{
+    return atoi(impl->getService(true).c_str());
 }
 
 int Socket::toFd() const {

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpidd.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpidd.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpidd.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/qpidd.cpp Mon Feb 11 04:11:03 2008
@@ -120,8 +120,9 @@
 shared_ptr<Broker> brokerPtr;
 auto_ptr<QpiddOptions> options;
 
-void shutdownHandler(int signal){
-    QPID_LOG(notice, "Shutting down on signal " << signal);
+void shutdownHandler(int /*signal*/){
+    // Note: do not call any async-signal unsafe functions here.
+    // Do any extra shtudown actions in main() after broker->run()
     brokerPtr->shutdown();
 }
 
@@ -155,7 +156,7 @@
 
 void loadModuleDir (string dirname, bool isDefault)
 {
-    fs::path dirPath (dirname);
+    fs::path dirPath (dirname, fs::native);
 
     if (!fs::exists (dirPath))
     {
@@ -245,7 +246,8 @@
             brokerPtr.reset(new Broker(options->broker));
             if (options->broker.port == 0)
                 cout << uint16_t(brokerPtr->getPort()) << endl; 
-            brokerPtr->run(); 
+            brokerPtr->run();
+            QPID_LOG(notice, "Shutting down.");
         }
         return 0;
     }

Propchange: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Mon Feb 11 04:11:03 2008
@@ -14,3 +14,5 @@
 qpidd.vglog
 txtest
 latencytest
+ais_test
+cluster.ports

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/BrokerFixture.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/BrokerFixture.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/BrokerFixture.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/BrokerFixture.h Mon Feb 11 04:11:03 2008
@@ -43,7 +43,10 @@
     BrokerFixture() {
         Broker::Options opts;
         opts.port=0;
+        // Management doesn't play well with multiple in-process brokers.
+        opts.enableMgmt=false;  
         opts.workerThreads=1;
+        opts.dataDir="";
         broker = Broker::create(opts);
         // TODO aconway 2007-12-05: At one point BrokerFixture
         // tests could hang in Connection ctor if the following

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/ClientSessionTest.cpp Mon Feb 11 04:11:03 2008
@@ -18,35 +18,42 @@
  * under the License.
  *
  */
-#include "qpid_test_plugin.h"
+#include "unit_test.h"
 #include "BrokerFixture.h"
 #include "qpid/client/Dispatcher.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Runnable.h"
 #include "qpid/client/Session_0_10.h"
 #include "qpid/framing/TransferContent.h"
 #include "qpid/framing/reply_exceptions.h"
 
 #include <boost/optional.hpp>
+#include <boost/lexical_cast.hpp>
 
-#include <list>
+#include <vector>
+
+QPID_AUTO_TEST_SUITE(ClientSessionTest)
 
 using namespace qpid::client;
 using namespace qpid::client::arg;
 using namespace qpid::framing;
 using namespace qpid;
+using std::string;
+using std::cout;
+using std::endl;
 using namespace boost;
 
-struct DummyListener : public MessageListener
-{
-    std::list<Message> messages;
-    std::string name;
+
+struct DummyListener : public sys::Runnable, public MessageListener {
+    std::vector<Message> messages;
+    string name;
     uint expected;
-    uint count;
     Dispatcher dispatcher;
 
-    DummyListener(Session_0_10& session, const std::string& _name, uint _expected) : name(_name), expected(_expected), count(0), 
-                                                                                dispatcher(session) {}
+    DummyListener(Session_0_10& session, const string& n, uint ex) :
+        name(n), expected(ex), dispatcher(session) {}
 
-    void listen()
+    void run()
     {
         dispatcher.listen(name, this);
         dispatcher.run();
@@ -55,117 +62,127 @@
     void received(Message& msg)
     {
         messages.push_back(msg);
-        if (++count == expected) {
+        if (--expected == 0)
             dispatcher.stop();
-        }
     }
 };
 
-class ClientSessionTest : public CppUnit::TestCase, public ProxySessionFixture
+struct ClientSessionFixture : public ProxySessionFixture
 {
-    CPPUNIT_TEST_SUITE(ClientSessionTest);
-    CPPUNIT_TEST(testQueueQuery);
-    CPPUNIT_TEST(testTransfer);
-    CPPUNIT_TEST(testDispatcher);
-    CPPUNIT_TEST(testResumeExpiredError);
-    CPPUNIT_TEST(testUseSuspendedError);
-    CPPUNIT_TEST(testSuspendResume);
-    CPPUNIT_TEST_SUITE_END();
-
-  public:
-
-    void declareSubscribe(const std::string& q="my-queue",
-                          const std::string& dest="my-dest")
+    void declareSubscribe(const string& q="my-queue",
+                          const string& dest="my-dest")
     {
         session.queueDeclare(queue=q);
         session.messageSubscribe(queue=q, destination=dest, acquireMode=1);
         session.messageFlow(destination=dest, unit=0, value=0xFFFFFFFF);//messages
         session.messageFlow(destination=dest, unit=1, value=0xFFFFFFFF);//bytes
     }
+};
 
-    void testQueueQuery() 
-    {
-        session =connection.newSession();
-        session.queueDeclare(queue="my-queue", alternateExchange="amq.fanout", exclusive=true, autoDelete=true);
-        TypedResult<QueueQueryResult> result = session.queueQuery(std::string("my-queue"));
-        CPPUNIT_ASSERT_EQUAL(false, result.get().getDurable());
-        CPPUNIT_ASSERT_EQUAL(true, result.get().getExclusive());
-        CPPUNIT_ASSERT_EQUAL(std::string("amq.fanout"),
-                             result.get().getAlternateExchange());
-    }
+BOOST_FIXTURE_TEST_CASE(testQueueQuery, ClientSessionFixture) {
+    session =connection.newSession();
+    session.queueDeclare(queue="my-queue", alternateExchange="amq.fanout", exclusive=true, autoDelete=true);
+    TypedResult<QueueQueryResult> result = session.queueQuery(string("my-queue"));
+    BOOST_CHECK_EQUAL(false, result.get().getDurable());
+    BOOST_CHECK_EQUAL(true, result.get().getExclusive());
+    BOOST_CHECK_EQUAL(string("amq.fanout"),
+                      result.get().getAlternateExchange());
+}
 
-    void testTransfer()
-    {
-        session =connection.newSession();
-        declareSubscribe();
-        session.messageTransfer(content=TransferContent("my-message", "my-queue"));
-        //get & test the message:
-        FrameSet::shared_ptr msg = session.get();
-        CPPUNIT_ASSERT(msg->isA<MessageTransferBody>());
-        CPPUNIT_ASSERT_EQUAL(std::string("my-message"), msg->getContent());
-        //confirm receipt:
-        session.getExecution().completed(msg->getId(), true, true);
-    }
+BOOST_FIXTURE_TEST_CASE(testTransfer, ClientSessionFixture)
+{
+    session=connection.newSession();
+    declareSubscribe();
+    session.messageTransfer(content=TransferContent("my-message", "my-queue"));
+    //get & test the message:
+    FrameSet::shared_ptr msg = session.get();
+    BOOST_CHECK(msg->isA<MessageTransferBody>());
+    BOOST_CHECK_EQUAL(string("my-message"), msg->getContent());
+    //confirm receipt:
+    session.getExecution().completed(msg->getId(), true, true);
+}
 
-    void testDispatcher()
-    {
-        session =connection.newSession();
-        declareSubscribe();
+BOOST_FIXTURE_TEST_CASE(testDispatcher, ClientSessionFixture)
+{
+    session =connection.newSession();
+    declareSubscribe();
+    size_t count = 100;
+    for (size_t i = 0; i < count; ++i) 
+        session.messageTransfer(content=TransferContent(lexical_cast<string>(i), "my-queue"));
+    DummyListener listener(session, "my-dest", count);
+    listener.run();
+    BOOST_REQUIRE_EQUAL(count, listener.messages.size());        
+    for (size_t i = 0; i < count; ++i) 
+        BOOST_CHECK_EQUAL(lexical_cast<string>(i), listener.messages[i].getData());
+}
 
-        TransferContent msg1("One");
-        msg1.getDeliveryProperties().setRoutingKey("my-queue");
-        session.messageTransfer(content=msg1);
-
-        TransferContent msg2("Two");
-        msg2.getDeliveryProperties().setRoutingKey("my-queue");
-        session.messageTransfer(content=msg2);
-
-        TransferContent msg3("Three");
-        msg3.getDeliveryProperties().setRoutingKey("my-queue");
-        session.messageTransfer(content=msg3);
-                
-        DummyListener listener(session, "my-dest", 3);
-        listener.listen();
-        CPPUNIT_ASSERT_EQUAL((size_t) 3, listener.messages.size());        
-        CPPUNIT_ASSERT_EQUAL(std::string("One"), listener.messages.front().getData());
-        listener.messages.pop_front();
-        CPPUNIT_ASSERT_EQUAL(std::string("Two"), listener.messages.front().getData());
-        listener.messages.pop_front();
-        CPPUNIT_ASSERT_EQUAL(std::string("Three"), listener.messages.front().getData());
-        listener.messages.pop_front();
+/* FIXME aconway 2008-01-28: hangs
+BOOST_FIXTURE_TEST_CASE(testDispatcherThread, ClientSessionFixture)
+{
+    session =connection.newSession();
+    declareSubscribe();
+    size_t count = 10000;
+    DummyListener listener(session, "my-dest", count);
+    sys::Thread t(listener);
+    for (size_t i = 0; i < count; ++i) {
+        session.messageTransfer(content=TransferContent(lexical_cast<string>(i), "my-queue"));
+        if (i%100 == 0) cout << "T" << i << std::flush;
+    }
+    t.join();
+    BOOST_REQUIRE_EQUAL(count, listener.messages.size());        
+    for (size_t i = 0; i < count; ++i) 
+        BOOST_CHECK_EQUAL(lexical_cast<string>(i), listener.messages[i].getData());
+}
+*/
 
-    }
+BOOST_FIXTURE_TEST_CASE(_FIXTURE, ClientSessionFixture)
+{
+    session =connection.newSession(0);
+    session.suspend();  // session has 0 timeout.
+    try {
+        connection.resume(session);
+        BOOST_FAIL("Expected InvalidArgumentException.");
+    } catch(const InternalErrorException&) {}
+}
 
-    void testResumeExpiredError() {
-        session =connection.newSession(0);
-        session.suspend();  // session has 0 timeout.
-        try {
-           connection.resume(session);
-            CPPUNIT_FAIL("Expected InvalidArgumentException.");
-        } catch(const InternalErrorException&) {}
-    }
+BOOST_FIXTURE_TEST_CASE(testUseSuspendedError, ClientSessionFixture)
+{
+    session =connection.newSession(60);
+    session.suspend();
+    try {
+        session.exchangeQuery(name="amq.fanout");
+        BOOST_FAIL("Expected session suspended exception");
+    } catch(const CommandInvalidException&) {}
+}
 
-    void testUseSuspendedError() {
-        session =connection.newSession(60);
-        session.suspend();
-        try {
-            session.exchangeQuery(name="amq.fanout");
-            CPPUNIT_FAIL("Expected session suspended exception");
-        } catch(const CommandInvalidException&) {}
+BOOST_FIXTURE_TEST_CASE(testSuspendResume, ClientSessionFixture)
+{
+    session =connection.newSession(60);
+    declareSubscribe();
+    session.suspend();
+    // Make sure we are still subscribed after resume.
+    connection.resume(session);
+    session.messageTransfer(content=TransferContent("my-message", "my-queue"));
+    FrameSet::shared_ptr msg = session.get();
+    BOOST_CHECK_EQUAL(string("my-message"), msg->getContent());
+}
+
+BOOST_FIXTURE_TEST_CASE(testSendToSelf, SessionFixture) {
+    // https://bugzilla.redhat.com/show_bug.cgi?id=410551
+    // Deadlock if SubscriptionManager  run() concurrent with session ack.
+    LocalQueue myq;
+    session.queueDeclare(queue="myq", exclusive=true, autoDelete=true);
+    subs.subscribe(myq, "myq");
+    string data("msg");
+    Message msg(data, "myq");
+    const int count=100;       // Verified with count=100000 in a loop.
+    for (int i = 0; i < count; ++i)
+        session.messageTransfer(content=msg);
+    for (int j = 0; j < count; ++j) {
+        Message m=myq.pop();
+        BOOST_CHECK_EQUAL(m.getData(), data);
     }
+}
 
-    void testSuspendResume() {
-        session =connection.newSession(60);
-        declareSubscribe();
-        session.suspend();
-        // Make sure we are still subscribed after resume.
-        connection.resume(session);
-        session.messageTransfer(content=TransferContent("my-message", "my-queue"));
-        FrameSet::shared_ptr msg = session.get();
-        CPPUNIT_ASSERT_EQUAL(string("my-message"), msg->getContent());
-    }
-};
+QPID_AUTO_TEST_SUITE_END()
 
-// Make this test suite a plugin.
-CPPUNIT_PLUGIN_IMPLEMENT();
-CPPUNIT_TEST_SUITE_REGISTRATION(ClientSessionTest);

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/Cpg.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/Cpg.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/Cpg.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/Cpg.cpp Mon Feb 11 04:11:03 2008
@@ -78,12 +78,16 @@
         cpg_handle_t /*handle*/,
         struct cpg_name *grp,
         struct cpg_address */*members*/, int nMembers,
-        struct cpg_address */*left*/, int /*nLeft*/,
-        struct cpg_address */*joined*/, int /*nJoined*/
+        struct cpg_address */*left*/, int nLeft,
+        struct cpg_address */*joined*/, int nJoined
     )
     {
         BOOST_CHECK_EQUAL(group, Cpg::str(*grp));
         configChanges.push_back(nMembers);
+        BOOST_MESSAGE("configChange: "<<
+                      nLeft<<" left "<<
+                      nJoined<<" joined "<<
+                      nMembers<<" members.");
     }
 };
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/IList.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/IList.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/IList.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/IList.cpp Mon Feb 11 04:11:03 2008
@@ -152,6 +152,10 @@
 }    
 
 
+BOOST_AUTO_TEST_CASE(testEmptyDtor) {
+    TestList l;
+}
+
 BOOST_FIXTURE_TEST_CASE(testOwnership, Fixture) {
     { 
         TestList l2;

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/Makefile.am?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/Makefile.am Mon Feb 11 04:11:03 2008
@@ -36,7 +36,8 @@
 	Url.cpp Uuid.cpp \
 	Shlib.cpp FieldValue.cpp FieldTable.cpp Array.cpp \
 	InlineVector.cpp \
-	IList.cpp
+	IList.cpp \
+	ClientSessionTest.cpp
 
 check_LTLIBRARIES += libshlibtest.la
 libshlibtest_la_LDFLAGS = -module -rpath $(abs_builddir)
@@ -78,8 +79,7 @@
   TxAckTest		\
   TxBufferTest		\
   TxPublishTest		\
-  MessageBuilderTest    \
-  ClientSessionTest
+  MessageBuilderTest
 
 #client_unit_tests =	\
 #  ClientChannelTest     
@@ -109,7 +109,7 @@
 
 check_PROGRAMS += $(testprogs) interop_runner
 
-TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) $(srcdir)/run_test
+TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= $(srcdir)/run_test 
 
 system_tests = client_test quick_perftest quick_topictest
 TESTS += run-unit-tests start_broker $(system_tests) python_tests stop_broker 
@@ -123,7 +123,6 @@
   .valgrind.supp							\
   .valgrindrc								\
   MessageUtils.h							\
-  MockChannel.h								\
   MockConnectionInputHandler.h						\
   TxMocks.h								\
   qpid_test_plugin.h

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageTest.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageTest.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/MessageTest.cpp Mon Feb 11 04:11:03 2008
@@ -22,7 +22,6 @@
 #include "qpid/framing/AMQP_HighestVersion.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/FieldValue.h"
-#include "MockChannel.h"
 
 #include "qpid_test_plugin.h"
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/QueueTest.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/QueueTest.cpp Mon Feb 11 04:11:03 2008
@@ -25,7 +25,6 @@
 #include "qpid/broker/QueueRegistry.h"
 #include "qpid_test_plugin.h"
 #include <iostream>
-#include "MockChannel.h"
 #include "boost/format.hpp"
 
 using namespace qpid;

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TestOptions.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TestOptions.h?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TestOptions.h (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TestOptions.h Mon Feb 11 04:11:03 2008
@@ -37,8 +37,8 @@
     TestOptions(const std::string& helpText_=std::string()) :
         Options("Test Options"),
         host("localhost"), port(TcpAddress::DEFAULT_PORT),
-        clientid("cpp"), help(false),
-        helpText(helpText_)
+        clientid("cpp"), username("guest"), password("guest"),
+        help(false), helpText(helpText_)
     {
         addOptions()
             ("host,h", optValue(host, "HOST"), "Broker host to connect to")

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TxAckTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TxAckTest.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TxAckTest.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TxAckTest.cpp Mon Feb 11 04:11:03 2008
@@ -25,7 +25,6 @@
 #include <iostream>
 #include <list>
 #include <vector>
-#include "MockChannel.h"
 
 using std::list;
 using std::vector;

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TxPublishTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TxPublishTest.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TxPublishTest.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/TxPublishTest.cpp Mon Feb 11 04:11:03 2008
@@ -25,7 +25,6 @@
 #include <iostream>
 #include <list>
 #include <vector>
-#include "MockChannel.h"
 #include "MessageUtils.h"
 
 using std::list;

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/Url.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/Url.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/Url.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/Url.cpp Mon Feb 11 04:11:03 2008
@@ -33,7 +33,7 @@
     url.push_back(TcpAddress("foo.com"));
     url.push_back(TcpAddress("bar.com", 6789));
     BOOST_CHECK_EQUAL("amqp:tcp:foo.com:5672,tcp:bar.com:6789", url.str());
-    BOOST_CHECK_EQUAL("amqp:", Url().str());
+    BOOST_CHECK(Url().str().empty());
 }
 
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/ais_check
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/ais_check?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/ais_check (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/ais_check Mon Feb 11 04:11:03 2008
@@ -2,10 +2,12 @@
 # Check for requirements, run AIS tests if found.
 #
 
-test `id -ng` = "ais" || BADGROUP=yes
-{ ps -u root | grep aisexec >/dev/null; } || NOAISEXEC=yes
+id -nG | grep '\<ais\>' || \
+    NOGROUP="You are not a member of the ais group."
+ps -u root | grep aisexec >/dev/null || \
+    NOAISEXEC="The aisexec daemon is not running as root"
 
-if test -n "$BADGROUP" -o -n "$NOAISEXEC"; then
+if test -n "$NOGROUP" -o -n "$NOAISEXEC"; then
     cat <<EOF
 
     =========== WARNING: NOT RUNNING AIS TESTS ==============
@@ -13,18 +15,8 @@
     Tests that depend on the openais library (used for clustering)
     will not be run because:
 
-EOF
-    test -n "$BADGROUP" && cat <<EOF
-    You do not appear to have you group ID set to "ais". Make ais your
-    primary group, or run "newgrp ais" before running the tests.
-
-EOF
-    test -n "$NOAISEXEC" && cat <<EOF    
-    The aisexec daemon is not running. Make sure /etc/ais/openais.conf
-    is a valid configuration and aisexec is run by root.
-EOF
-
-    cat <<EOF
+    $NOGROUP
+    $NOAISEXEC
 
     ==========================================================
     
@@ -32,8 +24,4 @@
     exit 0;			# A warning, not a failure.
 fi
 
-FAILED=0
-for test in `cat ais_tests`; do
-    ./$test || FAILED=`expr $FAILED + 1`
-done
-exit $FAILED
+echo ./ais_run | newgrp ais

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/cluster.mk?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/cluster.mk (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/cluster.mk Mon Feb 11 04:11:03 2008
@@ -1,55 +1,20 @@
-# FIXME aconway 2007-08-31: Disabled cluster compilation,
-# has not been kept up to date with recent commits.
+if CPG
+#
+# Cluster tests makefile fragment, to be included in Makefile.am
 # 
 
-# if CLUSTER
-# # Cluster tests makefile fragment, to be included in Makefile.am
-# # 
+lib_cluster = $(abs_builddir)/../libqpidcluster.la
 
-# lib_cluster = $(abs_builddir)/../libqpidcluster.la
-
-# # NOTE: Programs using the openais library must be run with gid=ais
-# # You should do "newgrp ais" before running the tests to run these.
-# # 
-
-# #
-# # Cluster tests.
-# # 
-
-# # ais_check runs ais if the conditions to run AIS tests
-# # are met, otherwise it prints a warning.
-# TESTS+=ais_check
-# EXTRA_DIST+=ais_check
-# AIS_TESTS=
-
-# ais_check: ais_tests
-# ais_tests:
-# 	echo $(AIS_TESTS)
-# 	echo "# AIS tests" >$@
-# 	for t in $(AIS_TESTS); do echo ./$$t >$@; done
-# 	chmod a+x $@
-
-# CLEANFILES+=ais_tests
-
-# AIS_TESTS+=Cpg
-# check_PROGRAMS+=Cpg
-# Cpg_SOURCES=Cpg.cpp
-# Cpg_LDADD=$(lib_cluster) -lboost_unit_test_framework
-
-# # TODO aconway 2007-07-26: Fix this test.
-# #AIS_TESTS+=Cluster
-# # check_PROGRAMS+=Cluster
-# # Cluster_SOURCES=Cluster.cpp Cluster.h
-# # Cluster_LDADD=$(lib_cluster) -lboost_unit_test_framework
-
-# check_PROGRAMS+=Cluster_child 
-# Cluster_child_SOURCES=Cluster_child.cpp Cluster.h
-# Cluster_child_LDADD=$(lib_cluster) -lboost_test_exec_monitor
+# NOTE: Programs using the openais library must be run with gid=ais
+# You should do "newgrp ais" before running the tests to run these.
+# 
 
-# # TODO aconway 2007-07-03: In progress
-# #AIS_TESTS+=cluster_client
-# check_PROGRAMS+=cluster_client
-# cluster_client_SOURCES=cluster_client.cpp
-# cluster_client_LDADD=$(lib_client) -lboost_unit_test_framework
+# ais_check checks conditions for AIS tests and runs if ok.
+TESTS+=ais_check
+EXTRA_DIST+=ais_check ais_run
+
+check_PROGRAMS+=ais_test
+ais_test_SOURCES=ais_test.cpp Cpg.cpp 
+ais_test_LDADD=$(lib_client) $(lib_cluster) -lboost_unit_test_framework
 
-# endif
+endif

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/cluster_client.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/cluster_client.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/cluster_client.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/cluster_client.cpp Mon Feb 11 04:11:03 2008
@@ -16,21 +16,25 @@
  *
  */
 
-#include "qpid/client/Connection.h"
-#include "qpid/shared_ptr.h"
-
 #include "unit_test.h"
+#include "BrokerFixture.h"
+#include "qpid/client/Session.h"
 
 #include <fstream>
 #include <vector>
 #include <functional>
 
-
 QPID_AUTO_TEST_SUITE(cluster_clientTestSuite)
 
-using namespace std;
 using namespace qpid;
 using namespace qpid::client;
+using namespace qpid::framing;
+using namespace qpid::client::arg;
+using framing::TransferContent;
+using std::vector;
+using std::string;
+using std::ifstream;
+using std::ws;
 
 struct ClusterConnections : public vector<shared_ptr<Connection> > {
     ClusterConnections() {
@@ -58,25 +62,23 @@
     ClusterConnections cluster;
     BOOST_REQUIRE(cluster.size() > 1);
 
-    Exchange fooEx("FooEx", Exchange::TOPIC_EXCHANGE);
-    Queue fooQ("FooQ");
-    
-    Channel broker0;
-    cluster[0]->openChannel(broker0);
-    broker0.declareExchange(fooEx);
-    broker0.declareQueue(fooQ);
-    broker0.bind(fooEx, fooQ, "FooKey");
+    Session broker0 = cluster[0]->newSession();
+    broker0.exchangeDeclare(exchange="ex");
+    broker0.queueDeclare(queue="q");
+    broker0.queueBind(exchange="ex", queue="q", routingKey="key");
     broker0.close();
     
     for (size_t i = 1; i < cluster.size(); ++i) {
-        Channel ch;
-        cluster[i]->openChannel(ch);
-        ch.publish(Message("hello"), fooEx, "FooKey");
-        Message m;
-        BOOST_REQUIRE(ch.get(m, fooQ));
-        BOOST_REQUIRE_EQUAL(m.getData(), "hello");
-        ch.close();
-    }
+        Session s = cluster[i]->newSession();
+        s.messageTransfer(content=TransferContent("data", "key", "ex"));
+        s.messageSubscribe(queue="q", destination="q");
+        s.messageFlow(destination="q", unit=0, value=1);//messages
+        FrameSet::shared_ptr msg = s.get();
+        BOOST_CHECK(msg->isA<MessageTransferBody>());
+        BOOST_CHECK_EQUAL(string("data"), msg->getContent());
+        s.getExecution().completed(msg->getId(), true, true);
+        cluster[i]->close();
+    }    
 }
 
 QPID_AUTO_TEST_SUITE_END()

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/logging.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/logging.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/logging.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/logging.cpp Mon Feb 11 04:11:03 2008
@@ -367,4 +367,24 @@
     unlink("logging.tmp");
 }
 
+BOOST_AUTO_TEST_CASE(testQuoteControlChars) {
+    Logger& l=Logger::instance();
+    l.clear();
+    Options opts;
+    opts.outputs.clear();
+    opts.outputs.push_back("logging.tmp");
+    opts.time=false;
+    l.configure(opts, "test");
+    char s[] = "null\0tab\tspace newline\nret\r";
+    string str(s, sizeof(s));
+    QPID_LOG(critical, str); 
+    ifstream log("logging.tmp");
+    string line;
+    getline(log, line);
+    string expect="critical null^@tab^Ispace newline^Jret^M^@";
+    BOOST_CHECK_EQUAL(expect, line);
+    log.close();
+    unlink("logging.tmp");
+}
+
 QPID_AUTO_TEST_SUITE_END()

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/perftest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/perftest.cpp?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/perftest.cpp (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/perftest.cpp Mon Feb 11 04:11:03 2008
@@ -38,6 +38,8 @@
 #include <sstream>
 #include <numeric>
 #include <algorithm>
+#include <unistd.h>
+
 
 using namespace std;
 using namespace qpid;
@@ -90,6 +92,8 @@
     size_t iterations;
     Mode mode;
     bool summary;
+	uint32_t intervalSub;
+	uint32_t intervalPub;
 
     static const std::string helpText;
     
@@ -98,7 +102,8 @@
         setup(false), control(false), publish(false), subscribe(false),
         pubs(1), count(500000), size(1024), confirm(true), durable(false), uniqueData(false),
         subs(1), ack(0),
-        qt(1), iterations(1), mode(SHARED), summary(false)
+        qt(1), iterations(1), mode(SHARED), summary(false),
+		intervalSub(0), intervalPub(0)
     {
         addOptions()
             ("setup", optValue(setup), "Create shared queues.")
@@ -127,7 +132,10 @@
             ("summary,s", optValue(summary), "Summary output: pubs/sec subs/sec transfers/sec Mbytes/sec")
 
             ("queue_max_count", optValue(queueMaxCount, "N"), "queue policy: count to trigger 'flow to disk'")
-            ("queue_max_size", optValue(queueMaxSize, "N"), "queue policy: accumulated size to trigger 'flow to disk'");
+            ("queue_max_size", optValue(queueMaxSize, "N"), "queue policy: accumulated size to trigger 'flow to disk'")
+
+            ("interval_sub", optValue(intervalSub, "ms"), ">=0 delay between msg consume")
+            ("interval_pub", optValue(intervalPub, "ms"), ">=0 delay between msg publish");
     }
 
     // Computed values
@@ -454,6 +462,7 @@
                         arg::destination=destination,
                         arg::content=msg,
                         arg::confirmMode=opts.confirm);
+		            if (opts.intervalPub) ::usleep(opts.intervalPub*1000);
                 }
                 if (opts.confirm) completion.sync();
                 AbsTime end=now();
@@ -523,6 +532,7 @@
                 size_t expect=0;
                 for (size_t i = 0; i < opts.subQuota; ++i) {
                     msg=lq.pop();
+		            if (opts.intervalSub) ::usleep(opts.intervalSub*1000);
                     // TODO aconway 2007-11-23: check message order for. 
                     // multiple publishers. Need an acorray of counters,
                     // one per publisher and a publisher ID in the

Modified: incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/start_cluster
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/start_cluster?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/start_cluster (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/cpp/src/tests/start_cluster Mon Feb 11 04:11:03 2008
@@ -12,7 +12,7 @@
 OPTS=$*
 CLUSTER=`whoami`		# Cluster name=user name, avoid clashes.
 for (( i=0; i<SIZE; ++i )); do
-    PORT=`../qpidd -dp0 --log-output=cluster$i.log --cluster $CLUSTER $OPTS` || exit 1
+    PORT=`../qpidd --load-module ../.libs/libqpidcluster.so -dp0 --log-output=cluster$i.log --cluster-name $CLUSTER $OPTS` || exit 1
     echo $PORT >> cluster.ports
 done
     

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid-passwd
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid-passwd?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid-passwd (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid-passwd Mon Feb 11 04:11:03 2008
@@ -1,30 +1,30 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Set classpath to include Qpid jar with all required jars in manifest
-QPID_LIBS=$QPID_HOME/lib/qpid-incubating.jar
-
-# Set other variables used by the qpid-run script before calling
-export JAVA=java \
-       JAVA_VM=-server \
-       JAVA_MEM=-Xmx1024m \
-       QPID_CLASSPATH=$QPID_LIBS
-
-. qpid-run org.apache.qpid.tools.security.Passwd "$@"
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Set classpath to include Qpid jar with all required jars in manifest
+QPID_LIBS=$QPID_HOME/lib/qpid-incubating.jar
+
+# Set other variables used by the qpid-run script before calling
+export JAVA=java \
+       JAVA_VM=-server \
+       JAVA_MEM=-Xmx1024m \
+       QPID_CLASSPATH=$QPID_LIBS
+
+. qpid-run org.apache.qpid.tools.security.Passwd "$@"

Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid-passwd
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid-passwd
------------------------------------------------------------------------------
    svn:executable = *

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid-server
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid-server?rev=620468&r1=620467&r2=620468&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid-server (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid-server Mon Feb 11 04:11:03 2008
@@ -1,31 +1,31 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Set classpath to include Qpid jar with all required jars in manifest
-QPID_LIBS=$QPID_HOME/lib/qpid-incubating.jar:$QPID_HOME/lib/bdbstore-launch.jar
-
-# Set other variables used by the qpid-run script before calling
-export JAVA=java \
-       JAVA_VM=-server \
-       JAVA_MEM=-Xmx1024m \
-       JAVA_GC="-XX:-UseConcMarkSweepGC -XX:+HeapDumpOnOutOfMemoryError" \
-       QPID_CLASSPATH=$QPID_LIBS
-
-. qpid-run org.apache.qpid.server.Main "$@"
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Set classpath to include Qpid jar with all required jars in manifest
+QPID_LIBS=$QPID_HOME/lib/qpid-incubating.jar:$QPID_HOME/lib/bdbstore-launch.jar
+
+# Set other variables used by the qpid-run script before calling
+export JAVA=java \
+       JAVA_VM=-server \
+       JAVA_MEM=-Xmx1024m \
+       JAVA_GC="-XX:-UseConcMarkSweepGC -XX:+HeapDumpOnOutOfMemoryError" \
+       QPID_CLASSPATH=$QPID_LIBS
+
+. qpid-run org.apache.qpid.server.Main "$@"

Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid-server
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/thegreatmerge/qpid/java/broker/bin/qpid-server
------------------------------------------------------------------------------
    svn:executable = *