You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/05 12:47:56 UTC

svn commit: r821749 [1/3] - in /qpid/branches/java-broker-0-10/qpid: ./ cpp/src/qpid/amqp_0_10/ cpp/src/qpid/broker/ cpp/src/qpid/client/ cpp/src/qpid/cluster/ cpp/src/qpid/framing/ cpp/src/qpid/management/ cpp/src/qpid/replication/ cpp/src/qpid/sys/ c...

Author: rgodfrey
Date: Mon Oct  5 10:47:52 2009
New Revision: 821749

URL: http://svn.apache.org/viewvc?rev=821749&view=rev
Log:
Merged from trunk up to r800440

Added:
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessagesStoreLogSubject.java
      - copied unchanged from r800440, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessagesStoreLogSubject.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java
      - copied unchanged from r800440, qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubjectTest.java
    qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/
      - copied from r800440, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/
    qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java
      - copied unchanged from r800440, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AbstractTestLogging.java
    qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java
      - copied unchanged from r800440, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java
    qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java
      - copied unchanged from r800440, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java
    qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ConnectionLoggingTest.java
      - copied unchanged from r800440, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ConnectionLoggingTest.java
    qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java
      - copied unchanged from r800440, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java
    qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java
      - copied unchanged from r800440, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java
Removed:
    qpid/branches/java-broker-0-10/qpid/cpp/src/tests/allSegmentTypes.h
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
    qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/AlertingTest.java
Modified:
    qpid/branches/java-broker-0-10/qpid/   (props changed)
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/amqp_0_10/Connection.h
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Broker.h
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.cpp
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.h
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxManager.cpp
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxManager.h
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxTimeout.h
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/LinkRegistry.h
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.h
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueCleaner.h
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.h
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/Connector.cpp
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterMap.h
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ErrorCheck.h
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/framing/AMQFrame.h
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/management/ManagementAgent.h
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/constants.h
    qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/Timer.cpp
    qpid/branches/java-broker-0-10/qpid/cpp/src/tests/Makefile.am
    qpid/branches/java-broker-0-10/qpid/cpp/src/tests/PartialFailure.cpp
    qpid/branches/java-broker-0-10/qpid/cpp/src/tests/QueueTest.cpp
    qpid/branches/java-broker-0-10/qpid/cpp/src/tests/replication_test
    qpid/branches/java-broker-0-10/qpid/cpp/xml/cluster.xml
    qpid/branches/java-broker-0-10/qpid/gentools/templ.java/model/ProtocolVersionListClass.vm
    qpid/branches/java-broker-0-10/qpid/java/   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/broker/   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/broker/bin/   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/LogActor.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLogger.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/RootMessageLoggerImpl.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AbstractActor.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLogger.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/CurrentActorTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ChannelMessagesTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/ConnectionMessagesTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/rawloggers/Log4jMessageLoggerTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ChannelLogSubjectTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubjectTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
    qpid/branches/java-broker-0-10/qpid/java/broker/src/velocity/templates/org/apache/qpid/server/logging/messages/LogMessages.vm
    qpid/branches/java-broker-0-10/qpid/java/common/templates/model/ProtocolVersionListClass.vm
    qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ConfigurationManagement.java   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/LoggingManagement.java   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedBroker.java   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedConnection.java   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedExchange.java   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/ManagedQueue.java   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/UserManagement.java   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanAttribute.java   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanConstructor.java   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanDescription.java   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperation.java   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/management/common/src/main/java/org/apache/qpid/management/common/mbeans/annotations/MBeanOperationParameter.java   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ConnectionTypeTabControl.java   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/ExchangeTypeTabControl.java   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/MBeanTypeTabControl.java   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/type/QueueTypeTabControl.java   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/management/eclipse-plugin/src/main/resources/macosx/Contents/MacOS/qpidmc   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
    qpid/branches/java-broker-0-10/qpid/java/test-profiles/010Excludes   (contents, props changed)
    qpid/branches/java-broker-0-10/qpid/java/test-profiles/08Excludes   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/test-profiles/08StandaloneExcludes   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/test-profiles/08TransientExcludes   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/test-profiles/Excludes   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/test-profiles/XAExcludes   (props changed)
    qpid/branches/java-broker-0-10/qpid/java/test-profiles/cpp.ssl.excludes   (props changed)

Propchange: qpid/branches/java-broker-0-10/qpid/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Oct  5 10:47:52 2009
@@ -1 +1 @@
-/qpid/trunk/qpid:796196-799240
+/qpid/trunk/qpid:796196-800440

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Mon Oct  5 10:47:52 2009
@@ -31,7 +31,8 @@
 using sys::Mutex;
 
 Connection::Connection(sys::OutputControl& o, const std::string& id, bool _isClient)
-    : frameQueueClosed(false), output(o), identifier(id), initialized(false), isClient(_isClient), buffered(0)
+    : frameQueueClosed(false), output(o), identifier(id), initialized(false),
+      isClient(_isClient), buffered(0), version(0,10)
 {}
 
 void Connection::setInputHandler(std::auto_ptr<sys::ConnectionInputHandler> c) {
@@ -44,7 +45,9 @@
         //read in protocol header
         framing::ProtocolInitiation pi;
         if (pi.decode(in)) {
-            //TODO: check the version is correct
+            if(!(pi==version))
+                throw Exception(QPID_MSG("Unsupported version: " << pi
+                                         << " supported version " << version));
             QPID_LOG(trace, "RECV " << identifier << " INIT(" << pi << ")");
         }
         initialized = true;
@@ -128,7 +131,11 @@
 }
 
 framing::ProtocolVersion Connection::getVersion() const {
-    return framing::ProtocolVersion(0,10);
+    return version;
+}
+
+void Connection::setVersion(const framing::ProtocolVersion& v)  {
+    version = v;
 }
 
 size_t Connection::getBuffered() const {

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/amqp_0_10/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/amqp_0_10/Connection.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/amqp_0_10/Connection.h Mon Oct  5 10:47:52 2009
@@ -55,6 +55,7 @@
     bool initialized;
     bool isClient;
     size_t buffered;
+    framing::ProtocolVersion version;
 
   public:
     QPID_BROKER_EXTERN Connection(sys::OutputControl&, const std::string& id, bool isClient);
@@ -71,6 +72,10 @@
     void send(framing::AMQFrame&);
     framing::ProtocolVersion getVersion() const;
     size_t getBuffered() const;
+
+    /** Used by cluster code to set a special version on "update" connections. */
+    // FIXME aconway 2009-07-30: find a cleaner mechanism for this.
+    void setVersion(const framing::ProtocolVersion&);
 };
 
 }} // namespace qpid::amqp_0_10

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Broker.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Broker.h Mon Oct  5 10:47:52 2009
@@ -36,7 +36,6 @@
 #include "qpid/broker/QueueEvents.h"
 #include "qpid/broker/Vhost.h"
 #include "qpid/broker/System.h"
-#include "qpid/broker/Timer.h"
 #include "qpid/broker/ExpiryPolicy.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/management/ManagementAgent.h"
@@ -49,6 +48,7 @@
 #include "qpid/framing/OutputHandler.h"
 #include "qpid/framing/ProtocolInitiation.h"
 #include "qpid/sys/Runnable.h"
+#include "qpid/sys/Timer.h"
 #include "qpid/RefCounted.h"
 #include "qpid/broker/AclModule.h"
 
@@ -115,13 +115,14 @@
       private:
         std::string getHome();
     };
- 
+
   private:
     typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap;
 
     void declareStandardExchange(const std::string& name, const std::string& type);
 
     boost::shared_ptr<sys::Poller> poller;
+    sys::Timer timer;
     Options config;
     ProtocolFactoryMap protocolFactories;
     std::auto_ptr<MessageStore> store;
