You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/07/28 00:08:52 UTC

svn commit: r560404 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/cluster/ qpid/framing/

Author: aconway
Date: Fri Jul 27 15:08:51 2007
New Revision: 560404

URL: http://svn.apache.org/viewvc?view=rev&rev=560404
Log:

	* src/tests/ais_check, cluster.mk: Run AIS tests only if:
	 - CLUSTER makefile conditional set by configure.
	 - Effective gid == ais
	 - aisexec is running
	Otherwise print a warning. 

	* src/tests/EventChannelConnectionTest.cpp
	* src/qpid/cluster/doxygen_overview.h
	Removed unused files.

Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionFrame.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionFrame.h
Modified:
    incubator/qpid/trunk/qpid/cpp/src/cluster.mk
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/HandlerUpdater.h

Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?view=diff&rev=560404&r1=560403&r2=560404
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Fri Jul 27 15:08:51 2007
@@ -14,8 +14,6 @@
   qpid/cluster/ClusterPlugin.cpp \
   qpid/cluster/ClassifierHandler.h \
   qpid/cluster/ClassifierHandler.cpp \
-  qpid/cluster/SessionFrame.h \
-  qpid/cluster/SessionFrame.cpp \
   qpid/cluster/SessionManager.h \
   qpid/cluster/SessionManager.cpp
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?view=diff&rev=560404&r1=560403&r2=560404
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri Jul 27 15:08:51 2007
@@ -47,6 +47,7 @@
 using qpid::sys::Acceptor;
 using qpid::framing::HandlerUpdater;
 using qpid::framing::FrameHandler;
+using qpid::framing::ChannelId;
 
 namespace qpid {
 namespace broker {
@@ -162,9 +163,10 @@
     handlerUpdaters.push_back(updater);
 }
 
-void Broker::update(FrameHandler::Chains& chains) {
+void Broker::update(ChannelId channel, FrameHandler::Chains& chains) {
     for_each(handlerUpdaters.begin(), handlerUpdaters.end(),
-             boost::bind(&HandlerUpdater::update, _1, boost::ref(chains)));
+             boost::bind(&HandlerUpdater::update, _1,
+                         channel, boost::ref(chains)));
 }
 
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?view=diff&rev=560404&r1=560403&r2=560404
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Fri Jul 27 15:08:51 2007
@@ -33,7 +33,6 @@
 #include "qpid/Plugin.h"
 #include "qpid/Url.h"
 #include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/HandlerUpdater.h"
 #include "qpid/framing/OutputHandler.h"
 #include "qpid/framing/ProtocolInitiation.h"
 #include "qpid/sys/Acceptor.h"
@@ -96,7 +95,7 @@
     void add(const shared_ptr<framing::HandlerUpdater>&);
     
     /** Apply all handler updaters to a handler chain pair. */
-    void update(framing::FrameHandler::Chains&); 
+    void update(framing::ChannelId, framing::FrameHandler::Chains&); 
     
     MessageStore& getStore() { return *store; }
     QueueRegistry& getQueues() { return queues; }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?view=diff&rev=560404&r1=560403&r2=560404
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Fri Jul 27 15:08:51 2007
@@ -108,7 +108,7 @@
         FrameHandler::Chains chains(
             new SemanticHandler(id, *this),
             new OutputHandlerFrameHandler(*out));
-        broker.update(chains);
+        broker.update(id, chains);
         i = channels.insert(ChannelMap::value_type(id, chains)).first;
     }        
     return i->second;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?view=diff&rev=560404&r1=560403&r2=560404
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Jul 27 15:08:51 2007
@@ -19,7 +19,6 @@
 #include "Cluster.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/ClusterNotifyBody.h"
-#include "qpid/framing/Uuid.h"
 #include "qpid/log/Statement.h"
 #include <boost/bind.hpp>
 #include <algorithm>
@@ -52,9 +51,9 @@
 
 Cluster::Cluster(
     const std::string& name_, const std::string& url_,
-    const SessionFrameHandler::Chain& next
+    const FrameHandler::Chain& next
 ) :
-    SessionFrameHandler(next), 
+    FrameHandler(next), 
     cpg(new Cpg(*this)),
     name(name_),
     url(url_), 
