You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/07/28 00:08:52 UTC
svn commit: r560404 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/
qpid/cluster/ qpid/framing/
Author: aconway
Date: Fri Jul 27 15:08:51 2007
New Revision: 560404
URL: http://svn.apache.org/viewvc?view=rev&rev=560404
Log:
* src/tests/ais_check, cluster.mk: Run AIS tests only if:
- CLUSTER makefile conditional set by configure.
- Effective gid == ais
- aisexec is running
Otherwise print a warning.
* src/tests/EventChannelConnectionTest.cpp
* src/qpid/cluster/doxygen_overview.h
Removed unused files.
Removed:
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionFrame.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionFrame.h
Modified:
incubator/qpid/trunk/qpid/cpp/src/cluster.mk
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/HandlerUpdater.h
Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?view=diff&rev=560404&r1=560403&r2=560404
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Fri Jul 27 15:08:51 2007
@@ -14,8 +14,6 @@
qpid/cluster/ClusterPlugin.cpp \
qpid/cluster/ClassifierHandler.h \
qpid/cluster/ClassifierHandler.cpp \
- qpid/cluster/SessionFrame.h \
- qpid/cluster/SessionFrame.cpp \
qpid/cluster/SessionManager.h \
qpid/cluster/SessionManager.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?view=diff&rev=560404&r1=560403&r2=560404
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri Jul 27 15:08:51 2007
@@ -47,6 +47,7 @@
using qpid::sys::Acceptor;
using qpid::framing::HandlerUpdater;
using qpid::framing::FrameHandler;
+using qpid::framing::ChannelId;
namespace qpid {
namespace broker {
@@ -162,9 +163,10 @@
handlerUpdaters.push_back(updater);
}
-void Broker::update(FrameHandler::Chains& chains) {
+void Broker::update(ChannelId channel, FrameHandler::Chains& chains) {
for_each(handlerUpdaters.begin(), handlerUpdaters.end(),
- boost::bind(&HandlerUpdater::update, _1, boost::ref(chains)));
+ boost::bind(&HandlerUpdater::update, _1,
+ channel, boost::ref(chains)));
}
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?view=diff&rev=560404&r1=560403&r2=560404
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Fri Jul 27 15:08:51 2007
@@ -33,7 +33,6 @@
#include "qpid/Plugin.h"
#include "qpid/Url.h"
#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/HandlerUpdater.h"
#include "qpid/framing/OutputHandler.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/sys/Acceptor.h"
@@ -96,7 +95,7 @@
void add(const shared_ptr<framing::HandlerUpdater>&);
/** Apply all handler updaters to a handler chain pair. */
- void update(framing::FrameHandler::Chains&);
+ void update(framing::ChannelId, framing::FrameHandler::Chains&);
MessageStore& getStore() { return *store; }
QueueRegistry& getQueues() { return queues; }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?view=diff&rev=560404&r1=560403&r2=560404
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Fri Jul 27 15:08:51 2007
@@ -108,7 +108,7 @@
FrameHandler::Chains chains(
new SemanticHandler(id, *this),
new OutputHandlerFrameHandler(*out));
- broker.update(chains);
+ broker.update(id, chains);
i = channels.insert(ChannelMap::value_type(id, chains)).first;
}
return i->second;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?view=diff&rev=560404&r1=560403&r2=560404
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Jul 27 15:08:51 2007
@@ -19,7 +19,6 @@
#include "Cluster.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ClusterNotifyBody.h"
-#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
#include <algorithm>
@@ -52,9 +51,9 @@
Cluster::Cluster(
const std::string& name_, const std::string& url_,
- const SessionFrameHandler::Chain& next
+ const FrameHandler::Chain& next
) :
- SessionFrameHandler(next),
+ FrameHandler(next),
cpg(new Cpg(*this)),
name(name_),
url(url_),
@@ -85,7 +84,7 @@
}
}
-void Cluster::handle(SessionFrame& frame) {
+void Cluster::handle(AMQFrame& frame) {
QPID_LOG(trace, *this << " SEND: " << frame);
Buffer buf(frame.size());
frame.encode(buf);
@@ -95,9 +94,9 @@
}
void Cluster::notify() {
- SessionFrame sf;
- sf.frame.setBody(make_shared_ptr(new ClusterNotifyBody(ProtocolVersion(), url)));
- handle(sf);
+ AMQFrame frame(ProtocolVersion(), 0,
+ new ClusterNotifyBody(ProtocolVersion(), url));
+ handle(frame);
}
size_t Cluster::size() const {
@@ -125,11 +124,11 @@
assert(name == *group);
Id from(nodeid, pid);
Buffer buf(static_cast<char*>(msg), msg_len);
- SessionFrame frame;
+ AMQFrame frame;
frame.decode(buf);
QPID_LOG(trace, *this << " RECV: " << frame << " from: " << from);
- if (frame.uuid.isNull())
- handleClusterFrame(from, frame.frame);
+ if (frame.getChannel() == 0)
+ handleClusterFrame(from, frame);
else
next->handle(frame);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?view=diff&rev=560404&r1=560403&r2=560404
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Jul 27 15:08:51 2007
@@ -20,7 +20,6 @@
*/
#include "qpid/cluster/Cpg.h"
-#include "qpid/cluster/SessionFrame.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/shared_ptr.h"
#include "qpid/sys/Monitor.h"
@@ -40,13 +39,13 @@
* Connection to the cluster. Maintains cluster membership
* data.
*
- * As SessionFrameHandler, handles frames by sending them to the
- * cluster, sends frames received from the cluster to the next
- * SessionFrameHandler.
+ * As FrameHandler, handles frames by sending them to the
+ * cluster. Frames received from the cluster are sent to the next
+ * FrameHandler in the chain.
*
*
*/
-class Cluster : public SessionFrameHandler,
+class Cluster : public framing::FrameHandler,
private sys::Runnable, private Cpg::Handler
{
public:
@@ -66,7 +65,7 @@
* @param handler for frames received from the cluster.
*/
Cluster(const std::string& name, const std::string& url,
- const SessionFrameHandler::Chain& next);
+ const framing::FrameHandler::Chain& next);
virtual ~Cluster();
@@ -87,7 +86,7 @@
sys::Duration timeout=sys::TIME_INFINITE) const;
/** Send frame to the cluster */
- void handle(SessionFrame&);
+ void handle(framing::AMQFrame&);
private:
typedef Cpg::Id Id;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp?view=diff&rev=560404&r1=560403&r2=560404
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.cpp Fri Jul 27 15:08:51 2007
@@ -74,61 +74,38 @@
virtual void redeliver(Message::shared_ptr&, DeliveryToken::shared_ptr, DeliveryId) {}
};
-/** Wrap plain AMQFrames in SessionFrames */
-struct FrameWrapperHandler : public FrameHandler {
-
- FrameWrapperHandler(const Uuid& id, bool dir, SessionFrameHandler::Chain next_)
- : uuid(id), direction(dir), next(next_) {
- assert(!uuid.isNull());
- }
-
- void handle(AMQFrame& frame) {
- SessionFrame sf(uuid, frame, direction);
- assert(next);
- next->handle(sf);
- }
-
- Uuid uuid;
- bool direction;
- SessionFrameHandler::Chain next;
-};
-
SessionManager::SessionManager(Broker& b) : localBroker(new BrokerHandler(b)) {}
-void SessionManager::update(FrameHandler::Chains& chains) {
+void SessionManager::update(ChannelId channel, FrameHandler::Chains& chains) {
Mutex::ScopedLock l(lock);
// Create a new local session, store local chains.
- Uuid uuid(true);
- sessions[uuid] = chains;
+ sessions[channel] = chains;
- // Replace local in chain. Build from the back.
- // TODO aconway 2007-07-05: Currently mcast wiring, bypass
- // everythign else.
+ // Replace local "in" chain to mcast wiring and process other frames
+ // as normal.
assert(clusterSend);
- FrameHandler::Chain wiring(new FrameWrapperHandler(uuid, SessionFrame::IN, clusterSend));
- FrameHandler::Chain classify(new ClassifierHandler(wiring, chains.in));
- chains.in = classify;
-
- // Leave out chain unmodified.
- // TODO aconway 2007-07-05: Failover will require replication of
- // outgoing frames to session replicas.
+ chains.in = make_shared_ptr(
+ new ClassifierHandler(clusterSend, chains.in));
}
-void SessionManager::handle(SessionFrame& frame) {
+void SessionManager::handle(AMQFrame& frame) {
// Incoming from cluster.
{
Mutex::ScopedLock l(lock);
- assert(frame.isIncoming); // FIXME aconway 2007-07-24: Drop isIncoming?
- SessionMap::iterator i = sessions.find(frame.uuid);
+ SessionMap::iterator i = sessions.find(frame.getChannel());
if (i == sessions.end()) {
- // Non local method frame, invoke.
- localBroker->handle(frame.frame);
+ // Non-local wiring method frame, invoke locally.
+ localBroker->handle(frame);
}
else {
- // Local frame, continue on local chain
- i->second.in->handle(frame.frame);
+ // Local frame continuing on local chain
+ i->second.in->handle(frame);
}
}
+}
+
+void SessionManager::setClusterSend(const FrameHandler::Chain& send) {
+ clusterSend=send;
}
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.h?view=diff&rev=560404&r1=560403&r2=560404
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/SessionManager.h Fri Jul 27 15:08:51 2007
@@ -19,7 +19,6 @@
*
*/
-#include "qpid/cluster/SessionFrame.h"
#include "qpid/framing/HandlerUpdater.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/Uuid.h"
@@ -41,30 +40,30 @@
* Manage sessions and handler chains for the cluster.
*
*/
-class SessionManager : public framing::HandlerUpdater, public SessionFrameHandler,
+class SessionManager : public framing::HandlerUpdater, public framing::FrameHandler,
private boost::noncopyable
{
public:
SessionManager(broker::Broker& broker);
/** Set the handler to send to the cluster */
- void setClusterSend(const SessionFrameHandler::Chain& send) { clusterSend=send; }
+ void setClusterSend(const framing::FrameHandler::Chain& send);
/** As ChannelUpdater update the handler chains. */
- void update(framing::FrameHandler::Chains& chains);
+ void update(framing::ChannelId, framing::FrameHandler::Chains&);
- /** As SessionFrameHandler handle frames received from the cluster */
- void handle(SessionFrame&);
+ /** As FrameHandler frames received from the cluster */
+ void handle(framing::AMQFrame&);
/** Get ChannelID for UUID. Return 0 if no mapping */
framing::ChannelId getChannelId(const framing::Uuid&) const;
private:
class SessionOperations;
- typedef std::map<framing::Uuid,framing::FrameHandler::Chains> SessionMap;
+ typedef std::map<framing::ChannelId,framing::FrameHandler::Chains> SessionMap;
sys::Mutex lock;
- SessionFrameHandler::Chain clusterSend;
+ framing::FrameHandler::Chain clusterSend;
framing::FrameHandler::Chain localBroker;
SessionMap sessions;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/HandlerUpdater.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/HandlerUpdater.h?view=diff&rev=560404&r1=560403&r2=560404
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/HandlerUpdater.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/HandlerUpdater.h Fri Jul 27 15:08:51 2007
@@ -31,10 +31,10 @@
virtual ~HandlerUpdater() {}
/** Update the handler chains.
- *@param id Unique identifier for channel or session.
+ *@param channel Id of associated channel.
*@param chains Handler chains to be updated.
*/
- virtual void update(FrameHandler::Chains& chains) = 0;
+ virtual void update(ChannelId channel, FrameHandler::Chains& chains) = 0;
};
}} // namespace qpid::framing