@@ -132,7 +133,6 @@
     ExchangeRegistry exchanges;
     LinkRegistry links;
     boost::shared_ptr<sys::ConnectionCodec::Factory> factory;
-    Timer timer;
     DtxManager dtxManager;
     SessionManager sessionManager;
     management::ManagementAgent* managementAgent;
@@ -148,8 +148,6 @@
     boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
 
   public:
-
-  
     virtual ~Broker();
 
     QPID_BROKER_EXTERN Broker(const Options& configuration);
@@ -188,7 +186,7 @@
 
     void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; }
     boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; }
-    
+
     SessionManager& getSessionManager() { return sessionManager; }
     const std::string& getFederationTag() const { return federationTag; }
 
@@ -197,7 +195,7 @@
     management::Manageable::status_t  ManagementMethod (uint32_t methodId,
                                                         management::Args& args,
                                                         std::string& text);
-    
+
     /** Add to the broker's protocolFactorys */
     void registerProtocolFactory(const std::string& name, boost::shared_ptr<sys::ProtocolFactory>);
 
@@ -229,7 +227,7 @@
     boost::shared_ptr<sys::ConnectionCodec::Factory> getConnectionFactory() { return factory; }
     void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; }
 
-    Timer& getTimer() { return timer; }
+    sys::Timer& getTimer() { return timer; }
 
     boost::function<std::vector<Url> ()> getKnownBrokers;
 
@@ -242,7 +240,5 @@
 };
 
 }}