@@ -85,7 +84,7 @@
     }
 }
 
-void Cluster::handle(SessionFrame& frame) {
+void Cluster::handle(AMQFrame& frame) {
     QPID_LOG(trace, *this << " SEND: " << frame);
     Buffer buf(frame.size());
     frame.encode(buf);
@@ -95,9 +94,9 @@
 }
 
 void Cluster::notify() {
-    SessionFrame sf;
-    sf.frame.setBody(make_shared_ptr(new ClusterNotifyBody(ProtocolVersion(), url)));
-    handle(sf);
+    AMQFrame frame(ProtocolVersion(), 0,
+                   new ClusterNotifyBody(ProtocolVersion(), url));
+    handle(frame);
 }
 
 size_t Cluster::size() const {
@@ -125,11 +124,11 @@
     assert(name == *group);
     Id from(nodeid, pid);
     Buffer buf(static_cast<char*>(msg), msg_len);
-    SessionFrame frame;
+    AMQFrame frame;
     frame.decode(buf);
     QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from);
-    if (frame.uuid.isNull())
-        handleClusterFrame(from, frame.frame);
+    if (frame.getChannel() == 0)
+        handleClusterFrame(from, frame);
     else
         next->handle(frame);
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?view=diff&rev=560404&r1=560403&r2=560404
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Jul 27 15:08:51 2007
@@ -20,7 +20,6 @@
  */
 
 #include "qpid/cluster/Cpg.h"
-#include "qpid/cluster/SessionFrame.h"
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/shared_ptr.h"
 #include "qpid/sys/Monitor.h"
@@ -40,13 +39,13 @@
  * Connection to the cluster. Maintains cluster membership
  * data.
  *
- * As SessionFrameHandler, handles frames by sending them to the
- * cluster, sends frames received from the cluster to the next
- * SessionFrameHandler.
+ * As FrameHandler, handles frames by sending them to the
+ * cluster. Frames received from the cluster are sent to the next
+ * FrameHandler in the chain.
  * 
  * 
  */
-class Cluster : public SessionFrameHandler,
+class Cluster : public framing::FrameHandler,
                 private sys::Runnable, private Cpg::Handler
 {
   public:
@@ -66,7 +65,7 @@
      * @param handler for frames received from the cluster.
      */
     Cluster(const std::string& name, const std::string& url,
-            const SessionFrameHandler::Chain& next);
+            const framing::FrameHandler::Chain& next);
 
     virtual ~Cluster();
 
@@ -87,7 +86,7 @@
               sys::Duration timeout=sys::TIME_INFINITE) const;
 
     /** Send frame to the cluster */
-    void handle(SessionFrame&);
+    void handle(framing::AMQFrame&);
     
   private:
     typedef Cpg::Id Id;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp?view=diff&rev=560404&r1=560403&r2=560404
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp Fri Jul 27 15:08:51 2007
@@ -74,61 +74,38 @@
     virtual void redeliver(Message::shared_ptr&, DeliveryToken::shared_ptr, DeliveryId) {}
 };
 
-/** Wrap plain AMQFrames in SessionFrames */
-struct FrameWrapperHandler : public FrameHandler {
-
-    FrameWrapperHandler(const Uuid& id, bool dir, SessionFrameHandler::Chain next_)
-        : uuid(id), direction(dir), next(next_) {
-        assert(!uuid.isNull());
-    }
-    
-    void handle(AMQFrame& frame) {
-        SessionFrame sf(uuid, frame, direction);
-        assert(next);
-        next->handle(sf);
-    }
-
-    Uuid uuid;
-    bool direction;
-    SessionFrameHandler::Chain next;
-};
-
 SessionManager::SessionManager(Broker& b) : localBroker(new BrokerHandler(b)) {}
 
