You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/10/20 21:37:06 UTC

svn commit: r706381 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/ qpid/client/ qpid/cluster/ tests/

Author: aconway
Date: Mon Oct 20 12:37:06 2008
New Revision: 706381

URL: http://svn.apache.org/viewvc?rev=706381&view=rev
Log:
cluster: DumpClient replicates session MessageBuilder.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/ais_check
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp?rev=706381&r1=706380&r2=706381&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp Mon Oct 20 12:37:06 2008
@@ -30,7 +30,7 @@
     incomplete.push_back(msg);
 }
 
-void IncompleteMessageList::process(CompletionListener l, bool sync)
+void IncompleteMessageList::process(const CompletionListener& l, bool sync)
 {
     while (!incomplete.empty()) {
         boost::intrusive_ptr<Message>& msg = incomplete.front();
@@ -48,4 +48,8 @@
     }
 }
 
+void IncompleteMessageList::each(const CompletionListener& l) {
+    std::for_each(incomplete.begin(), incomplete.end(), l);
+}
+
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h?rev=706381&r1=706380&r2=706381&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h Mon Oct 20 12:37:06 2008
@@ -39,7 +39,8 @@
     typedef boost::function<void(boost::intrusive_ptr<Message>)> CompletionListener;    
 
     void add(boost::intrusive_ptr<Message> msg);
-    void process(CompletionListener l, bool sync);
+    void process(const CompletionListener& l, bool sync);
+    void each(const CompletionListener& l);
 };
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=706381&r1=706380&r2=706381&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Mon Oct 20 12:37:06 2008
@@ -102,7 +102,9 @@
 
     template <class F> void eachConsumer(F f) { semanticState.eachConsumer(f); }
     SemanticState::ConsumerImpl& getConsumer(const string& dest) { return semanticState.find(dest); } 
-    
+
+    boost::intrusive_ptr<Message> getMessageInProgress() { return msgBuilder.getMessage(); }
+
   private:
 
     void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp?rev=706381&r1=706380&r2=706381&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.cpp Mon Oct 20 12:37:06 2008
@@ -59,6 +59,8 @@
     impl->sendCompletion();
 }
 
+uint16_t SessionBase_0_10::getChannel() const { return impl->getChannel(); }
+
 void SessionBase_0_10::suspend() { impl->suspend(); }
 void SessionBase_0_10::resume(Connection c) { impl->resume(c.impl); }
 uint32_t SessionBase_0_10::timeout(uint32_t seconds) { return impl->setTimeout(seconds); }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.h?rev=706381&r1=706380&r2=706381&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionBase_0_10.h Mon Oct 20 12:37:06 2008
@@ -99,6 +99,9 @@
     /** Resume a suspended session with a new connection */
     void resume(Connection);
 
+    /** Get the channel associated with this session */
+    uint16_t getChannel() const;
+
     Execution& getExecution();  
     void flush();
     void markCompleted(const framing::SequenceNumber& id, bool cumulative, bool notifyPeer);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=706381&r1=706380&r2=706381&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Mon Oct 20 12:37:06 2008
@@ -78,7 +78,7 @@
         if (values.name.empty()) return; // Only if --cluster-name option was specified.
         Broker* broker = dynamic_cast<Broker*>(&target);
         if (!broker) return;
-        cluster = new Cluster(values.name, values.getUrl(broker->getPort()), *broker);
+        cluster = new Cluster(values.name, values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)), *broker);
         broker->setConnectionFactory(
             boost::shared_ptr<sys::ConnectionCodec::Factory>(
                 new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=706381&r1=706380&r2=706381&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Mon Oct 20 12:37:06 2008
@@ -166,11 +166,9 @@
     shadowConnection = catchUpConnection();
 
     broker::Connection& bc = dumpConnection->getBrokerConnection();
-    // FIXME aconway 2008-09-19: Open with identical settings to dumpConnection: password, vhost, frame size,
-    // authentication etc. See ConnectionSettings.
-    shadowConnection.open(dumpeeUrl, bc.getUserId());
-
-    dumpConnection->getBrokerConnection().eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1));
+    // FIXME aconway 2008-10-20: What authentication info should be used to reconnect?
+    shadowConnection.open(dumpeeUrl, bc.getUserId(), ""/*password*/, "/"/*vhost*/, bc.getFrameMax());
+    bc.eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1));
     ClusterConnectionProxy(shadowConnection).shadowReady(
         dumpConnection->getId().getMember(),
         reinterpret_cast<uint64_t>(dumpConnection->getId().getPointer()));