-            
-
 
 #endif  /*!_Broker_*/

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.cpp Mon Oct  5 10:47:52 2009
@@ -49,35 +49,25 @@
 namespace qpid {
 namespace broker {
 
-struct ConnectionTimeoutTask : public TimerTask {
-    Timer& timer;
+struct ConnectionTimeoutTask : public sys::TimerTask {
+    sys::Timer& timer;
     Connection& connection;
-    AbsTime expires;
 
-    ConnectionTimeoutTask(uint16_t hb, Timer& t, Connection& c) :
+    ConnectionTimeoutTask(uint16_t hb, sys::Timer& t, Connection& c) :
         TimerTask(Duration(hb*2*TIME_SEC)),
         timer(t),
-        connection(c),
-        expires(AbsTime::now(), duration)
+        connection(c)
     {}
 
-    void touch()
-    {
-        expires = AbsTime(AbsTime::now(), duration);
+    void touch() {
+        restart();
     }
 
     void fire() {
-        // This is the best we can currently do to avoid a destruction/fire race
-        if (isCancelled()) return;
-        if (expires < AbsTime::now()) {
-            // If we get here then we've not received any traffic in the timeout period
-            // Schedule closing the connection for the io thread
-            QPID_LOG(error, "Connection timed out: closing");
-            connection.abort();
-        } else {
-            reset();
-            timer.add(this);
-        }
+        // If we get here then we've not received any traffic in the timeout period
+        // Schedule closing the connection for the io thread
+        QPID_LOG(error, "Connection timed out: closing");
+        connection.abort();
     }
 };
 
@@ -338,25 +328,22 @@
     adapter.setSecureConnection(s);
 }
 
-struct ConnectionHeartbeatTask : public TimerTask {
-    Timer& timer;
+struct ConnectionHeartbeatTask : public sys::TimerTask {
+    sys::Timer& timer;
     Connection& connection;
-    ConnectionHeartbeatTask(uint16_t hb, Timer& t, Connection& c) :
+    ConnectionHeartbeatTask(uint16_t hb, sys::Timer& t, Connection& c) :
         TimerTask(Duration(hb*TIME_SEC)),
         timer(t),
         connection(c)
     {}
 
     void fire() {
-        // This is the best we can currently do to avoid a destruction/fire race
-        if (!isCancelled()) {
-            // Setup next firing
-            reset();
-            timer.add(this);
+        // Setup next firing
+        setupNextFire();
+        timer.add(this);
 
-            // Send Heartbeat
-            connection.sendHeartbeat();
-        }
+        // Send Heartbeat
+        connection.sendHeartbeat();
     }
 };
 

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Connection.h Mon Oct  5 10:47:52 2009
@@ -152,8 +152,8 @@
     qmf::org::apache::qpid::broker::Connection* mgmtObject;
     LinkRegistry& links;
     management::ManagementAgent* agent;
-    Timer& timer;
-    boost::intrusive_ptr<TimerTask> heartbeatTimer;
+    sys::Timer& timer;
+    boost::intrusive_ptr<sys::TimerTask> heartbeatTimer;
     boost::intrusive_ptr<ConnectionTimeoutTask> timeoutTimer;
     ErrorListener* errorListener;
     bool shadow;

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxManager.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxManager.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxManager.cpp Mon Oct  5 10:47:52 2009
@@ -22,6 +22,7 @@
 #include "qpid/broker/DtxTimeout.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/log/Statement.h"
+#include "qpid/sys/Timer.h"
 #include "qpid/ptr_map.h"
 
 #include <boost/format.hpp>
@@ -33,7 +34,7 @@
 using namespace qpid::broker;
 using namespace qpid::framing;
 
-DtxManager::DtxManager(Timer& t) : store(0), timer(t) {}
+DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(t) {}
 
 DtxManager::~DtxManager() {}
 
@@ -130,8 +131,7 @@
     }
     timeout = intrusive_ptr<DtxTimeout>(new DtxTimeout(secs, *this, xid));
     record->setTimeout(timeout);
-    timer.add(boost::static_pointer_cast<TimerTask>(timeout));
-    
+    timer.add(timeout);
 }
 
 uint32_t DtxManager::getTimeout(const std::string& xid)

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxManager.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxManager.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxManager.h Mon Oct  5 10:47:52 2009
@@ -24,9 +24,9 @@
 #include <boost/ptr_container/ptr_map.hpp>
 #include "qpid/broker/DtxBuffer.h"
 #include "qpid/broker/DtxWorkRecord.h"
-#include "qpid/broker/Timer.h"
 #include "qpid/broker/TransactionalStore.h"
 #include "qpid/framing/amqp_types.h"
+#include "qpid/sys/Timer.h"
 #include "qpid/sys/Mutex.h"
 
 namespace qpid {
@@ -35,7 +35,7 @@
 class DtxManager{
     typedef boost::ptr_map<std::string, DtxWorkRecord> WorkMap;
 
-    struct DtxCleanup : public TimerTask
+    struct DtxCleanup : public sys::TimerTask
     {
         DtxManager& mgr;
         const std::string& xid;
@@ -47,14 +47,14 @@
     WorkMap work;
     TransactionalStore* store;
     qpid::sys::Mutex lock;
-    Timer& timer;
+    qpid::sys::Timer& timer;
 
     void remove(const std::string& xid);
     DtxWorkRecord* getWork(const std::string& xid);
     DtxWorkRecord* createWork(std::string xid);
 
 public:
-    DtxManager(Timer&);
+    DtxManager(qpid::sys::Timer&);
     ~DtxManager();
     void start(const std::string& xid, DtxBuffer::shared_ptr work);
     void join(const std::string& xid, DtxBuffer::shared_ptr work);

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxTimeout.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxTimeout.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxTimeout.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/DtxTimeout.h Mon Oct  5 10:47:52 2009
@@ -22,7 +22,7 @@
 #define _DtxTimeout_
 
 #include "qpid/Exception.h"
-#include "qpid/broker/Timer.h"
+#include "qpid/sys/Timer.h"
 
 namespace qpid {
 namespace broker {
@@ -31,12 +31,12 @@
 
 struct DtxTimeoutException : public Exception {};
 
-struct DtxTimeout : public TimerTask
+struct DtxTimeout : public sys::TimerTask
 {
     const uint32_t timeout;
     DtxManager& mgr;
     const std::string xid;
-    
+
     DtxTimeout(uint32_t timeout, DtxManager& mgr, const std::string& xid);    
     void fire();
 };

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Mon Oct  5 10:47:52 2009
@@ -41,18 +41,18 @@
 // factored: The persistence element and maintenance element
 // should be factored separately
 LinkRegistry::LinkRegistry () :
-    broker(0),
+    broker(0), timer(0),
     parent(0), store(0), passive(false), passiveChanged(false),
     realm("")
 {
 }
 
 LinkRegistry::LinkRegistry (Broker* _broker) :
-    broker(_broker),
-    parent(0), store(0), passive(false), passiveChanged(false), 
+    broker(_broker), timer(&broker->getTimer()),
+    parent(0), store(0), passive(false), passiveChanged(false),
     realm(broker->getOptions().realm)
 {
-    timer.add (intrusive_ptr<TimerTask> (new Periodic(*this)));
+    timer->add (new Periodic(*this));
 }
 
 LinkRegistry::Periodic::Periodic (LinkRegistry& _links) :
@@ -61,7 +61,7 @@
 void LinkRegistry::Periodic::fire ()
 {
     links.periodicMaintenance ();
-    links.timer.add (intrusive_ptr<TimerTask> (new Periodic(links)));
+    links.timer->add (new Periodic(links));
 }
 
 void LinkRegistry::periodicMaintenance ()

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/LinkRegistry.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/LinkRegistry.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/LinkRegistry.h Mon Oct  5 10:47:52 2009
@@ -25,9 +25,9 @@
 #include <map>
 #include "qpid/broker/Bridge.h"
 #include "qpid/broker/MessageStore.h"
-#include "qpid/broker/Timer.h"
 #include "qpid/Address.h"
 #include "qpid/sys/Mutex.h"
+#include "qpid/sys/Timer.h"
 #include "qpid/management/Manageable.h"
 #include <boost/shared_ptr.hpp>
 
@@ -41,7 +41,7 @@
 
         // Declare a timer task to manage the establishment of link connections and the
         // re-establishment of lost link connections.
-        struct Periodic : public TimerTask
+        struct Periodic : public sys::TimerTask
         {
             LinkRegistry& links;
 
@@ -62,7 +62,7 @@
 
         qpid::sys::Mutex lock;
         Broker* broker;
-        Timer   timer;
+        sys::Timer* timer;
         management::Manageable* parent;
         MessageStore* store;
         bool passive;

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.cpp Mon Oct  5 10:47:52 2009
@@ -677,21 +677,25 @@
 {
     if (persistLastNode){
         Mutex::ScopedLock locker(messageLock);
-    	for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) {
-            if (lastValueQueue) checkLvqReplace(*i);
-            // don't force a message twice to disk.
-            if(!i->payload->isStoredOnQueue(shared_from_this())) {
-                i->payload->forcePersistent();
-                if (i->payload->isForcedPersistent() ){
-            	    enqueue(0, i->payload);
+        try {
+    	    for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) {
+                if (lastValueQueue) checkLvqReplace(*i);
+                // don't force a message twice to disk.
+                if(!i->payload->isStoredOnQueue(shared_from_this())) {
+                    i->payload->forcePersistent();
+                    if (i->payload->isForcedPersistent() ){
+            	        enqueue(0, i->payload);
+                    }
                 }
-            }
-    	}
+    	    }
+        } catch (const std::exception& e) {
+            // Could not go into last node standing (for example journal not large enough)
+            QPID_LOG(error, "Unable to fail to last node standing for queue: " << name << " : " << e.what());
+        }
         inLastNodeFailure = true;
     }
 }
 
-
 // return true if store exists, 
 bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg)
 {
@@ -1012,6 +1016,10 @@
     sequence = n;
 }
 
+SequenceNumber Queue::getPosition() {
+    return sequence;
+}
+
 int Queue::getEventMode() { return eventMode; }
 
 void Queue::setQueueEventManager(QueueEvents& mgr)

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/Queue.h Mon Oct  5 10:47:52 2009
@@ -324,6 +324,9 @@
              * Used by cluster to replicate queues.
              */
             void setPosition(framing::SequenceNumber pos);
+            /** return current position sequence number for the next message on the queue.
+            */
+            framing::SequenceNumber getPosition();
             int getEventMode();
             void setQueueEventManager(QueueEvents&);
             QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key);

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueCleaner.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueCleaner.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueCleaner.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueCleaner.cpp Mon Oct  5 10:47:52 2009
@@ -26,15 +26,15 @@
 namespace qpid {
 namespace broker {
 
-QueueCleaner::QueueCleaner(QueueRegistry& q, Timer& t) : queues(q), timer(t) {}
+QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer& t) : queues(q), timer(t) {}
 
 void QueueCleaner::start(qpid::sys::Duration p)
 {
-    task = boost::intrusive_ptr<TimerTask>(new Task(*this, p));
+    task = new Task(*this, p);
     timer.add(task);
 }
 
-QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : TimerTask(d), parent(p) {}
+QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : sys::TimerTask(d), parent(p) {}
 
 void QueueCleaner::Task::fire()
 {
@@ -44,7 +44,7 @@
 void QueueCleaner::fired()
 {
     queues.eachQueue(boost::bind(&Queue::purgeExpired, _1));
-    task->reset();
+    task->setupNextFire();
     timer.add(task);
 }
 

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueCleaner.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueCleaner.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueCleaner.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/QueueCleaner.h Mon Oct  5 10:47:52 2009
@@ -23,7 +23,7 @@
  */
 
 #include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/Timer.h"
+#include "qpid/sys/Timer.h"
 
 namespace qpid {
 namespace broker {
@@ -35,10 +35,10 @@
 class QueueCleaner
 {
   public:
-    QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, Timer& timer);
+    QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer& timer);
     QPID_BROKER_EXTERN void start(qpid::sys::Duration period);
   private:
-    class Task : public TimerTask
+    class Task : public sys::TimerTask
     {
       public:
         Task(QueueCleaner& parent, qpid::sys::Duration duration);
@@ -46,10 +46,10 @@
       private:
         QueueCleaner& parent;
     };
-    
-    boost::intrusive_ptr<TimerTask> task;
+
+    boost::intrusive_ptr<sys::TimerTask> task;
     QueueRegistry& queues;
-    Timer& timer;
+    sys::Timer& timer;
 
     void fired();
 };

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.cpp Mon Oct  5 10:47:52 2009
@@ -25,7 +25,7 @@
 #include "qpid/broker/SessionManager.h"
 #include "qpid/broker/SessionHandler.h"
 #include "qpid/broker/RateFlowcontrol.h"
-#include "qpid/broker/Timer.h"
+#include "qpid/sys/Timer.h"
 #include "qpid/framing/AMQContentBody.h"
 #include "qpid/framing/AMQHeaderBody.h"
 #include "qpid/framing/AMQMethodBody.h"
@@ -49,6 +49,7 @@
 using qpid::management::Manageable;
 using qpid::management::Args;
 using qpid::sys::AbsTime;
+//using qpid::sys::Timer;
 namespace _qmf = qmf::org::apache::qpid::broker;
 
 SessionState::SessionState(
@@ -206,10 +207,10 @@
     }
 }
 
-struct ScheduledCreditTask : public TimerTask {
-    Timer& timer;
+struct ScheduledCreditTask : public sys::TimerTask {
+    sys::Timer& timer;
     SessionState& sessionState;
-    ScheduledCreditTask(const qpid::sys::Duration& d, Timer& t,
+    ScheduledCreditTask(const qpid::sys::Duration& d, sys::Timer& t,
                         SessionState& s) :
         TimerTask(d),
         timer(t),
@@ -218,15 +219,13 @@
 
     void fire() {
         // This is the best we can currently do to avoid a destruction/fire race
-        if (!isCancelled()) {
-            sessionState.getConnection().requestIOProcessing(boost::bind(&ScheduledCreditTask::sendCredit, this));
-        }
+        sessionState.getConnection().requestIOProcessing(boost::bind(&ScheduledCreditTask::sendCredit, this));
     }
 
     void sendCredit() {
         if ( !sessionState.processSendCredit(0) ) {
             QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit");
-            reset();
+            setupNextFire();
             timer.add(this);
         }
     }
@@ -269,7 +268,7 @@
     if (rateFlowcontrol && frame.getBof() && frame.getBos()) {
         if ( !processSendCredit(1) ) {
             QPID_LOG(debug, getId() << ": Schedule sending credit");
-            Timer& timer = getBroker().getTimer();
+            sys::Timer& timer = getBroker().getTimer();
             // Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms
             sys::Duration d = std::min(sys::TIME_SEC * 50 / rateFlowcontrol->getRate(), 500 * sys::TIME_MSEC);
             flowControlTimer = new ScheduledCreditTask(d, timer, *this);

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/broker/SessionState.h Mon Oct  5 10:47:52 2009
@@ -48,6 +48,10 @@
 class AMQP_ClientProxy;
 }
 
+namespace sys {
+class TimerTask;
+}
+
 namespace broker {
 
 class Broker;
@@ -56,7 +60,6 @@
 class SessionHandler;
 class SessionManager;
 class RateFlowcontrol;
-struct TimerTask;
 
 /**
  * Broker-side session state includes session's handler chains, which
@@ -153,7 +156,7 @@
     // State used for producer flow control (rate limited)
     qpid::sys::Mutex rateLock;
     boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol;
-    boost::intrusive_ptr<TimerTask> flowControlTimer;
+    boost::intrusive_ptr<sys::TimerTask> flowControlTimer;
 
     friend class SessionManager;
 };

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/Connector.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/client/Connector.cpp Mon Oct  5 10:47:52 2009
@@ -360,8 +360,11 @@
     if (!initiated) {
         framing::ProtocolInitiation protocolInit;
         if (protocolInit.decode(in)) {
-            //TODO: check the version is correct
             QPID_LOG(debug, "RECV " << identifier << " INIT(" << protocolInit << ")");
+            if(!(protocolInit==version)){
+                throw Exception(QPID_MSG("Unsupported version: " << protocolInit
+                                         << " supported version " << version));
+            }
         }
         initiated = true;
     }

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Oct  5 10:47:52 2009
@@ -66,7 +66,7 @@
  * 
  * Events are either
  *  - Connection events: non-0 connection number and are associated with a connection.
- *  - Cluster Events: 0 connection number, are not associated with a connectin.
+ *  - Cluster Events: 0 connection number, are not associated with a connection.
  * 
  * Events are further categorized as:
  *  - Control: carries method frame(s) that affect cluster behavior.
@@ -149,7 +149,7 @@
  * sensible reporting of an attempt to mix different versions in a
  * cluster.
  */
-const uint32_t Cluster::CLUSTER_VERSION = 1;
+const uint32_t Cluster::CLUSTER_VERSION = 2;
 
 struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
     qpid::cluster::Cluster& cluster;
@@ -163,7 +163,7 @@
     void updateOffer(uint64_t updatee, const Uuid& id, uint32_t version) { cluster.updateOffer(member, updatee, id, version, l); }
     void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); }
     void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
-    void errorCheck(uint8_t type, uint64_t frameSeq) { cluster.errorCheck(member, type, frameSeq, l); }
+    void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) { cluster.errorCheck(member, type, frameSeq, l); }
 
     void shutdown() { cluster.shutdown(member, l); }
 
@@ -869,15 +869,12 @@
     expiryPolicy->deliverExpire(id);
 }
 
-void Cluster::errorCheck(const MemberId&, uint8_t type, uint64_t frameSeq, Lock&) {
-    // If we handle an errorCheck at this point (rather than in the
-    // ErrorCheck class) then we have processed succesfully past the
-    // point of the error.
-    if (state >= CATCHUP && type != ERROR_TYPE_NONE) {
-        QPID_LOG(notice, *this << " error " << frameSeq << " did not occur locally.");
-        mcast.mcastControl(
-            ClusterErrorCheckBody(ProtocolVersion(), ERROR_TYPE_NONE, frameSeq), self);
-    }
+void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq, Lock&) {
+    // If we see an errorCheck here (rather than in the ErrorCheck
+    // class) then we have processed succesfully past the point of the
+    // error.
+    if (state >= CATCHUP) // Don't respond pre catchup, we don't know what happened
+        error.respondNone(from, type, frameSeq);
 }
 
 

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Cluster.h Mon Oct  5 10:47:52 2009
@@ -152,7 +152,7 @@
     void ready(const MemberId&, const std::string&, Lock&);
     void configChange(const MemberId&, const std::string& current, Lock& l);
     void messageExpired(const MemberId&, uint64_t, Lock& l);
-    void errorCheck(const MemberId&, uint8_t type, uint64_t frameSeq, Lock&);
+    void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&);
 
     void shutdown(const MemberId&, Lock&);
 

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Mon Oct  5 10:47:52 2009
@@ -71,7 +71,7 @@
         joiners[id] = url;
 }
 
-ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, uint64_t frameSeq_)
+ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, framing::SequenceNumber frameSeq_)
   : frameSeq(frameSeq_)
 {
     std::for_each(joinersFt.begin(), joinersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(joiners), boost::ref(alive)));

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterMap.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterMap.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterMap.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ClusterMap.h Mon Oct  5 10:47:52 2009
@@ -25,6 +25,7 @@
 #include "qpid/cluster/types.h"
 #include "qpid/Url.h"
 #include "qpid/framing/ClusterConnectionMembershipBody.h"
+#include "qpid/framing/SequenceNumber.h"
 
 #include <boost/function.hpp>
 #include <boost/optional.hpp>
@@ -53,7 +54,7 @@
         
     ClusterMap();
     ClusterMap(const MemberId& id, const Url& url, bool isReady);
-    ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members, uint64_t frameSeq);
+    ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members, framing::SequenceNumber frameSeq);
 
     /** Update from config change.
      *@return true if member set changed.
@@ -92,8 +93,8 @@
      */
     static Set intersection(const Set& a, const Set& b);
 
-    uint64_t getFrameSeq() { return frameSeq; }
-    uint64_t incrementFrameSeq() { return ++frameSeq; }
+    framing::SequenceNumber getFrameSeq() { return frameSeq; }
+    framing::SequenceNumber incrementFrameSeq() { return ++frameSeq; }
     
     /** Clear out all knowledge of joiners & members, just keep alive set */
     void clearStatus() { joiners.clear(); members.clear(); }
@@ -103,7 +104,7 @@
     
     Map joiners, members;
     Set alive;
-    uint64_t frameSeq;
+    framing::SequenceNumber frameSeq;
 
   friend std::ostream& operator<<(std::ostream&, const Map&);
   friend std::ostream& operator<<(std::ostream&, const ClusterMap&);

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Connection.cpp Mon Oct  5 10:47:52 2009
@@ -311,7 +311,7 @@
     output.setSendMax(sendMax);
 }
 
