You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/11/14 17:40:22 UTC

svn commit: r714065 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/client/ tests/

Author: gsim
Date: Fri Nov 14 08:40:22 2008
New Revision: 714065

URL: http://svn.apache.org/viewvc?rev=714065&view=rev
Log:
Added some failover capable tests
Added grantCredit() method to subscription to allow simpler control of message delivery


Added:
    incubator/qpid/trunk/qpid/cpp/src/tests/receiver.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/tests/sender.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/tests/txjob.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/tests/txshift.cpp   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
    incubator/qpid/trunk/qpid/cpp/src/tests/   (props changed)
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp?rev=714065&r1=714064&r2=714065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp Fri Nov 14 08:40:22 2008
@@ -22,6 +22,7 @@
 #include "Subscription.h"
 #include "SubscriptionImpl.h"
 #include "HandlePrivate.h"
+#include "qpid/framing/enum.h"
 
 namespace qpid {
 namespace client {
@@ -42,7 +43,8 @@
 Session Subscription::getSession() const { return impl->getSession(); }
 SubscriptionManager&Subscription:: getSubscriptionManager() const { return impl->getSubscriptionManager(); }
 void Subscription::cancel() { impl->cancel(); }
-
+void Subscription::grantMessageCredit(uint32_t value) { impl->grantCredit(framing::message::CREDIT_UNIT_MESSAGE, value); }
+void Subscription::grantByteCredit(uint32_t value) { impl->grantCredit(framing::message::CREDIT_UNIT_BYTE, value); }
 }} // namespace qpid::client
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h?rev=714065&r1=714064&r2=714065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h Fri Nov 14 08:40:22 2008
@@ -101,6 +101,12 @@
 
     /** Cancel the subscription. */
     void cancel();
+
+    /** Grant the specified amount of message credit */
+    void grantMessageCredit(uint32_t);
+
+    /** Grant the specified amount of byte credit */
+    void grantByteCredit(uint32_t);
 };
 }} // namespace qpid::client
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp?rev=714065&r1=714064&r2=714065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp Fri Nov 14 08:40:22 2008
@@ -62,6 +62,10 @@
     s.sync();
 }
 