@@ -194,20 +192,30 @@
 
     // For reasons unknown, boost::bind does not work here with boost 1.33.
     ss->eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this));
-    
-    // FIXME aconway 2008-09-19: update remaining session state.
+
+    boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
+
+    //  Adjust for message in progress, will be sent after state update.
+    SequenceNumber received = ss->receiverGetReceived().command;
+    if (inProgress)  
+        --received;
 
     // Reset command-sequence state.
     proxy.sessionState(
         ss->senderGetReplayPoint().command,
         ss->senderGetCommandPoint().command,
         ss->senderGetIncomplete(),
-        ss->receiverGetExpected().command,
-        ss->receiverGetReceived().command,
+        std::max(received, ss->receiverGetExpected().command),
+        received,
         ss->receiverGetUnknownComplete(),
         ss->receiverGetIncomplete()
     );
 
+    // Send frames for partial message in progress.
+    if (inProgress) {
+        inProgress->getFrames().map(simpl->out);
+    }
+
     // FIXME aconway 2008-09-23: update session replay list.
 
     QPID_LOG(debug, dumperId << " dumped session " << sh.getSession()->getId());

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ais_check
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ais_check?rev=706381&r1=706380&r2=706381&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ais_check (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ais_check Mon Oct 20 12:37:06 2008
@@ -20,7 +20,7 @@
 
 srcdir=`dirname $0`
 
-# Check AIS requirements tests if found.
+# Check AIS requirements and run tests if found.
 id -nG | grep '\<ais\>' >/dev/null || \
     NOGROUP="You are not a member of the ais group."
 ps -u root | grep aisexec >/dev/null || \

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=706381&r1=706380&r2=706381&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Mon Oct 20 12:37:06 2008
@@ -31,6 +31,7 @@
 #include "qpid/framing/AMQBody.h"
 #include "qpid/framing/Uuid.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/enum.h"
 #include "qpid/log/Logger.h"
 
 #include <boost/bind.hpp>
@@ -201,6 +202,59 @@
     return s;
 }
 
+class Sender {
+  public:
+    Sender(boost::shared_ptr<ConnectionImpl> ci, uint16_t ch) : connection(ci), channel(ch) {}
+    void send(const AMQBody& body, bool firstSeg, bool lastSeg, bool firstFrame, bool lastFrame) {
+        AMQFrame f(body);
+        f.setChannel(channel);
+        f.setFirstSegment(firstSeg);
+        f.setLastSegment(lastSeg);
+        f.setFirstFrame(firstFrame);
+        f.setLastFrame(lastFrame);
+        connection->handle(f);
+    }
+
+  private:
+    boost::shared_ptr<ConnectionImpl> connection;
+    uint16_t channel;
+};
+
+QPID_AUTO_TEST_CASE(testDumpMessageBuilder) {
+    // Verify that we dump a partially received message to a new member.
+    ClusterFixture cluster(1);    
+    Client c0(cluster[0], "c0");
+    c0.session.queueDeclare("q");
+    Sender sender(ConnectionAccess::getImpl(c0.connection), c0.session.getChannel());
+
+    // Send first 2 frames of message.
+    MessageTransferBody transfer(
+        ProtocolVersion(), std::string(), // default exchange.
+        framing::message::ACCEPT_MODE_NONE,
+        framing::message::ACQUIRE_MODE_PRE_ACQUIRED);
+    sender.send(transfer, true, false, true, true);
+    AMQHeaderBody header;
+    header.get<DeliveryProperties>(true)->setRoutingKey("q");
+    sender.send(header, false, false, true, true);
+
+    // No reliable way to ensure the partial message has arrived
+    // before we start the new broker, so we sleep.
+    ::usleep(250); 
+    cluster.add();
+
+    // Send final 2 frames of message.
+    sender.send(AMQContentBody("ab"), false, true, true, false);
+    sender.send(AMQContentBody("cd"), false, true, false, true);
+    
+    // Verify message is enqueued correctly on second member.
+    Message m;
+    Client c1(cluster[1], "c1");
+    BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+    BOOST_CHECK_EQUAL(m.getData(), "abcd");
+
+    BOOST_CHECK_EQUAL(2u, getGlobalCluster().getUrls().size());
+}
+
 QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
     ClusterFixture cluster(1);
     Client c0(cluster[0], "c0");