-void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameSeq) {
+void Connection::membership(const FieldTable& joiners, const FieldTable& members, const framing::SequenceNumber& frameSeq) {
     QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
     cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
     consumerNumbering.clear();

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Connection.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/Connection.h Mon Oct  5 10:47:52 2009
@@ -122,7 +122,7 @@
     
     void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment, uint32_t sendMax);
 
-    void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq);
+    void membership(const framing::FieldTable&, const framing::FieldTable&, const framing::SequenceNumber& frameSeq);
 
     void retractOffer();
 

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp Mon Oct  5 10:47:52 2009
@@ -38,24 +38,27 @@
 sys::ConnectionCodec*
 ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id) {
     if (v == ProtocolVersion(0, 10))
-        return new ConnectionCodec(out, id, cluster, false, false);
-    else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10))
-        return new ConnectionCodec(out, id, cluster, true, false); // Catch-up connection
+        return new ConnectionCodec(v, out, id, cluster, false, false);
+    else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10)) // Catch-up connection
+        return new ConnectionCodec(v, out, id, cluster, true, false); 
     return 0;
 }
 
 // Used for outgoing Link connections
 sys::ConnectionCodec*
 ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& logId) {
-    return new ConnectionCodec(out, logId, cluster, false, true);
+    return new ConnectionCodec(ProtocolVersion(0,10), out, logId, cluster, false, true);
 }
 
-ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& logId, Cluster& cluster, bool catchUp, bool isLink)
-    : codec(out, logId, isLink),
-      interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink))
+ConnectionCodec::ConnectionCodec(
+    const ProtocolVersion& v, sys::OutputControl& out,
+    const std::string& logId, Cluster& cluster, bool catchUp, bool isLink
+) : codec(out, logId, isLink),
+    interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink))
 {
     std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor));
     codec.setInputHandler(ih);
+    codec.setVersion(v);
 }
 
 ConnectionCodec::~ConnectionCodec() {}

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ConnectionCodec.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ConnectionCodec.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ConnectionCodec.h Mon Oct  5 10:47:52 2009
@@ -56,7 +56,8 @@
         sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id);
     };
 
-    ConnectionCodec(sys::OutputControl& out, const std::string& logId, Cluster& c, bool catchUp, bool isLink);
+    ConnectionCodec(const framing::ProtocolVersion&, sys::OutputControl& out,
+                    const std::string& logId, Cluster& c, bool catchUp, bool isLink);
     ~ConnectionCodec();
 
     // ConnectionCodec functions.

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ErrorCheck.cpp Mon Oct  5 10:47:52 2009
@@ -45,11 +45,11 @@
 }
 
 void ErrorCheck::error(
-    Connection& c, ErrorType t, uint64_t seq, const MemberSet& ms, const std::string& msg)
+    Connection& c, ErrorType t, framing::SequenceNumber seq, const MemberSet& ms, const std::string& msg)
 {
     // Detected a local error, inform cluster and set error state.
     assert(t != ERROR_TYPE_NONE); // Must be an error.
-    assert(type == ERROR_TYPE_NONE); // Can only be called while processing
+    assert(type == ERROR_TYPE_NONE); // Can't be called when already in an error state.
     type = t;
     unresolved = ms;
     frameSeq = seq;
@@ -59,7 +59,7 @@
              << " error " << frameSeq << " on " << c << ": " << msg
              << " must be resolved with: " << unresolved);
     mcast.mcastControl(
-        ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), c.getId().getMember());
+        ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), cluster.getId());
     // If there are already frames queued up by a previous error, review
     // them with respect to this new error.
     for (FrameQueue::iterator i = frames.begin(); i != frames.end(); i = review(i))