-void SessionManager::update(FrameHandler::Chains& chains) {
+void SessionManager::update(ChannelId channel, FrameHandler::Chains& chains) {
     Mutex::ScopedLock l(lock);
     // Create a new local session, store local chains.
-    Uuid uuid(true);
-    sessions[uuid] = chains;
+    sessions[channel] = chains;
     
-    // Replace local in chain. Build from the back.
-    // TODO aconway 2007-07-05: Currently mcast wiring, bypass
-    // everythign else.
+    // Replace the local "in" chain with a classifier that mcasts wiring
+    // frames to the cluster and processes other frames as normal.
     assert(clusterSend);
-    FrameHandler::Chain wiring(new FrameWrapperHandler(uuid, SessionFrame::IN, clusterSend));
-    FrameHandler::Chain classify(new ClassifierHandler(wiring, chains.in));
-    chains.in = classify;
-
-    // Leave out chain unmodified.
-    // TODO aconway 2007-07-05: Failover will require replication of
-    // outgoing frames to session replicas.
+    chains.in = make_shared_ptr(
+        new ClassifierHandler(clusterSend, chains.in));
 }
 
-void SessionManager::handle(SessionFrame& frame) {
+void SessionManager::handle(AMQFrame& frame) {
     // Incoming from cluster.
     {
         Mutex::ScopedLock l(lock);
-        assert(frame.isIncoming); // FIXME aconway 2007-07-24: Drop isIncoming?
-        SessionMap::iterator i = sessions.find(frame.uuid);
+        SessionMap::iterator i = sessions.find(frame.getChannel());
         if (i == sessions.end()) {
-            // Non local method frame, invoke.
-            localBroker->handle(frame.frame);
+            // Non-local wiring method frame, invoke locally.
+            localBroker->handle(frame);
         }
         else {
-            // Local frame, continue on local chain
-            i->second.in->handle(frame.frame);
+            // Local frame continuing on local chain
+            i->second.in->handle(frame);
         }
     }
+}
+
+void SessionManager::setClusterSend(const FrameHandler::Chain& send) {
+    clusterSend=send;
 }
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.h?view=diff&rev=560404&r1=560403&r2=560404
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.h Fri Jul 27 15:08:51 2007
@@ -19,7 +19,6 @@
  *
  */
 
-#include "qpid/cluster/SessionFrame.h"
 #include "qpid/framing/HandlerUpdater.h"
 #include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/Uuid.h"
@@ -41,30 +40,30 @@
  * Manage sessions and handler chains for the cluster.
  * 
  */
-class SessionManager : public framing::HandlerUpdater, public SessionFrameHandler,
+class SessionManager : public framing::HandlerUpdater, public framing::FrameHandler,
                        private boost::noncopyable
 {
   public:
     SessionManager(broker::Broker& broker);
 
     /** Set the handler to send to the cluster */
-    void setClusterSend(const SessionFrameHandler::Chain& send) { clusterSend=send; }
+    void setClusterSend(const framing::FrameHandler::Chain& send);
     
     /** As ChannelUpdater update the handler chains. */
-    void update(framing::FrameHandler::Chains& chains);
+    void update(framing::ChannelId, framing::FrameHandler::Chains&);
 
-    /** As SessionFrameHandler handle frames received from the cluster */
-    void handle(SessionFrame&);
+    /** As FrameHandler, handle frames received from the cluster */
+    void handle(framing::AMQFrame&);
 
     /** Get ChannelID for UUID. Return 0 if no mapping */
     framing::ChannelId getChannelId(const framing::Uuid&) const;
     
   private:
     class SessionOperations;
-    typedef std::map<framing::Uuid,framing::FrameHandler::Chains> SessionMap;
+    typedef std::map<framing::ChannelId,framing::FrameHandler::Chains> SessionMap;
 
     sys::Mutex lock;
-    SessionFrameHandler::Chain clusterSend;
+    framing::FrameHandler::Chain clusterSend;
     framing::FrameHandler::Chain localBroker;
     SessionMap sessions;
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/HandlerUpdater.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/HandlerUpdater.h?view=diff&rev=560404&r1=560403&r2=560404
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/HandlerUpdater.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/HandlerUpdater.h Fri Jul 27 15:08:51 2007
@@ -31,10 +31,10 @@
     virtual ~HandlerUpdater() {}
     
     /** Update the handler chains.
-     *@param id Unique identifier for channel or session.
+     *@param channel Id of associated channel.
      *@param chains Handler chains to be updated.
      */
-    virtual void update(FrameHandler::Chains& chains) = 0;
+    virtual void update(ChannelId channel, FrameHandler::Chains& chains) = 0;
 };
 
 }} // namespace qpid::framing