+void SubscriptionImpl::grantCredit(framing::message::CreditUnit unit, uint32_t value) {
+    async(manager.getSession()).messageFlow(name, unit, value);
+}
+
 void SubscriptionImpl::setAutoAck(size_t n) {
     Mutex::ScopedLock l(lock);
     settings.autoAck = n;
@@ -103,7 +107,7 @@
 
 Session SubscriptionImpl::getSession() const { return manager.getSession(); }
 
-SubscriptionManager&SubscriptionImpl:: getSubscriptionManager() const { return manager; }
+SubscriptionManager& SubscriptionImpl::getSubscriptionManager() const { return manager; }
 
 void SubscriptionImpl::cancel() { manager.cancel(name); }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h?rev=714065&r1=714064&r2=714065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h Fri Nov 14 08:40:22 2008
@@ -25,6 +25,7 @@
 #include "qpid/client/SubscriptionSettings.h"
 #include "qpid/client/Session.h"
 #include "qpid/client/MessageListener.h"
+#include "qpid/framing/enum.h"
 #include "qpid/framing/SequenceSet.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/RefCounted.h"
@@ -88,6 +89,9 @@
     /** Cancel the subscription. */
     void cancel();
 
+    /** Grant specified credit for this subscription **/
+    void grantCredit(framing::message::CreditUnit unit, uint32_t value);
+
     void received(Message&);
     
   private:

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h?rev=714065&r1=714064&r2=714065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h Fri Nov 14 08:40:22 2008
@@ -92,16 +92,6 @@
  * </li>
  * </ul>
  * 
- * 
- * <h2>Setting Accept Mode, Acquire Mode, Ack Policy</h2>
- * 
- * <p>setAcceptMode()</p>
- * <pre>subscriptions.setAcceptMode(true);</pre>
- * <p>setAcquireMode()</p>
- * <pre>subscriptions.setAcquireMode(false);</pre>
- * 
- * 
- * 
  */
 class SubscriptionManager : public sys::Runnable
 {

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Nov 14 08:40:22 2008
@@ -26,3 +26,8 @@
 cert.password
 test_cert_db
 header_test
+receiver
+sender
+txshift
+txjob
+

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=714065&r1=714064&r2=714065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Fri Nov 14 08:40:22 2008
@@ -134,6 +134,22 @@
 header_test_SOURCES=header_test.cpp TestOptions.h ConnectionOptions.h
 header_test_LDADD=$(lib_client) 
 
+check_PROGRAMS+=txshift
+txshift_SOURCES=txshift.cpp  TestOptions.h ConnectionOptions.h
+txshift_LDADD=$(lib_client) 
+
+check_PROGRAMS+=txjob
+txjob_SOURCES=txjob.cpp  TestOptions.h ConnectionOptions.h
+txjob_LDADD=$(lib_client) 
+
+check_PROGRAMS+=receiver
+receiver_SOURCES=receiver.cpp  TestOptions.h ConnectionOptions.h
+receiver_LDADD=$(lib_client) 
+
+check_PROGRAMS+=sender
+sender_SOURCES=sender.cpp  TestOptions.h ConnectionOptions.h
+sender_LDADD=$(lib_client) 
+
 TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= BOOST_TEST_SHOW_PROGRESS=yes $(srcdir)/run_test 
 
 system_tests = client_test quick_perftest quick_topictest run_header_test quick_txtest

Added: incubator/qpid/trunk/qpid/cpp/src/tests/receiver.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/receiver.cpp?rev=714065&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/receiver.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/receiver.cpp Fri Nov 14 08:40:22 2008
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/client/FailoverManager.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/Message.h>
+#include <qpid/client/SubscriptionManager.h>
+#include <qpid/client/SubscriptionManager.h>
+#include "TestOptions.h"
+
+#include <iostream>
+#include <fstream>
+
+
+using namespace qpid;
+using namespace qpid::client;
+using namespace qpid::framing;
+
+using namespace std;
+
+struct Args : public qpid::TestOptions 
+{
+    string queue;           // --queue: queue to consume from; defaults to "test-queue"
+    uint messages;          // --messages: message count at which to stop; 0 (default) = no limit
+    bool ignoreDuplicates;  // --ignore-duplicates: filter on the 'sn' header; off by default
+
+    Args() : queue("test-queue"), messages(0), ignoreDuplicates(false)
+    {
+        addOptions()            
+            ("queue", qpid::optValue(queue, "QUEUE NAME"), "Queue from which to request messages")
+            ("messages", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely")
+            ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)");
+    }
+};
+
+const string EOS("eos");
+
+class Receiver : public MessageListener, public FailoverManager::Command
+{
+  public:
+    Receiver(const string& queue, uint messages, bool ignoreDuplicates);
+    void received(Message& message);                    // prints each message body; cancels on EOS or at 'count'
+    void execute(AsyncSession& session, bool isRetry);  // subscribes to 'queue' and runs the subscription manager
+  private:
+    const string queue;         // queue to subscribe to
+    const uint count;           // number of handled messages at which to cancel; 0 is never reached by ++processed
+    const bool skipDups;        // when true, received() consults isDuplicate() first
+    Subscription subscription;  // assigned in execute(), cancelled from received()
+    uint processed;             // messages handled so far, excluding filtered duplicates and EOS
+    uint lastSn;                // highest 'sn' header value accepted so far
+
+    bool isDuplicate(Message& message);
+};
+
+Receiver::Receiver(const string& q, uint messages, bool ignoreDuplicates) : 
+    queue(q), count(messages), skipDups(ignoreDuplicates), processed(0), lastSn(0) {} // both counters start at zero
+
+void Receiver::received(Message & message) 
+{
+    if (!(skipDups && isDuplicate(message))) {  // NOTE: the EOS marker is filtered too, so it needs an 'sn' above lastSn
+        bool eos = message.getData() == EOS;    // a body equal to "eos" marks the end of the stream
+        if (!eos) std::cout << message.getData() << std::endl;   // echo the payload, one message per line
+        if (eos || ++processed == count) subscription.cancel();  // EOS is not counted; stop on EOS or at 'count'
+    }
+}
+
+bool Receiver::isDuplicate(Message& message) 
+{
+    uint sn = message.getHeaders().getAsInt("sn");  // NOTE(review): a missing header presumably reads as 0 - confirm getAsInt()
+    if (lastSn < sn) {   // strictly greater than anything accepted so far: a new message
+        lastSn = sn;
+        return false;
+    } else {             // equal or lower: reported as a duplicate
+        return true;
+    }
+}
+
+void Receiver::execute(AsyncSession& session, bool /*isRetry*/)  // retries are handled like a first attempt
+{
+    SubscriptionManager subs(session);            // manager is local to this invocation
+    subscription = subs.subscribe(*this, queue);  // deliver to received(); keep the handle so it can be cancelled
+    subs.run();                                   // NOTE(review): presumably returns once the subscription is cancelled - confirm
+}
+
+int main(int argc, char ** argv)  // receiver entry point: exit status 0 on success, 1 if an exception was caught
+{
+    Args opts;
+    try {
+        opts.parse(argc, argv);
+        FailoverManager connection(opts.con);  // connection settings come from the parsed options
+        Receiver receiver(opts.queue, opts.messages, opts.ignoreDuplicates);
+        connection.execute(receiver);          // Receiver::execute() is invoked through the failover manager
+        connection.close();
+        return 0;
+    } catch(const std::exception& error) {
+        std::cerr << "Failure: " << error.what() << std::endl;  // stdout is reserved for message bodies
+    }
+    return 1;
+}
+
+
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/receiver.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/receiver.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/tests/sender.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/sender.cpp?rev=714065&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/sender.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/sender.cpp Fri Nov 14 08:40:22 2008
@@ -0,0 +1,100 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/client/FailoverManager.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/AsyncSession.h>
+#include <qpid/client/Message.h>
+#include <qpid/client/MessageReplayTracker.h>
+#include <qpid/Exception.h>
+#include "TestOptions.h"
+
+#include <iostream>
+
+using namespace qpid;
+using namespace qpid::client;
+using namespace qpid::framing;
+
+using namespace std;
+
+struct Args : public qpid::TestOptions 
+{
+    string destination;  // --exchange: exchange to publish to; default-constructed (empty) unless given
+    string key;          // --routing-key: defaults to "test-queue"
+    bool sendEos;        // --send-eos: send a final "eos" message once input is exhausted; off by default
+
+    Args() : key("test-queue"), sendEos(false)
+    {
+        addOptions()            
+            ("exchange", qpid::optValue(destination, "EXCHANGE"), "Exchange to send messages to")
+            ("routing-key", qpid::optValue(key, "KEY"), "Routing key to add to messages")
+            ("send-eos", qpid::optValue(sendEos), "Send EOS message to mark end of input");
+    }
+};
+
+const string EOS("eos");
+
+class Sender : public FailoverManager::Command
+{
+  public:
+    Sender(const std::string& destination, const std::string& key, bool sendEos);
+    void execute(AsyncSession& session, bool isRetry);  // publishes stdin tokens as messages
+  private:
+    MessageReplayTracker sender;  // used for every send, and for replay() on a retry
+    Message message;              // single instance reused for every send
+    const bool sendEos;           // whether to finish with an "eos" message
+    uint sent;                    // last 'sn' header value assigned
+};
+
+Sender::Sender(const std::string& destination, const std::string& key, bool eos) : 
+    sender(10), message(destination, key), sendEos(eos), sent(0) {} // NOTE(review): 10 is presumably the tracker's flush interval - confirm
+
+void Sender::execute(AsyncSession& session, bool isRetry)  // publishes each whitespace-delimited stdin token
+{
+    if (isRetry) sender.replay(session); else sender.init(session);  // a retry replays via the tracker; first run initialises it
+    string data;
+    while (std::cin >> data) {
+        message.setData(data);
+        message.getHeaders().setInt("sn", ++sent);  // 1-based sequence number used by receivers to spot duplicates
+        sender.send(message);
+    }
+    if (sendEos) {
+        message.setData(EOS);
+        message.getHeaders().setInt("sn", ++sent);  // fresh 'sn': reusing the previous one gets EOS dropped as a duplicate
+        sender.send(message);
+    }
+}
+
+int main(int argc, char ** argv)  // sender entry point: exit status 0 on success, 1 if an exception was caught
+{
+    Args opts;
+    try {
+        opts.parse(argc, argv);
+        FailoverManager connection(opts.con);  // connection settings come from the parsed options
+        Sender sender(opts.destination, opts.key, opts.sendEos);
+        connection.execute(sender);            // Sender::execute() is invoked through the failover manager
+        connection.close();
+        return 0;
+    } catch(const std::exception& error) {
+        std::cerr << "Failed: " << error.what() << std::endl;  // diagnostics belong on stderr, not stdout
+    }
+    return 1;
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/sender.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/sender.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/tests/txjob.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/txjob.cpp?rev=714065&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/txjob.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/txjob.cpp Fri Nov 14 08:40:22 2008
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <iostream>
+#include <boost/bind.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
+
+#include "TestOptions.h"
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/FailoverManager.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/sys/Thread.h"
+
+using namespace qpid::client;
+using namespace qpid::sys;
+
+struct Args : public qpid::TestOptions 
+{
+    string workQueue;    // --work-queue: queue the job requests are published to
+    string source;       // --source: written into each job's 'src' header
+    string dest;         // --dest: written into each job's 'dest' header
+    uint messages;       // --messages: written into each job's 'count' header; defaults to 0
+    uint jobs;           // --jobs: number of "transfer" requests to publish; defaults to 0
+    bool quit;           // --add-quit: publish a final "quit" request
+    bool declareQueues;  // --declare-queues: declare work, source and dest queues first
+
+    Args() : workQueue("txshift-control"), source("txshift-1"), dest("txshift-2"), messages(0), jobs(0), 
+             quit(false), declareQueues(false)
+    {
+        addOptions()            
+            ("messages", qpid::optValue(messages, "N"), "Number of messages to shift")
+            ("jobs", qpid::optValue(jobs, "N"), "Number of shift jobs to request")
+            ("source", qpid::optValue(source, "QUEUE NAME"), "source queue from which messages will be shifted")
+            ("dest", qpid::optValue(dest, "QUEUE NAME"), "dest queue to which messages will be shifted")
+            ("work-queue", qpid::optValue(workQueue, "QUEUE NAME"), "work queue from which to take instructions")
+            ("add-quit", qpid::optValue(quit), "add a 'quit' instruction to the queue (after any other jobs)")
+            ("declare-queues", qpid::optValue(declareQueues), "issue a declare for all queues");
+    }
+};
+
+//TODO: might be nice to make this capable of failover as well at some
+//point; for now it's just for the setup phase.
+int main(int argc, char** argv)  // txjob entry point: publishes job requests; exit status 0 on success, 1 on exception
+{
+    Args opts;
+    try {
+        opts.parse(argc, argv);
+        Connection connection;
+        connection.open(opts.con);
+        Session session = connection.newSession();
+        if (opts.declareQueues) {
+            session.queueDeclare(arg::queue=opts.workQueue);
+            session.queueDeclare(arg::queue=opts.source);
+            session.queueDeclare(arg::queue=opts.dest);
+        }
+        for (uint i = 0; i < opts.jobs; ++i) {
+            Message job("transfer", opts.workQueue);  // body "transfer", routed with the work queue name as key
+            job.getHeaders().setString("src", opts.source);
+            job.getHeaders().setString("dest", opts.dest);
+            job.getHeaders().setInt("count", opts.messages);
+            async(session).messageTransfer(arg::content=job);
+        }
+        //the optional quit request is queued behind all the transfer requests
+        if (opts.quit) {
+            async(session).messageTransfer(arg::content=Message("quit", opts.workQueue));            
+        }
+        //the transfers above were issued asynchronously; sync before closing
+        session.sync();
+        session.close();
+
+        return 0;
+    } catch(const std::exception& e) {
+        std::cerr << e.what() << std::endl;  // diagnostics belong on stderr, not stdout
+        return 1;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/txjob.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/txjob.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/tests/txshift.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/txshift.cpp?rev=714065&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/txshift.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/txshift.cpp Fri Nov 14 08:40:22 2008
@@ -0,0 +1,185 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <iostream>
+#include <boost/bind.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
+
+#include "TestOptions.h"
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/FailoverManager.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/Thread.h"
+
+using namespace qpid::client;
+using namespace qpid::sys;
+
+struct Args : public qpid::TestOptions 
+{
+    string workQueue;  // --work-queue: queue that instructions are read from; defaults to "txshift-control"
+    size_t workers;    // --workers: number of Worker objects main() creates; defaults to 1
+
+    Args() : workQueue("txshift-control"), workers(1)
+    {
+        addOptions()            
+            ("workers", qpid::optValue(workers, "N"), "Number of separate worker sessions to start")
+            ("work-queue", qpid::optValue(workQueue, "NAME"), "work queue from which to take instructions");
+    }
+};
+
+struct Transfer : MessageListener  // takes one job at a time from a control queue and shifts its messages
+{
+    std::string control;      // name of the control (job) queue
+    std::string source;       // source queue of the current job ('src' header)
+    std::string destination;  // routing key for re-published messages ('dest' header)
+    uint expected;            // message count requested by the current job ('count' header)
+    uint transfered;          // messages forwarded so far for the current job
+    SubscriptionSettings controlSettings;
+    Subscription controlSubscription;
+    SubscriptionSettings sourceSettings;
+    Subscription sourceSubscription;
+    /** @param control_ name of the queue on which job requests arrive */
+    Transfer(const std::string& control_) : control(control_), expected(0), transfered(0) {}
+    /** Subscribe to the current job's source queue with message credit for exactly 'expected' messages. */
+    void subscribeToSource(SubscriptionManager& manager) 
+    {
+        sourceSettings.autoAck = 0;//will accept once at the end of the batch
+        sourceSettings.flowControl = FlowControl::messageCredit(expected);
+        sourceSubscription = manager.subscribe(*this, source, sourceSettings);
+        QPID_LOG(info, "Subscribed to source: " << source << " expecting: " << expected);
+    }
+    /** Subscribe to the control queue with a single message credit, i.e. one instruction at a time. */
+    void subscribeToControl(SubscriptionManager& manager) 
+    {
+        controlSettings.flowControl = FlowControl::messageCredit(1);
+        controlSubscription = manager.subscribe(*this, control, controlSettings);
+        QPID_LOG(info, "Subscribed to job queue");
+    }
+    /** Listener callback: dispatch on the message destination (source vs. control name). */
+    void received(Message& message)
+    {
+        QPID_LOG(debug, "received: " << message.getData() << " for " << message.getDestination());
+        if (message.getDestination() == source) {
+            receivedFromSource(message);
+        } else if (message.getDestination() == control) {
+            receivedFromControl(message);
+        } else {
+            QPID_LOG(error, "Unexpected message: " << message.getData() << " to " << message.getDestination());
+        }
+    }
+    /** Re-publish one source message to 'destination'; after the last one, accept the batch and commit. */
+    void receivedFromSource(Message& message)
+    {
+        QPID_LOG(debug, "transfering  " << (transfered+1) << " of " << expected);
+        message.getDeliveryProperties().setRoutingKey(destination);
+        async(sourceSubscription.getSession()).messageTransfer(arg::content=message);
+        if (++transfered == expected) {
+            QPID_LOG(info, "completed job: " << transfered << " messages shifted from " << 
+                     source << " to " << destination);
+            sourceSubscription.accept(sourceSubscription.getUnaccepted());
+            sourceSubscription.getSession().txCommit();
+            sourceSubscription.cancel();
+            //grant credit to allow broker to send us another control message
+            controlSubscription.grantMessageCredit(1);
+        }
+    }
+    /** Handle an instruction: "transfer" starts a job, "quit" cancels the control subscription, others are rejected. */
+    void receivedFromControl(Message& message)
+    {
+        if (message.getData() == "transfer") {
+            source = message.getHeaders().getAsString("src");
+            destination = message.getHeaders().getAsString("dest");
+            expected = message.getHeaders().getAsInt("count");
+            transfered = 0;
+            QPID_LOG(info, "received transfer request: " << expected << " messages to be shifted from " << 
+                     source << " to " << destination);
+            subscribeToSource(controlSubscription.getSubscriptionManager());
+        } else if (message.getData() == "quit") {
+            QPID_LOG(info, "received quit request");
+            controlSubscription.cancel();
+        } else {
+            std::cerr << "Rejecting invalid message: " << message.getData() << std::endl;
+            controlSubscription.getSession().messageReject(SequenceSet(message.getId()));
+            controlSubscription.grantMessageCredit(1);//renew the used-up control credit so later instructions still arrive
+        }
+    }
+};
+
+struct Worker : FailoverManager::Command, Runnable  // drives one Transfer listener through a FailoverManager
+{
+    FailoverManager& connection;  // held by reference; main() passes the same manager to every worker
+    Transfer transfer;
+    Thread runner;                // only assigned by start()
+
+    Worker(FailoverManager& c, const std::string& controlQueue) : connection(c), transfer(controlQueue) {}
+
+    void run()   // Runnable entry point; main() also calls it directly when there is a single worker
+    {
+        connection.execute(*this);
+    }
+
+    void start()  // run this worker on a new thread
+    {
+        runner = Thread(this);
+    }
+
+    void join()   // wait for the thread created by start()
+    {
+        runner.join();
+    }
+
+    void execute(AsyncSession& session, bool isRetry)  // called by the failover manager, again on each retry
+    {
+        if (isRetry) QPID_LOG(info, "Retrying...");
+        session.txSelect();                    // make the session transactional before subscribing
+        SubscriptionManager subs(session);
+        transfer.subscribeToControl(subs);
+        subs.run();                            // NOTE(review): presumably returns once all subscriptions are cancelled - confirm
+    }
+};
+
+int main(int argc, char** argv)  // txshift entry point: exit status 0 on success, 1 if an exception was caught
+{
+    Args opts;
+    try {
+        opts.parse(argc, argv);
+        FailoverManager connection(opts.con);
+        connection.connect();
+        if (opts.workers == 1) {
+            Worker worker(connection, opts.workQueue);
+            worker.run();  // single worker: run on the calling thread
+        } else {
+            boost::ptr_vector<Worker> workers;  // owns the heap-allocated workers
+            for (size_t i = 0; i < opts.workers; i++) {
+                workers.push_back(new Worker(connection, opts.workQueue));
+            }
+            for_each(workers.begin(), workers.end(), boost::bind(&Worker::start, _1));
+            for_each(workers.begin(), workers.end(), boost::bind(&Worker::join, _1));
+        }
+
+        return 0;
+    } catch(const std::exception& e) {
+        std::cerr << e.what() << std::endl;  // diagnostics belong on stderr, not stdout
+        return 1;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/txshift.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/txshift.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date