@@ -74,41 +74,52 @@
 // Review a frame in the queue with respect to the current error.
 ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator& i) {
     FrameQueue::iterator next = i+1;
-    if (isUnresolved()) {
-        const ClusterErrorCheckBody* errorCheck = 0;
-        if (i->frame.getBody())
-            errorCheck = dynamic_cast<const ClusterErrorCheckBody*>(
-                i->frame.getMethod());
-        if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error
+    if(!isUnresolved() || !i->frame.getBody() || !i->frame.getMethod())
+        return next;            // Only interested in control frames while unresolved.
+    const AMQMethodBody* method = i->frame.getMethod();
+    if (method->isA<const ClusterErrorCheckBody>()) {
+        const ClusterErrorCheckBody* errorCheck =
+            static_cast<const ClusterErrorCheckBody*>(method);
+
+        if (errorCheck->getFrameSeq() == frameSeq) { // Addresses current error
             next = frames.erase(i);    // Drop matching error check controls
             if (errorCheck->getType() < type) { // my error is worse than his
                 QPID_LOG(critical, cluster << " error " << frameSeq
                          << " did not occur on " << i->getMemberId());
-                throw Exception("Aborted by failure that did not occur on all replicas");
+                throw Exception(QPID_MSG("Error " << frameSeq
+                                         << " did not occur on all members"));
             }
             else {              // his error is worse/same as mine.
-                QPID_LOG(notice, cluster << " error " << frameSeq
+                QPID_LOG(info, cluster << " error " << frameSeq
                          << " resolved with " << i->getMemberId());
                 unresolved.erase(i->getMemberId());
                 checkResolved();
             }
         }
-        else {
-            const ClusterConfigChangeBody* configChange = 0;
-            if (i->frame.getBody())
-                configChange = dynamic_cast<const ClusterConfigChangeBody*>(
-                    i->frame.getMethod());
-            if (configChange) {
-                MemberSet members(ClusterMap::decode(configChange->getCurrent()));
-                QPID_LOG(debug, cluster << " apply config change to unresolved: "
-                         << members);
-                MemberSet intersect;
-                set_intersection(members.begin(), members.end(),
-                                 unresolved.begin(), unresolved.end(),
-                                 inserter(intersect, intersect.begin()));
-                unresolved.swap(intersect);
-                checkResolved();
-            }
+        else if (errorCheck->getFrameSeq() < frameSeq && errorCheck->getType() != NONE
+                 && i->connectionId.getMember() != cluster.getId())
+        {
+            // This error occured before the current error so we
+            // have processed past it.
+            next = frames.erase(i); // Drop the error check control
+            respondNone(i->connectionId.getMember(), errorCheck->getType(),
+                        errorCheck->getFrameSeq());
+        }
+        // if errorCheck->getFrameSeq() > frameSeq then leave it in the queue.
+    }
+    else if (method->isA<const ClusterConfigChangeBody>()) {
+        const ClusterConfigChangeBody* configChange =
+            static_cast<const ClusterConfigChangeBody*>(method);
+        if (configChange) {
+            MemberSet members(ClusterMap::decode(configChange->getCurrent()));
+            QPID_LOG(debug, cluster << " apply config change to error "
+                     << frameSeq << ": " << members);
+            MemberSet intersect;
+            set_intersection(members.begin(), members.end(),
+                             unresolved.begin(), unresolved.end(),
+                             inserter(intersect, intersect.begin()));
+            unresolved.swap(intersect);
+            checkResolved();
         }
     }
     return next;
@@ -117,10 +128,10 @@
 void ErrorCheck::checkResolved() {
     if (unresolved.empty()) {   // No more potentially conflicted members, we're clear.
         type = ERROR_TYPE_NONE;
-        QPID_LOG(notice, cluster << " error " << frameSeq << " resolved.");
+        QPID_LOG(info, cluster << " error " << frameSeq << " resolved.");
     }
     else 
-        QPID_LOG(notice, cluster << " error " << frameSeq
+        QPID_LOG(info, cluster << " error " << frameSeq
                  << " must be resolved with " << unresolved);
 }
 
@@ -131,4 +142,15 @@
     return e;
 }
 
+void ErrorCheck::respondNone(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq) {
+    // Don't respond to non-errors or to my own errors.
+    if (type == ERROR_TYPE_NONE || from == cluster.getId())
+        return;
+    QPID_LOG(info, cluster << " error " << frameSeq << " did not occur locally.");
+    mcast.mcastControl(
+        ClusterErrorCheckBody(ProtocolVersion(), ERROR_TYPE_NONE, frameSeq),
+        cluster.getId()
+    );
+}
+
 }} // namespace qpid::cluster

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ErrorCheck.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ErrorCheck.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ErrorCheck.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ErrorCheck.h Mon Oct  5 10:47:52 2009
@@ -25,6 +25,7 @@
 #include "qpid/cluster/types.h"
 #include "qpid/cluster/Multicaster.h"
 #include "qpid/framing/enum.h"
+#include "qpid/framing/SequenceNumber.h"
 #include <boost/function.hpp>
 #include <deque>
 #include <set>
@@ -49,11 +50,12 @@
   public:
     typedef std::set<MemberId> MemberSet;
     typedef framing::cluster::ErrorType ErrorType;
+    typedef framing::SequenceNumber SequenceNumber;
     
     ErrorCheck(Cluster&);
 
     /** A local error has occured */
-    void error(Connection&, ErrorType, uint64_t frameSeq, const MemberSet&,
+    void error(Connection&, ErrorType, SequenceNumber frameSeq, const MemberSet&,
                const std::string& msg);
 
     /** Called when a frame is delivered */
@@ -66,7 +68,8 @@
 
     bool isUnresolved() const { return type != NONE; }
 
-
+    /** Respond to an error check saying we had no error. */
+    void respondNone(const MemberId&, uint8_t type, SequenceNumber frameSeq);
     
   private:
     static const ErrorType NONE = framing::cluster::ERROR_TYPE_NONE;
@@ -78,7 +81,7 @@
     Multicaster& mcast;
     FrameQueue frames;
     MemberSet unresolved;
-    uint64_t frameSeq;
+    SequenceNumber frameSeq;
     ErrorType type;
     Connection* connection;
 };

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp Mon Oct  5 10:47:52 2009
@@ -19,21 +19,21 @@
  *
  */
 
+#include "qpid/broker/Message.h"
 #include "qpid/cluster/ExpiryPolicy.h"
 #include "qpid/cluster/Multicaster.h"
 #include "qpid/framing/ClusterMessageExpiredBody.h"
 #include "qpid/sys/Time.h"
-#include "qpid/broker/Message.h"
-#include "qpid/broker/Timer.h"
+#include "qpid/sys/Timer.h"
 #include "qpid/log/Statement.h"
 
 namespace qpid {
 namespace cluster {
 
-ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, broker::Timer& t)
+ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, sys::Timer& t)
     : expiryId(0), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {}
 
-struct ExpiryTask : public broker::TimerTask {
+struct ExpiryTask : public sys::TimerTask {
     ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when)
         : TimerTask(when), expiryPolicy(policy), expiryId(id) {}
     void fire() { expiryPolicy->sendExpire(expiryId); }

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h Mon Oct  5 10:47:52 2009
@@ -33,10 +33,13 @@
 namespace qpid {
 
 namespace broker {
-class Timer;
 class Message;
 }
 
+namespace sys {
+class Timer;
+}
+
 namespace cluster {
 class Multicaster;
 
@@ -46,7 +49,7 @@
 class ExpiryPolicy : public broker::ExpiryPolicy
 {
   public:
-    ExpiryPolicy(Multicaster&, const MemberId&, broker::Timer&);
+    ExpiryPolicy(Multicaster&, const MemberId&, sys::Timer&);
 
     void willExpire(broker::Message&);
     bool hasExpired(broker::Message&);
@@ -78,7 +81,7 @@
     boost::intrusive_ptr<Expired> expiredPolicy;
     Multicaster& mcast;
     MemberId memberId;
-    broker::Timer& timer;
+    sys::Timer& timer;
 };
 
 }} // namespace qpid::cluster

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/framing/AMQFrame.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/framing/AMQFrame.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/framing/AMQFrame.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/framing/AMQFrame.h Mon Oct  5 10:47:52 2009
@@ -46,8 +46,8 @@
     QPID_COMMON_EXTERN AMQBody* getBody();
     QPID_COMMON_EXTERN const AMQBody* getBody() const;
 
-    AMQMethodBody* getMethod() { return getBody()->getMethod(); }
-    const AMQMethodBody* getMethod() const { return getBody()->getMethod(); }
+    AMQMethodBody* getMethod() { return getBody() ? getBody()->getMethod() : 0; }
+    const AMQMethodBody* getMethod() const { return getBody() ? getBody()->getMethod() : 0; }
 
     void setMethod(ClassId c, MethodId m);
 

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/management/ManagementAgent.cpp Mon Oct  5 10:47:52 2009
@@ -62,7 +62,6 @@
 
 ManagementAgent::~ManagementAgent ()
 {
-    timer.stop();
     {
         Mutex::ScopedLock lock (userLock);
 
@@ -90,9 +89,10 @@
     dataDir        = _dataDir;
     interval       = _interval;
     broker         = _broker;
+    timer          = &_broker->getTimer();
     threadPoolSize = _threads;
     ManagementObject::maxThreads = threadPoolSize;
-    timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval)));
+    timer->add (new Periodic(*this, interval));
 
     // Get from file or generate and save to file.
     if (dataDir.empty())
@@ -219,7 +219,7 @@
 
 void ManagementAgent::Periodic::fire ()
 {
-    agent.timer.add (intrusive_ptr<TimerTask> (new Periodic (agent, agent.interval)));
+    agent.timer->add (new Periodic (agent, agent.interval));
     agent.periodicProcessing ();
 }
 

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/management/ManagementAgent.h Mon Oct  5 10:47:52 2009
@@ -24,9 +24,9 @@
 #include "qpid/broker/BrokerImportExport.h"
 #include "qpid/Options.h"
 #include "qpid/broker/Exchange.h"
-#include "qpid/broker/Timer.h"
 #include "qpid/framing/Uuid.h"
 #include "qpid/sys/Mutex.h"
+#include "qpid/sys/Timer.h"
 #include "qpid/broker/ConnectionToken.h"
 #include "qpid/management/ManagementObject.h"
 #include "qpid/management/ManagementEvent.h"
@@ -98,7 +98,7 @@
     void disallow(const std::string& className, const std::string& methodName, const std::string& message);
                   
 private:
-    struct Periodic : public qpid::broker::TimerTask
+    struct Periodic : public qpid::sys::TimerTask
     {
         ManagementAgent& agent;
 
@@ -183,12 +183,12 @@
     framing::Uuid                uuid;
     sys::Mutex                   addLock;
     sys::Mutex                   userLock;
-    qpid::broker::Timer          timer;
     qpid::broker::Exchange::shared_ptr mExchange;
     qpid::broker::Exchange::shared_ptr dExchange;
     std::string                  dataDir;
     uint16_t                     interval;
     qpid::broker::Broker*        broker;
+    qpid::sys::Timer*            timer;
     uint16_t                     bootSequence;
     uint32_t                     nextObjectId;
     uint32_t                     brokerBank;

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp Mon Oct  5 10:47:52 2009
@@ -72,6 +72,7 @@
     FieldTable& headers = msg->getProperties<MessageProperties>()->getApplicationHeaders();
     headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName());
     headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE);
+    headers.setInt(QUEUE_MESSAGE_POSITION,enqueued.position);
     route(msg);
 }
 

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp Mon Oct  5 10:47:52 2009
@@ -83,15 +83,29 @@
     std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE);
     Queue::shared_ptr queue = queues.find(queueName);
     if (queue) {
-        FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders();
-        headers.erase(REPLICATION_TARGET_QUEUE);
-        headers.erase(REPLICATION_EVENT_SEQNO);
-        headers.erase(REPLICATION_EVENT_TYPE);
-        msg.deliverTo(queue);
-        QPID_LOG(debug, "Enqueued replicated message onto " << queueName);
-        if (mgmtExchange != 0) {
-            mgmtExchange->inc_msgRoutes();
-            mgmtExchange->inc_byteRoutes( msg.contentSize());
+
+        SequenceNumber seqno1(args->getAsInt(QUEUE_MESSAGE_POSITION));
+
+        // note that queue will ++ before enqueue.      
+        if (queue->getPosition() > --seqno1) // test queue.pos < seqnumber
+        {
+            QPID_LOG(error, "Cannot enqueue replicated message. Destination Queue " << queueName << " ahead of source queue");
+            mgmtExchange->inc_msgDrops();
+            mgmtExchange->inc_byteDrops(msg.contentSize());
+        } else {
+            queue->setPosition(seqno1);  
+
+            FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders();
+            headers.erase(REPLICATION_TARGET_QUEUE);
+            headers.erase(REPLICATION_EVENT_SEQNO);
+            headers.erase(REPLICATION_EVENT_TYPE);
+            headers.erase(QUEUE_MESSAGE_POSITION);
+            msg.deliverTo(queue);
+            QPID_LOG(debug, "Enqueued replicated message onto " << queueName);
+            if (mgmtExchange != 0) {
+                mgmtExchange->inc_msgRoutes();
+                mgmtExchange->inc_byteRoutes( msg.contentSize());
+            }
         }
     } else {
         QPID_LOG(error, "Cannot enqueue replicated message. Queue " << queueName << " does not exist");

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/constants.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/constants.h?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/constants.h (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/replication/constants.h Mon Oct  5 10:47:52 2009
@@ -26,6 +26,7 @@
 const std::string REPLICATION_EVENT_SEQNO("qpid.replication.seqno");
 const std::string REPLICATION_TARGET_QUEUE("qpid.replication.target_queue");
 const std::string DEQUEUED_MESSAGE_POSITION("qpid.replication.message");
+const std::string QUEUE_MESSAGE_POSITION("qpid.replication.queue.position");
 
 const int ENQUEUE(1);
 const int DEQUEUE(2);

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/Timer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/Timer.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/Timer.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/qpid/sys/Timer.cpp Mon Oct  5 10:47:52 2009
@@ -20,7 +20,8 @@
  */
 #include "qpid/sys/Timer.h"
 #include "qpid/sys/Mutex.h"
-#include <iostream>
+#include "qpid/log/Statement.h"
+
 #include <numeric>
 
 using boost::intrusive_ptr;
@@ -58,6 +59,8 @@
     if (period && readyToFire()) {
         nextFireTime = AbsTime(nextFireTime, period);
         cancelled = false;
+    } else {
+        QPID_LOG(error, "Couldn't setup next timer firing: " << Duration(nextFireTime, AbsTime::now()) << "[" << period << "]");
     }
 }
 
@@ -91,24 +94,39 @@
         } else {
             intrusive_ptr<TimerTask> t = tasks.top();
             tasks.pop();
+            assert(!(t->nextFireTime < t->sortTime));
+
+            // warn on extreme lateness
+            AbsTime start(AbsTime::now());
+            Duration late(t->sortTime, start);
+            if (late > 500 * TIME_MSEC) {
+                QPID_LOG(warning, "Timer delayed by " << late / TIME_MSEC << "ms");
+            }
             {
             ScopedLock<Mutex> l(t->callbackLock);
             if (t->cancelled) {
                 continue;
-            } else if(t->readyToFire()) {
+            } else if(Duration(t->nextFireTime, start) >= 0) {
                 Monitor::ScopedUnlock u(monitor);
                 t->fireTask();
+                // Warn on callback overrun
+                AbsTime end(AbsTime::now());
+                Duration overrun(tasks.top()->nextFireTime, end);
+                if (overrun > 1 * TIME_MSEC) {
+                    QPID_LOG(warning,
+                        "Timer callback overran by " << overrun / TIME_MSEC << "ms [taking "
+                        << Duration(start, end) << "]");
+                }
                 continue;
             } else {
                 // If the timer was adjusted into the future it might no longer
                 // be the next event, so push and then get top to make sure
                 // You can only push events into the future
-                assert(!(t->nextFireTime < t->sortTime));
                 t->sortTime = t->nextFireTime;
                 tasks.push(t);
             }
             }
-            monitor.wait(tasks.top()->nextFireTime);
+            monitor.wait(tasks.top()->sortTime);
         }
     }
 }

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/Makefile.am?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/Makefile.am Mon Oct  5 10:47:52 2009
@@ -65,6 +65,7 @@
 	$(lib_client) $(lib_broker) $(lib_console)
 
 unit_test_SOURCES= unit_test.cpp unit_test.h \
