You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/05 12:47:56 UTC
svn commit: r821749 [1/3] - in /qpid/branches/java-broker-0-10/qpid: ./
cpp/src/qpid/amqp_0_10/ cpp/src/qpid/broker/ cpp/src/qpid/client/
cpp/src/qpid/cluster/ cpp/src/qpid/framing/ cpp/src/qpid/management/
cpp/src/qpid/replication/ cpp/src/qpid/sys/ c...
Author: rgodfrey
Date: Mon Oct 5 10:47:52 2009
New Revision: 821749
URL: http://svn.apache.org/viewvc?rev=821749&view=rev
Log:
Merged from trunk up to r800440
Added:
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessagesStoreLogSubject.java
- copied unchanged from r800440, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessagesStoreLogSubject.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java
- copied unchanged from r800440, qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/
- copied from r800440, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java
- copied unchanged from r800440, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java
- copied unchanged from r800440, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java
- copied unchanged from r800440, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ConnectionLoggingTest.java
- copied unchanged from r800440, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ConnectionLoggingTest.java
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java
- copied unchanged from r800440, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java
- copied unchanged from r800440, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java
Removed:
qpid/branches/java-broker-0-10/qpid/cpp/src/tests/allSegmentTypes.h
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/AlertingTest.java
Modified:
qpid/branches/java-broker-0-10/qpid/ (props changed)
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/amqp_0_10/Connection.h
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Broker.h
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.cpp
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.h
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxManager.cpp
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxManager.h
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxTimeout.h
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/LinkRegistry.h
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.h
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueCleaner.h
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.cpp
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.h
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/Connector.cpp
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.h
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterMap.h
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Connection.cpp
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Connection.h
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ErrorCheck.h
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/framing/AMQFrame.h
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/management/ManagementAgent.cpp
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/management/ManagementAgent.h
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/constants.h
qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/Timer.cpp
qpid/branches/java-broker-0-10/qpid/cpp/src/tests/Makefile.am
qpid/branches/java-broker-0-10/qpid/cpp/src/tests/PartialFailure.cpp
qpid/branches/java-broker-0-10/qpid/cpp/src/tests/QueueTest.cpp
qpid/branches/java-broker-0-10/qpid/cpp/src/tests/replication_test
qpid/branches/java-broker-0-10/qpid/cpp/xml/cluster.xml
qpid/branches/java-broker-0-10/qpid/gentools/templ.java/model/ProtocolVersionListClass.vm
qpid/branches/java-broker-0-10/qpid/java/ (props changed)
qpid/branches/java-broker-0-10/qpid/java/broker/ (props changed)
qpid/branches/java-broker-0-10/qpid/java/broker/bin/ (props changed)
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ (props changed)
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ (props changed)
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ConnectionMessagesTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLoggerTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/velocity/templates/org/apache/qpid/server/logging/messages/LogMessages.vm
qpid/branches/java-broker-0-10/qpid/java/common/templates/model/ProtocolVersionListClass.vm
qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ConfigurationManagement.java (props changed)
qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java (props changed)
qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java (props changed)
qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java (props changed)
qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java (props changed)
qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java (props changed)
qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java (props changed)
qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java (props changed)
qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java (props changed)
qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java (props changed)
qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java (props changed)
qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java (props changed)
qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/ (props changed)
qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java (props changed)
qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java (props changed)
qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java (props changed)
qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java (props changed)
qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc (props changed)
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
qpid/branches/java-broker-0-10/qpid/java/test-profiles/010Excludes (contents, props changed)
qpid/branches/java-broker-0-10/qpid/java/test-profiles/08Excludes (props changed)
qpid/branches/java-broker-0-10/qpid/java/test-profiles/08StandaloneExcludes (props changed)
qpid/branches/java-broker-0-10/qpid/java/test-profiles/08TransientExcludes (props changed)
qpid/branches/java-broker-0-10/qpid/java/test-profiles/Excludes (props changed)
qpid/branches/java-broker-0-10/qpid/java/test-profiles/XAExcludes (props changed)
qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.ssl.excludes (props changed)
Propchange: qpid/branches/java-broker-0-10/qpid/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Oct 5 10:47:52 2009
@@ -1 +1 @@
-/qpid/trunk/qpid:796196-799240
+/qpid/trunk/qpid:796196-800440
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Mon Oct 5 10:47:52 2009
@@ -31,7 +31,8 @@
using sys::Mutex;
Connection::Connection(sys::OutputControl& o, const std::string& id, bool _isClient)
- : frameQueueClosed(false), output(o), identifier(id), initialized(false), isClient(_isClient), buffered(0)
+ : frameQueueClosed(false), output(o), identifier(id), initialized(false),
+ isClient(_isClient), buffered(0), version(0,10)
{}
void Connection::setInputHandler(std::auto_ptr<sys::ConnectionInputHandler> c) {
@@ -44,7 +45,9 @@
//read in protocol header
framing::ProtocolInitiation pi;
if (pi.decode(in)) {
- //TODO: check the version is correct
+ if(!(pi==version))
+ throw Exception(QPID_MSG("Unsupported version: " << pi
+ << " supported version " << version));
QPID_LOG(trace, "RECV " << identifier << " INIT(" << pi << ")");
}
initialized = true;
@@ -128,7 +131,11 @@
}
framing::ProtocolVersion Connection::getVersion() const {
- return framing::ProtocolVersion(0,10);
+ return version;
+}
+
+void Connection::setVersion(const framing::ProtocolVersion& v) {
+ version = v;
}
size_t Connection::getBuffered() const {
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/amqp_0_10/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/amqp_0_10/Connection.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/amqp_0_10/Connection.h Mon Oct 5 10:47:52 2009
@@ -55,6 +55,7 @@
bool initialized;
bool isClient;
size_t buffered;
+ framing::ProtocolVersion version;
public:
QPID_BROKER_EXTERN Connection(sys::OutputControl&, const std::string& id, bool isClient);
@@ -71,6 +72,10 @@
void send(framing::AMQFrame&);
framing::ProtocolVersion getVersion() const;
size_t getBuffered() const;
+
+ /** Used by cluster code to set a special version on "update" connections. */
+ // FIXME aconway 2009-07-30: find a cleaner mechanism for this.
+ void setVersion(const framing::ProtocolVersion&);
};
}} // namespace qpid::amqp_0_10
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Broker.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Broker.h Mon Oct 5 10:47:52 2009
@@ -36,7 +36,6 @@
#include "qpid/broker/QueueEvents.h"
#include "qpid/broker/Vhost.h"
#include "qpid/broker/System.h"
-#include "qpid/broker/Timer.h"
#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementAgent.h"
@@ -49,6 +48,7 @@
#include "qpid/framing/OutputHandler.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Timer.h"
#include "qpid/RefCounted.h"
#include "qpid/broker/AclModule.h"
@@ -115,13 +115,14 @@
private:
std::string getHome();
};
-
+
private:
typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap;
void declareStandardExchange(const std::string& name, const std::string& type);
boost::shared_ptr<sys::Poller> poller;
+ sys::Timer timer;
Options config;
ProtocolFactoryMap protocolFactories;
std::auto_ptr<MessageStore> store;
@@ -132,7 +133,6 @@
ExchangeRegistry exchanges;
LinkRegistry links;
boost::shared_ptr<sys::ConnectionCodec::Factory> factory;
- Timer timer;
DtxManager dtxManager;
SessionManager sessionManager;
management::ManagementAgent* managementAgent;
@@ -148,8 +148,6 @@
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
public:
-
-
virtual ~Broker();
QPID_BROKER_EXTERN Broker(const Options& configuration);
@@ -188,7 +186,7 @@
void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; }
boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; }
-
+
SessionManager& getSessionManager() { return sessionManager; }
const std::string& getFederationTag() const { return federationTag; }
@@ -197,7 +195,7 @@
management::Manageable::status_t ManagementMethod (uint32_t methodId,
management::Args& args,
std::string& text);
-
+
/** Add to the broker's protocolFactorys */
void registerProtocolFactory(const std::string& name, boost::shared_ptr<sys::ProtocolFactory>);
@@ -229,7 +227,7 @@
boost::shared_ptr<sys::ConnectionCodec::Factory> getConnectionFactory() { return factory; }
void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; }
- Timer& getTimer() { return timer; }
+ sys::Timer& getTimer() { return timer; }
boost::function<std::vector<Url> ()> getKnownBrokers;
@@ -242,7 +240,5 @@
};
}}
-
-
#endif /*!_Broker_*/
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.cpp Mon Oct 5 10:47:52 2009
@@ -49,35 +49,25 @@
namespace qpid {
namespace broker {
-struct ConnectionTimeoutTask : public TimerTask {
- Timer& timer;
+struct ConnectionTimeoutTask : public sys::TimerTask {
+ sys::Timer& timer;
Connection& connection;
- AbsTime expires;
- ConnectionTimeoutTask(uint16_t hb, Timer& t, Connection& c) :
+ ConnectionTimeoutTask(uint16_t hb, sys::Timer& t, Connection& c) :
TimerTask(Duration(hb*2*TIME_SEC)),
timer(t),
- connection(c),
- expires(AbsTime::now(), duration)
+ connection(c)
{}
- void touch()
- {
- expires = AbsTime(AbsTime::now(), duration);
+ void touch() {
+ restart();
}
void fire() {
- // This is the best we can currently do to avoid a destruction/fire race
- if (isCancelled()) return;
- if (expires < AbsTime::now()) {
- // If we get here then we've not received any traffic in the timeout period
- // Schedule closing the connection for the io thread
- QPID_LOG(error, "Connection timed out: closing");
- connection.abort();
- } else {
- reset();
- timer.add(this);
- }
+ // If we get here then we've not received any traffic in the timeout period
+ // Schedule closing the connection for the io thread
+ QPID_LOG(error, "Connection timed out: closing");
+ connection.abort();
}
};
@@ -338,25 +328,22 @@
adapter.setSecureConnection(s);
}
-struct ConnectionHeartbeatTask : public TimerTask {
- Timer& timer;
+struct ConnectionHeartbeatTask : public sys::TimerTask {
+ sys::Timer& timer;
Connection& connection;
- ConnectionHeartbeatTask(uint16_t hb, Timer& t, Connection& c) :
+ ConnectionHeartbeatTask(uint16_t hb, sys::Timer& t, Connection& c) :
TimerTask(Duration(hb*TIME_SEC)),
timer(t),
connection(c)
{}
void fire() {
- // This is the best we can currently do to avoid a destruction/fire race
- if (!isCancelled()) {
- // Setup next firing
- reset();
- timer.add(this);
+ // Setup next firing
+ setupNextFire();
+ timer.add(this);
- // Send Heartbeat
- connection.sendHeartbeat();
- }
+ // Send Heartbeat
+ connection.sendHeartbeat();
}
};
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.h Mon Oct 5 10:47:52 2009
@@ -152,8 +152,8 @@
qmf::org::apache::qpid::broker::Connection* mgmtObject;
LinkRegistry& links;
management::ManagementAgent* agent;
- Timer& timer;
- boost::intrusive_ptr<TimerTask> heartbeatTimer;
+ sys::Timer& timer;
+ boost::intrusive_ptr<sys::TimerTask> heartbeatTimer;
boost::intrusive_ptr<ConnectionTimeoutTask> timeoutTimer;
ErrorListener* errorListener;
bool shadow;
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxManager.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxManager.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxManager.cpp Mon Oct 5 10:47:52 2009
@@ -22,6 +22,7 @@
#include "qpid/broker/DtxTimeout.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/Timer.h"
#include "qpid/ptr_map.h"
#include <boost/format.hpp>
@@ -33,7 +34,7 @@
using namespace qpid::broker;
using namespace qpid::framing;
-DtxManager::DtxManager(Timer& t) : store(0), timer(t) {}
+DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(t) {}
DtxManager::~DtxManager() {}
@@ -130,8 +131,7 @@
}
timeout = intrusive_ptr<DtxTimeout>(new DtxTimeout(secs, *this, xid));
record->setTimeout(timeout);
- timer.add(boost::static_pointer_cast<TimerTask>(timeout));
-
+ timer.add(timeout);
}
uint32_t DtxManager::getTimeout(const std::string& xid)
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxManager.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxManager.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxManager.h Mon Oct 5 10:47:52 2009
@@ -24,9 +24,9 @@
#include <boost/ptr_container/ptr_map.hpp>
#include "qpid/broker/DtxBuffer.h"
#include "qpid/broker/DtxWorkRecord.h"
-#include "qpid/broker/Timer.h"
#include "qpid/broker/TransactionalStore.h"
#include "qpid/framing/amqp_types.h"
+#include "qpid/sys/Timer.h"
#include "qpid/sys/Mutex.h"
namespace qpid {
@@ -35,7 +35,7 @@
class DtxManager{
typedef boost::ptr_map<std::string, DtxWorkRecord> WorkMap;
- struct DtxCleanup : public TimerTask
+ struct DtxCleanup : public sys::TimerTask
{
DtxManager& mgr;
const std::string& xid;
@@ -47,14 +47,14 @@
WorkMap work;
TransactionalStore* store;
qpid::sys::Mutex lock;
- Timer& timer;
+ qpid::sys::Timer& timer;
void remove(const std::string& xid);
DtxWorkRecord* getWork(const std::string& xid);
DtxWorkRecord* createWork(std::string xid);
public:
- DtxManager(Timer&);
+ DtxManager(qpid::sys::Timer&);
~DtxManager();
void start(const std::string& xid, DtxBuffer::shared_ptr work);
void join(const std::string& xid, DtxBuffer::shared_ptr work);
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxTimeout.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxTimeout.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxTimeout.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxTimeout.h Mon Oct 5 10:47:52 2009
@@ -22,7 +22,7 @@
#define _DtxTimeout_
#include "qpid/Exception.h"
-#include "qpid/broker/Timer.h"
+#include "qpid/sys/Timer.h"
namespace qpid {
namespace broker {
@@ -31,12 +31,12 @@
struct DtxTimeoutException : public Exception {};
-struct DtxTimeout : public TimerTask
+struct DtxTimeout : public sys::TimerTask
{
const uint32_t timeout;
DtxManager& mgr;
const std::string xid;
-
+
DtxTimeout(uint32_t timeout, DtxManager& mgr, const std::string& xid);
void fire();
};
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Mon Oct 5 10:47:52 2009
@@ -41,18 +41,18 @@
// factored: The persistence element and maintenance element
// should be factored separately
LinkRegistry::LinkRegistry () :
- broker(0),
+ broker(0), timer(0),
parent(0), store(0), passive(false), passiveChanged(false),
realm("")
{
}
LinkRegistry::LinkRegistry (Broker* _broker) :
- broker(_broker),
- parent(0), store(0), passive(false), passiveChanged(false),
+ broker(_broker), timer(&broker->getTimer()),
+ parent(0), store(0), passive(false), passiveChanged(false),
realm(broker->getOptions().realm)
{
- timer.add (intrusive_ptr<TimerTask> (new Periodic(*this)));
+ timer->add (new Periodic(*this));
}
LinkRegistry::Periodic::Periodic (LinkRegistry& _links) :
@@ -61,7 +61,7 @@
void LinkRegistry::Periodic::fire ()
{
links.periodicMaintenance ();
- links.timer.add (intrusive_ptr<TimerTask> (new Periodic(links)));
+ links.timer->add (new Periodic(links));
}
void LinkRegistry::periodicMaintenance ()
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/LinkRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/LinkRegistry.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/LinkRegistry.h Mon Oct 5 10:47:52 2009
@@ -25,9 +25,9 @@
#include <map>
#include "qpid/broker/Bridge.h"
#include "qpid/broker/MessageStore.h"
-#include "qpid/broker/Timer.h"
#include "qpid/Address.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Timer.h"
#include "qpid/management/Manageable.h"
#include <boost/shared_ptr.hpp>
@@ -41,7 +41,7 @@
// Declare a timer task to manage the establishment of link connections and the
// re-establishment of lost link connections.
- struct Periodic : public TimerTask
+ struct Periodic : public sys::TimerTask
{
LinkRegistry& links;
@@ -62,7 +62,7 @@
qpid::sys::Mutex lock;
Broker* broker;
- Timer timer;
+ sys::Timer* timer;
management::Manageable* parent;
MessageStore* store;
bool passive;
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.cpp Mon Oct 5 10:47:52 2009
@@ -677,21 +677,25 @@
{
if (persistLastNode){
Mutex::ScopedLock locker(messageLock);
- for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) {
- if (lastValueQueue) checkLvqReplace(*i);
- // don't force a message twice to disk.
- if(!i->payload->isStoredOnQueue(shared_from_this())) {
- i->payload->forcePersistent();
- if (i->payload->isForcedPersistent() ){
- enqueue(0, i->payload);
+ try {
+ for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) {
+ if (lastValueQueue) checkLvqReplace(*i);
+ // don't force a message twice to disk.
+ if(!i->payload->isStoredOnQueue(shared_from_this())) {
+ i->payload->forcePersistent();
+ if (i->payload->isForcedPersistent() ){
+ enqueue(0, i->payload);
+ }
}
- }
- }
+ }
+ } catch (const std::exception& e) {
+ // Could not go into last node standing (for example journal not large enough)
+ QPID_LOG(error, "Unable to fail to last node standing for queue: " << name << " : " << e.what());
+ }
inLastNodeFailure = true;
}
}
-
// return true if store exists,
bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
{
@@ -1012,6 +1016,10 @@
sequence = n;
}
+SequenceNumber Queue::getPosition() {
+ return sequence;
+}
+
int Queue::getEventMode() { return eventMode; }
void Queue::setQueueEventManager(QueueEvents& mgr)
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.h Mon Oct 5 10:47:52 2009
@@ -324,6 +324,9 @@
* Used by cluster to replicate queues.
*/
void setPosition(framing::SequenceNumber pos);
+ /** return current position sequence number for the next message on the queue.
+ */
+ framing::SequenceNumber getPosition();
int getEventMode();
void setQueueEventManager(QueueEvents&);
QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key);
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueCleaner.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueCleaner.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueCleaner.cpp Mon Oct 5 10:47:52 2009
@@ -26,15 +26,15 @@
namespace qpid {
namespace broker {
-QueueCleaner::QueueCleaner(QueueRegistry& q, Timer& t) : queues(q), timer(t) {}
+QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer& t) : queues(q), timer(t) {}
void QueueCleaner::start(qpid::sys::Duration p)
{
- task = boost::intrusive_ptr<TimerTask>(new Task(*this, p));
+ task = new Task(*this, p);
timer.add(task);
}
-QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : TimerTask(d), parent(p) {}
+QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : sys::TimerTask(d), parent(p) {}
void QueueCleaner::Task::fire()
{
@@ -44,7 +44,7 @@
void QueueCleaner::fired()
{
queues.eachQueue(boost::bind(&Queue::purgeExpired, _1));
- task->reset();
+ task->setupNextFire();
timer.add(task);
}
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueCleaner.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueCleaner.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueCleaner.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueCleaner.h Mon Oct 5 10:47:52 2009
@@ -23,7 +23,7 @@
*/
#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/Timer.h"
+#include "qpid/sys/Timer.h"
namespace qpid {
namespace broker {
@@ -35,10 +35,10 @@
class QueueCleaner
{
public:
- QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, Timer& timer);
+ QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer& timer);
QPID_BROKER_EXTERN void start(qpid::sys::Duration period);
private:
- class Task : public TimerTask
+ class Task : public sys::TimerTask
{
public:
Task(QueueCleaner& parent, qpid::sys::Duration duration);
@@ -46,10 +46,10 @@
private:
QueueCleaner& parent;
};
-
- boost::intrusive_ptr<TimerTask> task;
+
+ boost::intrusive_ptr<sys::TimerTask> task;
QueueRegistry& queues;
- Timer& timer;
+ sys::Timer& timer;
void fired();
};
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.cpp Mon Oct 5 10:47:52 2009
@@ -25,7 +25,7 @@
#include "qpid/broker/SessionManager.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/broker/RateFlowcontrol.h"
-#include "qpid/broker/Timer.h"
+#include "qpid/sys/Timer.h"
#include "qpid/framing/AMQContentBody.h"
#include "qpid/framing/AMQHeaderBody.h"
#include "qpid/framing/AMQMethodBody.h"
@@ -49,6 +49,7 @@
using qpid::management::Manageable;
using qpid::management::Args;
using qpid::sys::AbsTime;
+//using qpid::sys::Timer;
namespace _qmf = qmf::org::apache::qpid::broker;
SessionState::SessionState(
@@ -206,10 +207,10 @@
}
}
-struct ScheduledCreditTask : public TimerTask {
- Timer& timer;
+struct ScheduledCreditTask : public sys::TimerTask {
+ sys::Timer& timer;
SessionState& sessionState;
- ScheduledCreditTask(const qpid::sys::Duration& d, Timer& t,
+ ScheduledCreditTask(const qpid::sys::Duration& d, sys::Timer& t,
SessionState& s) :
TimerTask(d),
timer(t),
@@ -218,15 +219,13 @@
void fire() {
// This is the best we can currently do to avoid a destruction/fire race
- if (!isCancelled()) {
- sessionState.getConnection().requestIOProcessing(boost::bind(&ScheduledCreditTask::sendCredit, this));
- }
+ sessionState.getConnection().requestIOProcessing(boost::bind(&ScheduledCreditTask::sendCredit, this));
}
void sendCredit() {
if ( !sessionState.processSendCredit(0) ) {
QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit");
- reset();
+ setupNextFire();
timer.add(this);
}
}
@@ -269,7 +268,7 @@
if (rateFlowcontrol && frame.getBof() && frame.getBos()) {
if ( !processSendCredit(1) ) {
QPID_LOG(debug, getId() << ": Schedule sending credit");
- Timer& timer = getBroker().getTimer();
+ sys::Timer& timer = getBroker().getTimer();
// Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms
sys::Duration d = std::min(sys::TIME_SEC * 50 / rateFlowcontrol->getRate(), 500 * sys::TIME_MSEC);
flowControlTimer = new ScheduledCreditTask(d, timer, *this);
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.h Mon Oct 5 10:47:52 2009
@@ -48,6 +48,10 @@
class AMQP_ClientProxy;
}
+namespace sys {
+class TimerTask;
+}
+
namespace broker {
class Broker;
@@ -56,7 +60,6 @@
class SessionHandler;
class SessionManager;
class RateFlowcontrol;
-struct TimerTask;
/**
* Broker-side session state includes session's handler chains, which
@@ -153,7 +156,7 @@
// State used for producer flow control (rate limited)
qpid::sys::Mutex rateLock;
boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol;
- boost::intrusive_ptr<TimerTask> flowControlTimer;
+ boost::intrusive_ptr<sys::TimerTask> flowControlTimer;
friend class SessionManager;
};
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/Connector.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/Connector.cpp Mon Oct 5 10:47:52 2009
@@ -360,8 +360,11 @@
if (!initiated) {
framing::ProtocolInitiation protocolInit;
if (protocolInit.decode(in)) {
- //TODO: check the version is correct
QPID_LOG(debug, "RECV " << identifier << " INIT(" << protocolInit << ")");
+ if(!(protocolInit==version)){
+ throw Exception(QPID_MSG("Unsupported version: " << protocolInit
+ << " supported version " << version));
+ }
}
initiated = true;
}
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Oct 5 10:47:52 2009
@@ -66,7 +66,7 @@
*
* Events are either
* - Connection events: non-0 connection number and are associated with a connection.
- * - Cluster Events: 0 connection number, are not associated with a connectin.
+ * - Cluster Events: 0 connection number, are not associated with a connection.
*
* Events are further categorized as:
* - Control: carries method frame(s) that affect cluster behavior.
@@ -149,7 +149,7 @@
* sensible reporting of an attempt to mix different versions in a
* cluster.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 1;
+const uint32_t Cluster::CLUSTER_VERSION = 2;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -163,7 +163,7 @@
void updateOffer(uint64_t updatee, const Uuid& id, uint32_t version) { cluster.updateOffer(member, updatee, id, version, l); }
void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); }
void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
- void errorCheck(uint8_t type, uint64_t frameSeq) { cluster.errorCheck(member, type, frameSeq, l); }
+ void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) { cluster.errorCheck(member, type, frameSeq, l); }
void shutdown() { cluster.shutdown(member, l); }
@@ -869,15 +869,12 @@
expiryPolicy->deliverExpire(id);
}
-void Cluster::errorCheck(const MemberId&, uint8_t type, uint64_t frameSeq, Lock&) {
- // If we handle an errorCheck at this point (rather than in the
- // ErrorCheck class) then we have processed succesfully past the
- // point of the error.
- if (state >= CATCHUP && type != ERROR_TYPE_NONE) {
- QPID_LOG(notice, *this << " error " << frameSeq << " did not occur locally.");
- mcast.mcastControl(
- ClusterErrorCheckBody(ProtocolVersion(), ERROR_TYPE_NONE, frameSeq), self);
- }
+void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq, Lock&) {
+ // If we see an errorCheck here (rather than in the ErrorCheck
+ // class) then we have processed succesfully past the point of the
+ // error.
+ if (state >= CATCHUP) // Don't respond pre catchup, we don't know what happened
+ error.respondNone(from, type, frameSeq);
}
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.h Mon Oct 5 10:47:52 2009
@@ -152,7 +152,7 @@
void ready(const MemberId&, const std::string&, Lock&);
void configChange(const MemberId&, const std::string& current, Lock& l);
void messageExpired(const MemberId&, uint64_t, Lock& l);
- void errorCheck(const MemberId&, uint8_t type, uint64_t frameSeq, Lock&);
+ void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&);
void shutdown(const MemberId&, Lock&);
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Mon Oct 5 10:47:52 2009
@@ -71,7 +71,7 @@
joiners[id] = url;
}
-ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, uint64_t frameSeq_)
+ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, framing::SequenceNumber frameSeq_)
: frameSeq(frameSeq_)
{
std::for_each(joinersFt.begin(), joinersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(joiners), boost::ref(alive)));
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterMap.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterMap.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterMap.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterMap.h Mon Oct 5 10:47:52 2009
@@ -25,6 +25,7 @@
#include "qpid/cluster/types.h"
#include "qpid/Url.h"
#include "qpid/framing/ClusterConnectionMembershipBody.h"
+#include "qpid/framing/SequenceNumber.h"
#include <boost/function.hpp>
#include <boost/optional.hpp>
@@ -53,7 +54,7 @@
ClusterMap();
ClusterMap(const MemberId& id, const Url& url, bool isReady);
- ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members, uint64_t frameSeq);
+ ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members, framing::SequenceNumber frameSeq);
/** Update from config change.
*@return true if member set changed.
@@ -92,8 +93,8 @@
*/
static Set intersection(const Set& a, const Set& b);
- uint64_t getFrameSeq() { return frameSeq; }
- uint64_t incrementFrameSeq() { return ++frameSeq; }
+ framing::SequenceNumber getFrameSeq() { return frameSeq; }
+ framing::SequenceNumber incrementFrameSeq() { return ++frameSeq; }
/** Clear out all knowledge of joiners & members, just keep alive set */
void clearStatus() { joiners.clear(); members.clear(); }
@@ -103,7 +104,7 @@
Map joiners, members;
Set alive;
- uint64_t frameSeq;
+ framing::SequenceNumber frameSeq;
friend std::ostream& operator<<(std::ostream&, const Map&);
friend std::ostream& operator<<(std::ostream&, const ClusterMap&);
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Connection.cpp Mon Oct 5 10:47:52 2009
@@ -311,7 +311,7 @@
output.setSendMax(sendMax);
}
-void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameSeq) {
+void Connection::membership(const FieldTable& joiners, const FieldTable& members, const framing::SequenceNumber& frameSeq) {
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
consumerNumbering.clear();
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Connection.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Connection.h Mon Oct 5 10:47:52 2009
@@ -122,7 +122,7 @@
void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment, uint32_t sendMax);
- void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq);
+ void membership(const framing::FieldTable&, const framing::FieldTable&, const framing::SequenceNumber& frameSeq);
void retractOffer();
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp Mon Oct 5 10:47:52 2009
@@ -38,24 +38,27 @@
sys::ConnectionCodec*
ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id) {
if (v == ProtocolVersion(0, 10))
- return new ConnectionCodec(out, id, cluster, false, false);
- else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10))
- return new ConnectionCodec(out, id, cluster, true, false); // Catch-up connection
+ return new ConnectionCodec(v, out, id, cluster, false, false);
+ else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10)) // Catch-up connection
+ return new ConnectionCodec(v, out, id, cluster, true, false);
return 0;
}
// Used for outgoing Link connections
sys::ConnectionCodec*
ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& logId) {
- return new ConnectionCodec(out, logId, cluster, false, true);
+ return new ConnectionCodec(ProtocolVersion(0,10), out, logId, cluster, false, true);
}
-ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& logId, Cluster& cluster, bool catchUp, bool isLink)
- : codec(out, logId, isLink),
- interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink))
+ConnectionCodec::ConnectionCodec(
+ const ProtocolVersion& v, sys::OutputControl& out,
+ const std::string& logId, Cluster& cluster, bool catchUp, bool isLink
+) : codec(out, logId, isLink),
+ interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink))
{
std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor));
codec.setInputHandler(ih);
+ codec.setVersion(v);
}
ConnectionCodec::~ConnectionCodec() {}
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ConnectionCodec.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ConnectionCodec.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ConnectionCodec.h Mon Oct 5 10:47:52 2009
@@ -56,7 +56,8 @@
sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id);
};
- ConnectionCodec(sys::OutputControl& out, const std::string& logId, Cluster& c, bool catchUp, bool isLink);
+ ConnectionCodec(const framing::ProtocolVersion&, sys::OutputControl& out,
+ const std::string& logId, Cluster& c, bool catchUp, bool isLink);
~ConnectionCodec();
// ConnectionCodec functions.
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp Mon Oct 5 10:47:52 2009
@@ -45,11 +45,11 @@
}
void ErrorCheck::error(
- Connection& c, ErrorType t, uint64_t seq, const MemberSet& ms, const std::string& msg)
+ Connection& c, ErrorType t, framing::SequenceNumber seq, const MemberSet& ms, const std::string& msg)
{
// Detected a local error, inform cluster and set error state.
assert(t != ERROR_TYPE_NONE); // Must be an error.
- assert(type == ERROR_TYPE_NONE); // Can only be called while processing
+ assert(type == ERROR_TYPE_NONE); // Can't be called when already in an error state.
type = t;
unresolved = ms;
frameSeq = seq;
@@ -59,7 +59,7 @@
<< " error " << frameSeq << " on " << c << ": " << msg
<< " must be resolved with: " << unresolved);
mcast.mcastControl(
- ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), c.getId().getMember());
+ ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), cluster.getId());
// If there are already frames queued up by a previous error, review
// them with respect to this new error.
for (FrameQueue::iterator i = frames.begin(); i != frames.end(); i = review(i))
@@ -74,41 +74,52 @@
// Review a frame in the queue with respect to the current error.
ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator& i) {
FrameQueue::iterator next = i+1;
- if (isUnresolved()) {
- const ClusterErrorCheckBody* errorCheck = 0;
- if (i->frame.getBody())
- errorCheck = dynamic_cast<const ClusterErrorCheckBody*>(
- i->frame.getMethod());
- if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error
+ if(!isUnresolved() || !i->frame.getBody() || !i->frame.getMethod())
+ return next; // Only interested in control frames while unresolved.
+ const AMQMethodBody* method = i->frame.getMethod();
+ if (method->isA<const ClusterErrorCheckBody>()) {
+ const ClusterErrorCheckBody* errorCheck =
+ static_cast<const ClusterErrorCheckBody*>(method);
+
+ if (errorCheck->getFrameSeq() == frameSeq) { // Addresses current error
next = frames.erase(i); // Drop matching error check controls
if (errorCheck->getType() < type) { // my error is worse than his
QPID_LOG(critical, cluster << " error " << frameSeq
<< " did not occur on " << i->getMemberId());
- throw Exception("Aborted by failure that did not occur on all replicas");
+ throw Exception(QPID_MSG("Error " << frameSeq
+ << " did not occur on all members"));
}
else { // his error is worse/same as mine.
- QPID_LOG(notice, cluster << " error " << frameSeq
+ QPID_LOG(info, cluster << " error " << frameSeq
<< " resolved with " << i->getMemberId());
unresolved.erase(i->getMemberId());
checkResolved();
}
}
- else {
- const ClusterConfigChangeBody* configChange = 0;
- if (i->frame.getBody())
- configChange = dynamic_cast<const ClusterConfigChangeBody*>(
- i->frame.getMethod());
- if (configChange) {
- MemberSet members(ClusterMap::decode(configChange->getCurrent()));
- QPID_LOG(debug, cluster << " apply config change to unresolved: "
- << members);
- MemberSet intersect;
- set_intersection(members.begin(), members.end(),
- unresolved.begin(), unresolved.end(),
- inserter(intersect, intersect.begin()));
- unresolved.swap(intersect);
- checkResolved();
- }
+ else if (errorCheck->getFrameSeq() < frameSeq && errorCheck->getType() != NONE
+ && i->connectionId.getMember() != cluster.getId())
+ {
+ // This error occured before the current error so we
+ // have processed past it.
+ next = frames.erase(i); // Drop the error check control
+ respondNone(i->connectionId.getMember(), errorCheck->getType(),
+ errorCheck->getFrameSeq());
+ }
+ // if errorCheck->getFrameSeq() > frameSeq then leave it in the queue.
+ }
+ else if (method->isA<const ClusterConfigChangeBody>()) {
+ const ClusterConfigChangeBody* configChange =
+ static_cast<const ClusterConfigChangeBody*>(method);
+ if (configChange) {
+ MemberSet members(ClusterMap::decode(configChange->getCurrent()));
+ QPID_LOG(debug, cluster << " apply config change to error "
+ << frameSeq << ": " << members);
+ MemberSet intersect;
+ set_intersection(members.begin(), members.end(),
+ unresolved.begin(), unresolved.end(),
+ inserter(intersect, intersect.begin()));
+ unresolved.swap(intersect);
+ checkResolved();
}
}
return next;
@@ -117,10 +128,10 @@
void ErrorCheck::checkResolved() {
if (unresolved.empty()) { // No more potentially conflicted members, we're clear.
type = ERROR_TYPE_NONE;
- QPID_LOG(notice, cluster << " error " << frameSeq << " resolved.");
+ QPID_LOG(info, cluster << " error " << frameSeq << " resolved.");
}
else
- QPID_LOG(notice, cluster << " error " << frameSeq
+ QPID_LOG(info, cluster << " error " << frameSeq
<< " must be resolved with " << unresolved);
}
@@ -131,4 +142,15 @@
return e;
}
+void ErrorCheck::respondNone(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq) {
+ // Don't respond to non-errors or to my own errors.
+ if (type == ERROR_TYPE_NONE || from == cluster.getId())
+ return;
+ QPID_LOG(info, cluster << " error " << frameSeq << " did not occur locally.");
+ mcast.mcastControl(
+ ClusterErrorCheckBody(ProtocolVersion(), ERROR_TYPE_NONE, frameSeq),
+ cluster.getId()
+ );
+}
+
}} // namespace qpid::cluster
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ErrorCheck.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ErrorCheck.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ErrorCheck.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ErrorCheck.h Mon Oct 5 10:47:52 2009
@@ -25,6 +25,7 @@
#include "qpid/cluster/types.h"
#include "qpid/cluster/Multicaster.h"
#include "qpid/framing/enum.h"
+#include "qpid/framing/SequenceNumber.h"
#include <boost/function.hpp>
#include <deque>
#include <set>
@@ -49,11 +50,12 @@
public:
typedef std::set<MemberId> MemberSet;
typedef framing::cluster::ErrorType ErrorType;
+ typedef framing::SequenceNumber SequenceNumber;
ErrorCheck(Cluster&);
/** A local error has occured */
- void error(Connection&, ErrorType, uint64_t frameSeq, const MemberSet&,
+ void error(Connection&, ErrorType, SequenceNumber frameSeq, const MemberSet&,
const std::string& msg);
/** Called when a frame is delivered */
@@ -66,7 +68,8 @@
bool isUnresolved() const { return type != NONE; }
-
+ /** Respond to an error check saying we had no error. */
+ void respondNone(const MemberId&, uint8_t type, SequenceNumber frameSeq);
private:
static const ErrorType NONE = framing::cluster::ERROR_TYPE_NONE;
@@ -78,7 +81,7 @@
Multicaster& mcast;
FrameQueue frames;
MemberSet unresolved;
- uint64_t frameSeq;
+ SequenceNumber frameSeq;
ErrorType type;
Connection* connection;
};
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp Mon Oct 5 10:47:52 2009
@@ -19,21 +19,21 @@
*
*/
+#include "qpid/broker/Message.h"
#include "qpid/cluster/ExpiryPolicy.h"
#include "qpid/cluster/Multicaster.h"
#include "qpid/framing/ClusterMessageExpiredBody.h"
#include "qpid/sys/Time.h"
-#include "qpid/broker/Message.h"
-#include "qpid/broker/Timer.h"
+#include "qpid/sys/Timer.h"
#include "qpid/log/Statement.h"
namespace qpid {
namespace cluster {
-ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, broker::Timer& t)
+ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, sys::Timer& t)
: expiryId(0), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {}
-struct ExpiryTask : public broker::TimerTask {
+struct ExpiryTask : public sys::TimerTask {
ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when)
: TimerTask(when), expiryPolicy(policy), expiryId(id) {}
void fire() { expiryPolicy->sendExpire(expiryId); }
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h Mon Oct 5 10:47:52 2009
@@ -33,10 +33,13 @@
namespace qpid {
namespace broker {
-class Timer;
class Message;
}
+namespace sys {
+class Timer;
+}
+
namespace cluster {
class Multicaster;
@@ -46,7 +49,7 @@
class ExpiryPolicy : public broker::ExpiryPolicy
{
public:
- ExpiryPolicy(Multicaster&, const MemberId&, broker::Timer&);
+ ExpiryPolicy(Multicaster&, const MemberId&, sys::Timer&);
void willExpire(broker::Message&);
bool hasExpired(broker::Message&);
@@ -78,7 +81,7 @@
boost::intrusive_ptr<Expired> expiredPolicy;
Multicaster& mcast;
MemberId memberId;
- broker::Timer& timer;
+ sys::Timer& timer;
};
}} // namespace qpid::cluster
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/framing/AMQFrame.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/framing/AMQFrame.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/framing/AMQFrame.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/framing/AMQFrame.h Mon Oct 5 10:47:52 2009
@@ -46,8 +46,8 @@
QPID_COMMON_EXTERN AMQBody* getBody();
QPID_COMMON_EXTERN const AMQBody* getBody() const;
- AMQMethodBody* getMethod() { return getBody()->getMethod(); }
- const AMQMethodBody* getMethod() const { return getBody()->getMethod(); }
+ AMQMethodBody* getMethod() { return getBody() ? getBody()->getMethod() : 0; }
+ const AMQMethodBody* getMethod() const { return getBody() ? getBody()->getMethod() : 0; }
void setMethod(ClassId c, MethodId m);
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/management/ManagementAgent.cpp Mon Oct 5 10:47:52 2009
@@ -62,7 +62,6 @@
ManagementAgent::~ManagementAgent ()
{
- timer.stop();
{
Mutex::ScopedLock lock (userLock);
@@ -90,9 +89,10 @@
dataDir = _dataDir;
interval = _interval;
broker = _broker;
+ timer = &_broker->getTimer();
threadPoolSize = _threads;
ManagementObject::maxThreads = threadPoolSize;
- timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval)));
+ timer->add (new Periodic(*this, interval));
// Get from file or generate and save to file.
if (dataDir.empty())
@@ -219,7 +219,7 @@
void ManagementAgent::Periodic::fire ()
{
- agent.timer.add (intrusive_ptr<TimerTask> (new Periodic (agent, agent.interval)));
+ agent.timer->add (new Periodic (agent, agent.interval));
agent.periodicProcessing ();
}
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/management/ManagementAgent.h Mon Oct 5 10:47:52 2009
@@ -24,9 +24,9 @@
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/Options.h"
#include "qpid/broker/Exchange.h"
-#include "qpid/broker/Timer.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Timer.h"
#include "qpid/broker/ConnectionToken.h"
#include "qpid/management/ManagementObject.h"
#include "qpid/management/ManagementEvent.h"
@@ -98,7 +98,7 @@
void disallow(const std::string& className, const std::string& methodName, const std::string& message);
private:
- struct Periodic : public qpid::broker::TimerTask
+ struct Periodic : public qpid::sys::TimerTask
{
ManagementAgent& agent;
@@ -183,12 +183,12 @@
framing::Uuid uuid;
sys::Mutex addLock;
sys::Mutex userLock;
- qpid::broker::Timer timer;
qpid::broker::Exchange::shared_ptr mExchange;
qpid::broker::Exchange::shared_ptr dExchange;
std::string dataDir;
uint16_t interval;
qpid::broker::Broker* broker;
+ qpid::sys::Timer* timer;
uint16_t bootSequence;
uint32_t nextObjectId;
uint32_t brokerBank;
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp Mon Oct 5 10:47:52 2009
@@ -72,6 +72,7 @@
FieldTable& headers = msg->getProperties<MessageProperties>()->getApplicationHeaders();
headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName());
headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE);
+ headers.setInt(QUEUE_MESSAGE_POSITION,enqueued.position);
route(msg);
}
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp Mon Oct 5 10:47:52 2009
@@ -83,15 +83,29 @@
std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE);
Queue::shared_ptr queue = queues.find(queueName);
if (queue) {
- FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders();
- headers.erase(REPLICATION_TARGET_QUEUE);
- headers.erase(REPLICATION_EVENT_SEQNO);
- headers.erase(REPLICATION_EVENT_TYPE);
- msg.deliverTo(queue);
- QPID_LOG(debug, "Enqueued replicated message onto " << queueName);
- if (mgmtExchange != 0) {
- mgmtExchange->inc_msgRoutes();
- mgmtExchange->inc_byteRoutes( msg.contentSize());
+
+ SequenceNumber seqno1(args->getAsInt(QUEUE_MESSAGE_POSITION));
+
+ // note that queue will ++ before enqueue.
+ if (queue->getPosition() > --seqno1) // test queue.pos < seqnumber
+ {
+ QPID_LOG(error, "Cannot enqueue replicated message. Destination Queue " << queueName << " ahead of source queue");
+ mgmtExchange->inc_msgDrops();
+ mgmtExchange->inc_byteDrops(msg.contentSize());
+ } else {
+ queue->setPosition(seqno1);
+
+ FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders();
+ headers.erase(REPLICATION_TARGET_QUEUE);
+ headers.erase(REPLICATION_EVENT_SEQNO);
+ headers.erase(REPLICATION_EVENT_TYPE);
+ headers.erase(QUEUE_MESSAGE_POSITION);
+ msg.deliverTo(queue);
+ QPID_LOG(debug, "Enqueued replicated message onto " << queueName);
+ if (mgmtExchange != 0) {
+ mgmtExchange->inc_msgRoutes();
+ mgmtExchange->inc_byteRoutes( msg.contentSize());
+ }
}
} else {
QPID_LOG(error, "Cannot enqueue replicated message. Queue " << queueName << " does not exist");
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/constants.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/constants.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/constants.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/constants.h Mon Oct 5 10:47:52 2009
@@ -26,6 +26,7 @@
const std::string REPLICATION_EVENT_SEQNO("qpid.replication.seqno");
const std::string REPLICATION_TARGET_QUEUE("qpid.replication.target_queue");
const std::string DEQUEUED_MESSAGE_POSITION("qpid.replication.message");
+const std::string QUEUE_MESSAGE_POSITION("qpid.replication.queue.position");
const int ENQUEUE(1);
const int DEQUEUE(2);
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/Timer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/Timer.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/Timer.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/Timer.cpp Mon Oct 5 10:47:52 2009
@@ -20,7 +20,8 @@
*/
#include "qpid/sys/Timer.h"
#include "qpid/sys/Mutex.h"
-#include <iostream>
+#include "qpid/log/Statement.h"
+
#include <numeric>
using boost::intrusive_ptr;
@@ -58,6 +59,8 @@
if (period && readyToFire()) {
nextFireTime = AbsTime(nextFireTime, period);
cancelled = false;
+ } else {
+ QPID_LOG(error, "Couldn't setup next timer firing: " << Duration(nextFireTime, AbsTime::now()) << "[" << period << "]");
}
}
@@ -91,24 +94,39 @@
} else {
intrusive_ptr<TimerTask> t = tasks.top();
tasks.pop();
+ assert(!(t->nextFireTime < t->sortTime));
+
+ // warn on extreme lateness
+ AbsTime start(AbsTime::now());
+ Duration late(t->sortTime, start);
+ if (late > 500 * TIME_MSEC) {
+ QPID_LOG(warning, "Timer delayed by " << late / TIME_MSEC << "ms");
+ }
{
ScopedLock<Mutex> l(t->callbackLock);
if (t->cancelled) {
continue;
- } else if(t->readyToFire()) {
+ } else if(Duration(t->nextFireTime, start) >= 0) {
Monitor::ScopedUnlock u(monitor);
t->fireTask();
+ // Warn on callback overrun
+ AbsTime end(AbsTime::now());
+ Duration overrun(tasks.top()->nextFireTime, end);
+ if (overrun > 1 * TIME_MSEC) {
+ QPID_LOG(warning,
+ "Timer callback overran by " << overrun / TIME_MSEC << "ms [taking "
+ << Duration(start, end) << "]");
+ }
continue;
} else {
// If the timer was adjusted into the future it might no longer
// be the next event, so push and then get top to make sure
// You can only push events into the future
- assert(!(t->nextFireTime < t->sortTime));
t->sortTime = t->nextFireTime;
tasks.push(t);
}
}
- monitor.wait(tasks.top()->nextFireTime);
+ monitor.wait(tasks.top()->sortTime);
}
}
}
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/Makefile.am?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/Makefile.am Mon Oct 5 10:47:52 2009
@@ -65,6 +65,7 @@
$(lib_client) $(lib_broker) $(lib_console)
unit_test_SOURCES= unit_test.cpp unit_test.h \
+ ClientSessionTest.cpp \
BrokerFixture.h SocketProxy.h \
exception_test.cpp \
RefCounted.cpp \
@@ -75,7 +76,6 @@
QueueOptionsTest.cpp \
InlineAllocator.cpp \
InlineVector.cpp \
- ClientSessionTest.cpp \
SequenceSet.cpp \
StringUtils.cpp \
IncompleteMessageList.cpp \
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/PartialFailure.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/PartialFailure.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/PartialFailure.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/PartialFailure.cpp Mon Oct 5 10:47:52 2009
@@ -82,10 +82,31 @@
c.subs.subscribe(c.lq, c.name);
}
+// Handle near-simultaneous errors
+QPID_AUTO_TEST_CASE(testCoincidentErrors) {
+ ClusterFixture cluster(2, updateArgs, -1);
+ Client c0(cluster[0], "c0");
+ Client c1(cluster[1], "c1");
+
+ c0.session.queueDeclare("q", durable=true);
+ {
+ ScopedSuppressLogging allQuiet;
+ async(c0.session).messageTransfer(content=pMessage("TEST_STORE_DO: s0[exception]", "q"));
+ async(c1.session).messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception]", "q"));
+
+ int alive=0;
+ try { Client c00(cluster[0], "c00"); ++alive; } catch (...) {}
+ try { Client c11(cluster[1], "c11"); ++alive; } catch (...) {}
+
+ BOOST_CHECK_EQUAL(alive, 1);
+ }
+}
+
+#if 0 // FIXME aconway 2009-07-30:
// Verify normal cluster-wide errors.
QPID_AUTO_TEST_CASE(testNormalErrors) {
// FIXME aconway 2009-04-10: Would like to put a scope just around
- // the statements expected to fail (in BOOST_CHECK_THROW) but that
+ // the statements expected to fail (in BOOST_CHECK_yTHROW) but that
// sproadically lets out messages, possibly because they're in
// Connection thread.
@@ -96,7 +117,7 @@
{
ScopedSuppressLogging allQuiet;
- queueAndSub(c0);
+ queueAndsub(c0);
c0.session.messageTransfer(content=Message("x", "c0"));
BOOST_CHECK_EQUAL(c0.lq.get(TIMEOUT).getData(), "x");
@@ -234,5 +255,5 @@
}
}
#endif
-
+#endif // FIXME aconway 2009-07-30:
QPID_AUTO_TEST_SUITE_END()
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/QueueTest.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/QueueTest.cpp Mon Oct 5 10:47:52 2009
@@ -19,6 +19,7 @@
*
*/
#include "unit_test.h"
+#include "test_tools.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
@@ -275,11 +276,13 @@
uint enqCnt;
uint deqCnt;
+ bool error;
virtual void dequeue(TransactionContext*,
const boost::intrusive_ptr<PersistableMessage>& /*msg*/,
const PersistableQueue& /*queue*/)
{
+ if (error) throw Exception("Dequeue error test");
deqCnt++;
}
@@ -287,10 +290,16 @@
const boost::intrusive_ptr<PersistableMessage>& /*msg*/,
const PersistableQueue& /* queue */)
{
+ if (error) throw Exception("Enqueue error test");
enqCnt++;
}
- TestMessageStoreOC() : NullMessageStore(),enqCnt(0),deqCnt(0) {}
+ void createError()
+ {
+ error=true;
+ }
+
+ TestMessageStoreOC() : NullMessageStore(),enqCnt(0),deqCnt(0),error(false) {}
~TestMessageStoreOC(){}
};
@@ -689,6 +698,30 @@
}
-QPID_AUTO_TEST_SUITE_END()
+QPID_AUTO_TEST_CASE(testLastNodeJournalError){
+/*
+simulate store excption going into last node standing
+
+*/
+ TestMessageStoreOC testStore;
+ client::QueueOptions args;
+ // set queue mode
+ args.setPersistLastNode();
+
+ Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
+ intrusive_ptr<Message> received;
+ queue1->configure(args);
+
+ // check requeue 1
+ intrusive_ptr<Message> msg1 = create_message("e", "C");
+
+ queue1->deliver(msg1);
+ testStore.createError();
+
+ ScopedSuppressLogging sl; // Suppress messages for expected errors.
+ queue1->setLastNodeFailure();
+ BOOST_CHECK_EQUAL(testStore.enqCnt, 0u);
+
+}QPID_AUTO_TEST_SUITE_END()
Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/replication_test
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/replication_test?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/replication_test (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/replication_test Mon Oct 5 10:47:52 2009
@@ -56,10 +56,12 @@
$PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-b --generate-queue-events 2
$PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-c --generate-queue-events 1
$PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-d --generate-queue-events 2
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-e --generate-queue-events 1
$PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-a
$PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-b
$PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-c
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-e
#queue-d deliberately not declared on DR; this error case should be handled
#publish and consume from test queues on broker A:
@@ -89,6 +91,12 @@
./receiver --port $BROKER_A --queue queue-c --messages 10 > /dev/null
./receiver --port $BROKER_A --queue queue-d > /dev/null
+
+ # What we are doing is putting a message on the end of repliaction queue & waiting for it on remote side
+ # making sure all the messages have been flushed from the replication queue.
+ echo dummy | ./sender --port $BROKER_A --routing-key queue-e --send-eos 1
+ ./receiver --port $BROKER_B --queue queue-e --messages 1 > /dev/null
+
#shutdown broker A then check that broker Bs versions of the queues are as expected
../qpidd -q --port $BROKER_A
unset BROKER_A
@@ -98,7 +106,6 @@
./receiver --port $BROKER_B --queue queue-b > queue-b-backup.repl
./receiver --port $BROKER_B --queue queue-c > queue-c-backup.repl
- stop_brokers
tail -5 queue-a-input.repl > queue-a-expected.repl
tail -10 queue-b-input.repl > queue-b-expected.repl
@@ -108,6 +115,60 @@
grep 'queue-d does not exist' replication-dest.log > /dev/null || echo "WARNING: Expected error to be logged!"
+ stop_brokers
+
+ # now check offsets working (enqueue based on position being set, not queue abs position)
+
+ ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true --log-enable info+ --log-to-file replication-source.log --log-to-stderr 0 > qpidd.port
+ BROKER_A=`cat qpidd.port`
+
+ ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so --log-enable info+ --log-to-file replication-dest.log --log-to-stderr 0 > qpidd.port
+ BROKER_B=`cat qpidd.port`
+
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add exchange replication replication
+ $PYTHON_DIR/commands/qpid-route --ack 5 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication
+
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-e --generate-queue-events 2
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-e
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-d --generate-queue-events 1
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-d
+
+ i=1
+ while [ $i -le 10 ]; do
+ echo Message $i for A >> queue-e-input.repl
+ i=`expr $i + 1`
+ done
+
+ ./sender --port $BROKER_A --routing-key queue-e --send-eos 1 < queue-e-input.repl
+ ./receiver --port $BROKER_A --queue queue-e --messages 10 > /dev/null
+
+ # What we are doing is putting a message on the end of repliaction queue & waiting for it on remote side
+ # making sure all the messages have been flushed from the replication queue.
+ echo dummy | ./sender --port $BROKER_A --routing-key queue-d --send-eos 1
+ ./receiver --port $BROKER_B --queue queue-d --messages 1 > /dev/null
+
+ # now check offsets working
+ ../qpidd -q --port $BROKER_B
+ unset BROKER_B
+ ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so --log-enable info+ --log-to-file replication-dest.log --log-to-stderr 0 > qpidd.port
+ BROKER_B=`cat qpidd.port`
+
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-e
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add exchange replication replication
+ $PYTHON_DIR/commands/qpid-route --ack 5 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication
+ # now send another 15
+ i=11
+ while [ $i -le 15 ]; do
+ echo Message $i for A >> queue-e1-input.repl
+ i=`expr $i + 1`
+ done
+ ./sender --port $BROKER_A --routing-key queue-e --send-eos 1 < queue-e1-input.repl
+
+ ./receiver --port $BROKER_B --queue queue-e > queue-e-backup.repl
+ diff queue-e-backup.repl queue-e1-input.repl || FAIL=1
+
+ stop_brokers
+
if [ x$FAIL != x ]; then
echo replication test failed: expectations not met!
exit 1
Modified: qpid/branches/java-broker-0-10/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/xml/cluster.xml?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/xml/cluster.xml (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/xml/cluster.xml Mon Oct 5 10:47:52 2009
@@ -68,7 +68,7 @@
<!-- Check for error consistency across the cluster -->
<control name="error-check" code="0x14">
<field name="type" type="error-type"/>
- <field name="frame-seq" type="uint64"/>
+ <field name="frame-seq" type="sequence-no"/>
</control>
@@ -170,7 +170,7 @@
<control name="membership" code="0x21" label="Cluster membership details.">
<field name="joiners" type="map"/> <!-- member-id -> URL -->
<field name="members" type="map"/> <!-- member-id -> state -->
- <field name="frame-seq" type="uint64"/> <!-- frame sequence number -->
+ <field name="frame-seq" type="sequence-no"/> <!-- frame sequence number -->
</control>
<!-- Updater cannot fulfill an update offer. -->
Modified: qpid/branches/java-broker-0-10/qpid/gentools/templ.java/model/ProtocolVersionListClass.vm
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/gentools/templ.java/model/ProtocolVersionListClass.vm?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/gentools/templ.java/model/ProtocolVersionListClass.vm (original)
+++ qpid/branches/java-broker-0-10/qpid/gentools/templ.java/model/ProtocolVersionListClass.vm Mon Oct 5 10:47:52 2009
@@ -39,12 +39,14 @@
{
private final byte _majorVersion;
private final byte _minorVersion;
+ private final String _stringFormat;
public ProtocolVersion(byte majorVersion, byte minorVersion)
{
_majorVersion = majorVersion;
_minorVersion = minorVersion;
+ _stringFormat = _majorVersion+"-"+_minorVersion;
}
public byte getMajorVersion()
@@ -57,6 +59,10 @@
return _minorVersion;
}
+ public String toString()
+ {
+ return _stringFormat;
+ }
public int compareTo(Object o)
{
Propchange: qpid/branches/java-broker-0-10/qpid/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Oct 5 10:47:52 2009
@@ -1,2 +1,2 @@
/qpid/trunk/qpid:796646-796653
-/qpid/trunk/qpid/java:796196-799240
+/qpid/trunk/qpid/java:796196-800440
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org