+	ClientSessionTest.cpp \
 	BrokerFixture.h SocketProxy.h \
 	exception_test.cpp \
 	RefCounted.cpp \
@@ -75,7 +76,6 @@
 	QueueOptionsTest.cpp \
 	InlineAllocator.cpp \
 	InlineVector.cpp \
-	ClientSessionTest.cpp \
 	SequenceSet.cpp \
 	StringUtils.cpp \
 	IncompleteMessageList.cpp \

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/PartialFailure.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/PartialFailure.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/PartialFailure.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/PartialFailure.cpp Mon Oct  5 10:47:52 2009
@@ -82,10 +82,31 @@
     c.subs.subscribe(c.lq, c.name);
 }
 
+// Handle near-simultaneous errors
+QPID_AUTO_TEST_CASE(testCoincidentErrors) {
+    ClusterFixture cluster(2, updateArgs, -1);
+    Client c0(cluster[0], "c0");
+    Client c1(cluster[1], "c1");
+
+    c0.session.queueDeclare("q", durable=true);
+    {
+        ScopedSuppressLogging allQuiet;
+        async(c0.session).messageTransfer(content=pMessage("TEST_STORE_DO: s0[exception]", "q"));
+        async(c1.session).messageTransfer(content=pMessage("TEST_STORE_DO: s1[exception]", "q"));
+
+        int alive=0;
+        try { Client c00(cluster[0], "c00"); ++alive; } catch (...) {}
+        try { Client c11(cluster[1], "c11"); ++alive; } catch (...) {}
+
+        BOOST_CHECK_EQUAL(alive, 1);
+    }
+}
+
+#if 0                           // FIXME aconway 2009-07-30:
 // Verify normal cluster-wide errors.
 QPID_AUTO_TEST_CASE(testNormalErrors) {
     // FIXME aconway 2009-04-10: Would like to put a scope just around
-    // the statements expected to fail (in BOOST_CHECK_THROW) but that
+    // the statements expected to fail (in BOOST_CHECK_yTHROW) but that
     // sproadically lets out messages, possibly because they're in
     // Connection thread.
 
@@ -96,7 +117,7 @@
 
     {
         ScopedSuppressLogging allQuiet;
-        queueAndSub(c0);
+        queueAndsub(c0);
         c0.session.messageTransfer(content=Message("x", "c0"));
         BOOST_CHECK_EQUAL(c0.lq.get(TIMEOUT).getData(), "x");
 
@@ -234,5 +255,5 @@
     }
 }
 #endif
-
+#endif  // FIXME aconway 2009-07-30:
 QPID_AUTO_TEST_SUITE_END()

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/QueueTest.cpp?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/QueueTest.cpp Mon Oct  5 10:47:52 2009
@@ -19,6 +19,7 @@
  *
  */
 #include "unit_test.h"
+#include "test_tools.h"
 #include "qpid/Exception.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Queue.h"
@@ -275,11 +276,13 @@
 
     uint enqCnt;
     uint deqCnt;
+    bool error;
     
     virtual void dequeue(TransactionContext*,
                  const boost::intrusive_ptr<PersistableMessage>& /*msg*/,
                  const PersistableQueue& /*queue*/)
     {
+        if (error) throw Exception("Dequeue error test");
         deqCnt++;
     }
 
@@ -287,10 +290,16 @@
                  const boost::intrusive_ptr<PersistableMessage>& /*msg*/,
                  const PersistableQueue& /* queue */)
     {
+        if (error) throw Exception("Enqueue error test");
         enqCnt++;
     }
 
-    TestMessageStoreOC() : NullMessageStore(),enqCnt(0),deqCnt(0) {}
+    void createError()
+    {
+        error=true;
+    }
+    
+    TestMessageStoreOC() : NullMessageStore(),enqCnt(0),deqCnt(0),error(false) {}
     ~TestMessageStoreOC(){}
 };
 
@@ -689,6 +698,30 @@
 
 }
 
-QPID_AUTO_TEST_SUITE_END()
+QPID_AUTO_TEST_CASE(testLastNodeJournalError){
+/*
+simulate store excption going into last node standing
+
+*/
+    TestMessageStoreOC  testStore;
+    client::QueueOptions args;
+    // set queue mode
+    args.setPersistLastNode();
+
+    Queue::shared_ptr queue1(new Queue("my-queue", true, &testStore));
+    intrusive_ptr<Message> received;
+    queue1->configure(args);
+ 	
+    // check requeue 1
+    intrusive_ptr<Message> msg1 = create_message("e", "C");
+
+    queue1->deliver(msg1);
+    testStore.createError();
+    
+    ScopedSuppressLogging sl; // Suppress messages for expected errors.
+    queue1->setLastNodeFailure();
+    BOOST_CHECK_EQUAL(testStore.enqCnt, 0u);
+
+}QPID_AUTO_TEST_SUITE_END()
 
 

Modified: qpid/branches/java-broker-0-10/qpid/cpp/src/tests/replication_test
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/src/tests/replication_test?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/src/tests/replication_test (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/src/tests/replication_test Mon Oct  5 10:47:52 2009
@@ -56,10 +56,12 @@
     $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-b --generate-queue-events 2
     $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-c --generate-queue-events 1
     $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-d --generate-queue-events 2
+    $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-e --generate-queue-events 1
 
     $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-a
     $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-b
     $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-c
+    $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-e
     #queue-d deliberately not declared on DR; this error case should be handled
 
     #publish and consume from test queues on broker A:
@@ -89,6 +91,12 @@
     ./receiver --port $BROKER_A --queue queue-c --messages 10 > /dev/null
     ./receiver --port $BROKER_A --queue queue-d > /dev/null
     
+
+    # What we are doing is putting a message on the end of repliaction queue & waiting for it on remote side
+    # making sure all the messages have been flushed from the replication queue.
+    echo dummy | ./sender --port $BROKER_A --routing-key queue-e --send-eos 1
+    ./receiver --port $BROKER_B --queue queue-e --messages 1 > /dev/null
+
     #shutdown broker A then check that broker Bs versions of the queues are as expected
     ../qpidd -q --port $BROKER_A
     unset BROKER_A
@@ -98,7 +106,6 @@
     ./receiver --port $BROKER_B --queue queue-b > queue-b-backup.repl
     ./receiver --port $BROKER_B --queue queue-c > queue-c-backup.repl
 
-    stop_brokers
 
     tail -5 queue-a-input.repl > queue-a-expected.repl
     tail -10 queue-b-input.repl > queue-b-expected.repl
@@ -108,6 +115,60 @@
 
     grep 'queue-d does not exist' replication-dest.log > /dev/null || echo "WARNING: Expected error to be logged!"
 
+    stop_brokers
+
+    # now check offsets working (enqueue based on position being set, not queue abs position)
+    
+    ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true  --log-enable info+ --log-to-file replication-source.log  --log-to-stderr 0 > qpidd.port 
+    BROKER_A=`cat qpidd.port`
+
+    ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so  --log-enable info+ --log-to-file replication-dest.log  --log-to-stderr 0 > qpidd.port
+    BROKER_B=`cat qpidd.port`
+
+    $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add exchange replication replication
+    $PYTHON_DIR/commands/qpid-route --ack 5 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication
+
+    $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-e --generate-queue-events 2
+    $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-e
+    $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-d --generate-queue-events 1
+    $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-d
+
+    i=1
+    while [ $i -le 10 ]; do
+        echo Message $i for A >> queue-e-input.repl
+        i=`expr $i + 1`
+    done
+
+    ./sender --port $BROKER_A --routing-key queue-e --send-eos 1 < queue-e-input.repl
+    ./receiver --port $BROKER_A --queue queue-e --messages 10 > /dev/null
+    
+    # What we are doing is putting a message on the end of repliaction queue & waiting for it on remote side
+    # making sure all the messages have been flushed from the replication queue.
+    echo dummy | ./sender --port $BROKER_A --routing-key queue-d --send-eos 1
+    ./receiver --port $BROKER_B --queue queue-d --messages 1 > /dev/null
+
+    # now check offsets working
+    ../qpidd -q --port $BROKER_B
+    unset BROKER_B
+    ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so  --log-enable info+ --log-to-file replication-dest.log  --log-to-stderr 0 > qpidd.port
+    BROKER_B=`cat qpidd.port`
+
+    $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-e
+    $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add exchange replication replication
+    $PYTHON_DIR/commands/qpid-route --ack 5 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication
+    # now send another 15
+    i=11
+    while [ $i -le 15 ]; do
+        echo Message $i for A >> queue-e1-input.repl
+        i=`expr $i + 1`
+    done
+    ./sender --port $BROKER_A --routing-key queue-e --send-eos 1 < queue-e1-input.repl
+    
+    ./receiver --port $BROKER_B --queue queue-e > queue-e-backup.repl
+    diff queue-e-backup.repl queue-e1-input.repl || FAIL=1
+    
+    stop_brokers
+
     if [ x$FAIL != x ]; then
         echo replication test failed: expectations not met!
         exit 1

Modified: qpid/branches/java-broker-0-10/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/cpp/xml/cluster.xml?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/cpp/xml/cluster.xml (original)
+++ qpid/branches/java-broker-0-10/qpid/cpp/xml/cluster.xml Mon Oct  5 10:47:52 2009
@@ -68,7 +68,7 @@
     <!-- Check for error consistency across the cluster -->
     <control name="error-check" code="0x14">
       <field name="type" type="error-type"/>
-      <field name="frame-seq" type="uint64"/>
+      <field name="frame-seq" type="sequence-no"/>
     </control>
     
 
@@ -170,7 +170,7 @@
     <control name="membership" code="0x21" label="Cluster membership details.">
       <field name="joiners" type="map"/> <!-- member-id -> URL -->
       <field name="members" type="map"/> <!-- member-id -> state -->
-      <field name="frame-seq" type="uint64"/>	 <!-- frame sequence number -->
+      <field name="frame-seq" type="sequence-no"/> <!-- frame sequence number -->
     </control>
 
     <!-- Updater cannot fulfill an update offer. -->

Modified: qpid/branches/java-broker-0-10/qpid/gentools/templ.java/model/ProtocolVersionListClass.vm
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/gentools/templ.java/model/ProtocolVersionListClass.vm?rev=821749&r1=821748&r2=821749&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/gentools/templ.java/model/ProtocolVersionListClass.vm (original)
+++ qpid/branches/java-broker-0-10/qpid/gentools/templ.java/model/ProtocolVersionListClass.vm Mon Oct  5 10:47:52 2009
@@ -39,12 +39,14 @@
 {
     private final byte _majorVersion;
     private final byte _minorVersion;
+    private final String _stringFormat;
 
 
     public ProtocolVersion(byte majorVersion, byte minorVersion)
     {
         _majorVersion = majorVersion;
         _minorVersion = minorVersion;
+        _stringFormat = _majorVersion+"-"+_minorVersion;
     }
 
     public byte getMajorVersion()
@@ -57,6 +59,10 @@
         return _minorVersion;
     }
 
+    public String toString()
+    {
+        return _stringFormat;
+    }
 
     public int compareTo(Object o)
     {

Propchange: qpid/branches/java-broker-0-10/qpid/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Oct  5 10:47:52 2009
@@ -1,2 +1,2 @@
 /qpid/trunk/qpid:796646-796653
-/qpid/trunk/qpid/java:796196-799240
+/qpid/trunk/qpid/java:796196-800440



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org