You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/11/14 17:40:22 UTC
svn commit: r714065 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/client/
tests/
Author: gsim
Date: Fri Nov 14 08:40:22 2008
New Revision: 714065
URL: http://svn.apache.org/viewvc?rev=714065&view=rev
Log:
Added some failover-capable tests
Added grantMessageCredit()/grantByteCredit() methods to Subscription (backed by SubscriptionImpl::grantCredit()) to allow simpler control of message delivery
Added:
incubator/qpid/trunk/qpid/cpp/src/tests/receiver.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/tests/sender.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/tests/txjob.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/tests/txshift.cpp (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
incubator/qpid/trunk/qpid/cpp/src/tests/ (props changed)
incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp?rev=714065&r1=714064&r2=714065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.cpp Fri Nov 14 08:40:22 2008
@@ -22,6 +22,7 @@
#include "Subscription.h"
#include "SubscriptionImpl.h"
#include "HandlePrivate.h"
+#include "qpid/framing/enum.h"
namespace qpid {
namespace client {
@@ -42,7 +43,8 @@
Session Subscription::getSession() const { return impl->getSession(); }
SubscriptionManager&Subscription:: getSubscriptionManager() const { return impl->getSubscriptionManager(); }
void Subscription::cancel() { impl->cancel(); }
-
+/** Grant `value` units of message credit; forwards to the implementation with CREDIT_UNIT_MESSAGE. */
+void Subscription::grantMessageCredit(uint32_t value)
+{
+    impl->grantCredit(framing::message::CREDIT_UNIT_MESSAGE, value);
+}
+/** Grant `value` units of byte credit; forwards to the implementation with CREDIT_UNIT_BYTE. */
+void Subscription::grantByteCredit(uint32_t value)
+{
+    impl->grantCredit(framing::message::CREDIT_UNIT_BYTE, value);
+}
}} // namespace qpid::client
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h?rev=714065&r1=714064&r2=714065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h Fri Nov 14 08:40:22 2008
@@ -101,6 +101,12 @@
/** Cancel the subscription. */
void cancel();
+
+ /** Grant the specified amount of message credit */
+ void grantMessageCredit(uint32_t);
+
+ /** Grant the specified amount of byte credit */
+ void grantByteCredit(uint32_t);
};
}} // namespace qpid::client
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp?rev=714065&r1=714064&r2=714065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp Fri Nov 14 08:40:22 2008
@@ -62,6 +62,10 @@
s.sync();
}
+/**
+ * Issue a message-flow command for this subscription (identified by its
+ * name) on the manager's session, through the async() session wrapper.
+ *
+ * @param unit  credit unit to grant
+ * @param value amount of credit passed to messageFlow
+ */
+void SubscriptionImpl::grantCredit(framing::message::CreditUnit unit, uint32_t value)
+{
+    Session s = manager.getSession();
+    async(s).messageFlow(name, unit, value);
+}
+
void SubscriptionImpl::setAutoAck(size_t n) {
Mutex::ScopedLock l(lock);
settings.autoAck = n;
@@ -103,7 +107,7 @@
Session SubscriptionImpl::getSession() const { return manager.getSession(); }
-SubscriptionManager&SubscriptionImpl:: getSubscriptionManager() const { return manager; }
+SubscriptionManager& SubscriptionImpl::getSubscriptionManager() const { return manager; }
void SubscriptionImpl::cancel() { manager.cancel(name); }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h?rev=714065&r1=714064&r2=714065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h Fri Nov 14 08:40:22 2008
@@ -25,6 +25,7 @@
#include "qpid/client/SubscriptionSettings.h"
#include "qpid/client/Session.h"
#include "qpid/client/MessageListener.h"
+#include "qpid/framing/enum.h"
#include "qpid/framing/SequenceSet.h"
#include "qpid/sys/Mutex.h"
#include "qpid/RefCounted.h"
@@ -88,6 +89,9 @@
/** Cancel the subscription. */
void cancel();
+ /** Grant specified credit for this subscription **/
+ void grantCredit(framing::message::CreditUnit unit, uint32_t value);
+
void received(Message&);
private:
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h?rev=714065&r1=714064&r2=714065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.h Fri Nov 14 08:40:22 2008
@@ -92,16 +92,6 @@
* </li>
* </ul>
*
- *
- * <h2>Setting Accept Mode, Acquire Mode, Ack Policy</h2>
- *
- * <p>setAcceptMode()</p>
- * <pre>subscriptions.setAcceptMode(true);</pre>
- * <p>setAcquireMode()</p>
- * <pre>subscriptions.setAcquireMode(false);</pre>
- *
- *
- *
*/
class SubscriptionManager : public sys::Runnable
{
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Nov 14 08:40:22 2008
@@ -26,3 +26,8 @@
cert.password
test_cert_db
header_test
+receiver
+sender
+txshift
+txjob
+
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=714065&r1=714064&r2=714065&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Fri Nov 14 08:40:22 2008
@@ -134,6 +134,22 @@
header_test_SOURCES=header_test.cpp TestOptions.h ConnectionOptions.h
header_test_LDADD=$(lib_client)
+check_PROGRAMS+=txshift
+txshift_SOURCES=txshift.cpp TestOptions.h ConnectionOptions.h
+txshift_LDADD=$(lib_client)
+
+check_PROGRAMS+=txjob
+txjob_SOURCES=txjob.cpp TestOptions.h ConnectionOptions.h
+txjob_LDADD=$(lib_client)
+
+check_PROGRAMS+=receiver
+receiver_SOURCES=receiver.cpp TestOptions.h ConnectionOptions.h
+receiver_LDADD=$(lib_client)
+
+check_PROGRAMS+=sender
+sender_SOURCES=sender.cpp TestOptions.h ConnectionOptions.h
+sender_LDADD=$(lib_client)
+
TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= BOOST_TEST_SHOW_PROGRESS=yes $(srcdir)/run_test
system_tests = client_test quick_perftest quick_topictest run_header_test quick_txtest
Added: incubator/qpid/trunk/qpid/cpp/src/tests/receiver.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/receiver.cpp?rev=714065&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/receiver.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/receiver.cpp Fri Nov 14 08:40:22 2008
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/client/FailoverManager.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/Message.h>
+#include <qpid/client/SubscriptionManager.h>
+#include <qpid/client/SubscriptionManager.h>
+#include "TestOptions.h"
+
+#include <iostream>
+#include <fstream>
+
+
+using namespace qpid;
+using namespace qpid::client;
+using namespace qpid::framing;
+
+using namespace std;
+
+/** Command-line options of the receiver test, layered on qpid::TestOptions. */
+struct Args : qpid::TestOptions
+{
+    string queue;           // set by --queue
+    uint messages;          // set by --messages
+    bool ignoreDuplicates;  // set by --ignore-duplicates
+
+    /** Install the defaults and register the receiver-specific options. */
+    Args()
+        : queue("test-queue"),
+          messages(0),
+          ignoreDuplicates(false)
+    {
+        addOptions()
+            ("queue",
+             qpid::optValue(queue, "QUEUE NAME"),
+             "Queue from which to request messages")
+            ("messages",
+             qpid::optValue(messages, "N"),
+             "Number of messages to receive; 0 means receive indefinitely")
+            ("ignore-duplicates",
+             qpid::optValue(ignoreDuplicates),
+             "Detect and ignore duplicates (by checking 'sn' header)");
+    }
+};
+
+const string EOS("eos");
+
+/**
+ * Prints the data of each message handed to it and cancels its
+ * subscription when the EOS marker arrives or the requested number of
+ * messages has been processed. The same object is the
+ * FailoverManager::Command that creates the subscription.
+ */
+class Receiver : public MessageListener, public FailoverManager::Command
+{
+  public:
+    Receiver(const string& queue, uint messages, bool ignoreDuplicates);
+
+    /** FailoverManager::Command hook: subscribe to the queue and run the SubscriptionManager. */
+    void execute(AsyncSession& session, bool isRetry);
+
+    /** MessageListener hook: process a single message. */
+    void received(Message& message);
+
+  private:
+    const string queue;         // queue to subscribe to
+    const uint count;           // cancel after this many processed messages
+    const bool skipDups;        // when true, messages flagged by isDuplicate() are ignored
+    Subscription subscription;  // assigned in execute(), cancelled in received()
+    uint processed;             // non-EOS messages handled so far
+    uint lastSn;                // largest "sn" header value seen so far
+
+    /** True when the message's "sn" header is not greater than lastSn. */
+    bool isDuplicate(Message& message);
+};
+
+/** Store the configuration and start both counters (processed, lastSn) at zero. */
+Receiver::Receiver(const string& q, uint messages, bool ignoreDuplicates)
+    : queue(q),
+      count(messages),
+      skipDups(ignoreDuplicates),
+      processed(0),
+      lastSn(0)
+{
+}
+
+/**
+ * Handle one message: drop it if duplicate filtering is on and it is a
+ * duplicate; cancel the subscription on the EOS marker; otherwise print
+ * the data and cancel once `count` messages have been processed.
+ */
+void Receiver::received(Message & message)
+{
+    if (skipDups && isDuplicate(message)) {
+        return;
+    }
+
+    if (message.getData() == EOS) {
+        // The end-of-stream marker is neither printed nor counted.
+        subscription.cancel();
+        return;
+    }
+
+    std::cout << message.getData() << std::endl;
+    ++processed;
+    if (processed == count) {
+        subscription.cancel();
+    }
+}
+
+/**
+ * Compare the message's "sn" header against the highest value seen so far.
+ *
+ * @return true when the value is not greater than lastSn; otherwise lastSn
+ *         is advanced to the new value and false is returned.
+ */
+bool Receiver::isDuplicate(Message& message)
+{
+    const uint sn = message.getHeaders().getAsInt("sn");
+    if (sn <= lastSn) {
+        return true;
+    }
+    lastSn = sn;
+    return false;
+}
+
+/**
+ * Subscribe this listener to the configured queue on the given session,
+ * keep the resulting Subscription so received() can cancel it, then call
+ * run() on the SubscriptionManager. The retry flag is not used.
+ */
+void Receiver::execute(AsyncSession& session, bool /*isRetry*/)
+{
+    SubscriptionManager manager(session);
+    subscription = manager.subscribe(*this, queue);
+    manager.run();
+}
+
+/**
+ * Parse the options, run a Receiver through a FailoverManager and close
+ * the connection.
+ *
+ * @return 0 on success; 1 after reporting a std::exception on stderr.
+ */
+int main(int argc, char ** argv)
+{
+    Args args;
+    int status = 1;
+    try {
+        args.parse(argc, argv);
+        FailoverManager connection(args.con);
+        Receiver receiver(args.queue, args.messages, args.ignoreDuplicates);
+        connection.execute(receiver);
+        connection.close();
+        status = 0;
+    } catch(const std::exception& e) {
+        std::cerr << "Failure: " << e.what() << std::endl;
+    }
+    return status;
+}
+
+
+
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/receiver.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/receiver.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/tests/sender.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/sender.cpp?rev=714065&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/sender.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/sender.cpp Fri Nov 14 08:40:22 2008
@@ -0,0 +1,100 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/client/FailoverManager.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/AsyncSession.h>
+#include <qpid/client/Message.h>
+#include <qpid/client/MessageReplayTracker.h>
+#include <qpid/Exception.h>
+#include "TestOptions.h"
+
+#include <iostream>
+
+using namespace qpid;
+using namespace qpid::client;
+using namespace qpid::framing;
+
+using namespace std;
+
+/** Command-line options of the sender test, layered on qpid::TestOptions. */
+struct Args : qpid::TestOptions
+{
+    string destination;  // set by --exchange; default-constructed (empty) otherwise
+    string key;          // set by --routing-key
+    bool sendEos;        // set by --send-eos
+
+    /** Install the defaults and register the sender-specific options. */
+    Args()
+        : key("test-queue"),
+          sendEos(false)
+    {
+        addOptions()
+            ("exchange",
+             qpid::optValue(destination, "EXCHANGE"),
+             "Exchange to send messages to")
+            ("routing-key",
+             qpid::optValue(key, "KEY"),
+             "Routing key to add to messages")
+            ("send-eos",
+             qpid::optValue(sendEos),
+             "Send EOS message to mark end of input");
+    }
+};
+
+const string EOS("eos");
+
+/**
+ * FailoverManager command that reads tokens from standard input and sends
+ * each one as a message through a MessageReplayTracker.
+ */
+class Sender : public FailoverManager::Command
+{
+  public:
+    Sender(const std::string& destination, const std::string& key, bool sendEos);
+
+    /** FailoverManager::Command hook; isRetry selects replay() over init() on the tracker. */
+    void execute(AsyncSession& session, bool isRetry);
+
+  private:
+    MessageReplayTracker sender;  // every send goes through this tracker
+    Message message;              // single message object reused for each send
+    const bool sendEos;           // send the EOS marker after the input is exhausted
+    uint sent;                    // source of the "sn" header values
+};
+
+/**
+ * Build the reusable message from destination and key and start the
+ * counter at zero. The tracker is constructed with the argument 10
+ * (presumably its flush interval -- confirm against MessageReplayTracker).
+ */
+Sender::Sender(const std::string& destination, const std::string& key, bool eos)
+    : sender(10),
+      message(destination, key),
+      sendEos(eos),
+      sent(0)
+{
+}
+
+/**
+ * FailoverManager::Command hook: send every whitespace-separated token read
+ * from standard input as a message stamped with an increasing "sn" header,
+ * then, if requested, send the EOS marker.
+ *
+ * @param session session handed to the tracker
+ * @param isRetry when true the tracker's replay() is called with the
+ *                session, otherwise its init()
+ */
+void Sender::execute(AsyncSession& session, bool isRetry)
+{
+    if (isRetry) sender.replay(session);
+    else sender.init(session);
+    string data;
+    while (std::cin >> data) {
+        message.setData(data);
+        message.getHeaders().setInt("sn", ++sent);
+        sender.send(message);
+    }
+    if (sendEos) {
+        message.setData(EOS);
+        // Give the EOS marker its own sequence number. The message object is
+        // reused, so without this it would carry the "sn" of the last data
+        // message (or none at all), and a receiver that discards messages
+        // whose "sn" is not greater than the last one seen -- as the
+        // companion receiver test does with --ignore-duplicates -- would
+        // drop the marker and never stop.
+        message.getHeaders().setInt("sn", ++sent);
+        sender.send(message);
+    }
+}
+
+/**
+ * Parse the options, run a Sender through a FailoverManager and close the
+ * connection.
+ *
+ * @return 0 on success; 1 after reporting a std::exception.
+ */
+int main(int argc, char ** argv)
+{
+    Args opts;
+    try {
+        opts.parse(argc, argv);
+        FailoverManager connection(opts.con);
+        Sender sender(opts.destination, opts.key, opts.sendEos);
+        connection.execute(sender);
+        connection.close();
+        return 0;
+    } catch(const std::exception& error) {
+        // Diagnostics go to stderr, as in the companion receiver test, so
+        // they are not mixed into anything captured from stdout.
+        std::cerr << "Failed: " << error.what() << std::endl;
+    }
+    return 1;
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/sender.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/sender.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/tests/txjob.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/txjob.cpp?rev=714065&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/txjob.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/txjob.cpp Fri Nov 14 08:40:22 2008
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <iostream>
+#include <boost/bind.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
+
+#include "TestOptions.h"
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/FailoverManager.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/sys/Thread.h"
+
+using namespace qpid::client;
+using namespace qpid::sys;
+
+/** Command-line options of the txjob setup tool, layered on qpid::TestOptions. */
+struct Args : qpid::TestOptions
+{
+    string workQueue;    // set by --work-queue
+    string source;       // set by --source
+    string dest;         // set by --dest
+    uint messages;       // set by --messages
+    uint jobs;           // set by --jobs
+    bool quit;           // set by --add-quit
+    bool declareQueues;  // set by --declare-queues
+
+    /** Install the defaults and register the txjob-specific options. */
+    Args()
+        : workQueue("txshift-control"),
+          source("txshift-1"),
+          dest("txshift-2"),
+          messages(0),
+          jobs(0),
+          quit(false),
+          declareQueues(false)
+    {
+        addOptions()
+            ("messages",
+             qpid::optValue(messages, "N"),
+             "Number of messages to shift")
+            ("jobs",
+             qpid::optValue(jobs, "N"),
+             "Number of shift jobs to request")
+            ("source",
+             qpid::optValue(source, "QUEUE NAME"),
+             "source queue from which messages will be shifted")
+            ("dest",
+             qpid::optValue(dest, "QUEUE NAME"),
+             "dest queue to which messages will be shifted")
+            ("work-queue",
+             qpid::optValue(workQueue, "QUEUE NAME"),
+             "work queue from which to take instructions")
+            ("add-quit",
+             qpid::optValue(quit),
+             "add a 'quit' instruction to the queue (after any other jobs)")
+            ("declare-queues",
+             qpid::optValue(declareQueues),
+             "issue a declare for all queues");
+    }
+};
+
+//TODO: might be nice to make this capable of failover as well at some
+//point; for now it's just for the setup phase.
+/**
+ * Optionally declare the work, source and dest queues, then publish
+ * `jobs` "transfer" messages (carrying "src", "dest" and "count" headers)
+ * with the work queue name as routing key, optionally followed by a
+ * "quit" message.
+ *
+ * @return 0 on success; 1 after reporting a std::exception on stderr.
+ */
+int main(int argc, char** argv)
+{
+    Args opts;
+    try {
+        opts.parse(argc, argv);
+        Connection connection;
+        connection.open(opts.con);
+        Session session = connection.newSession();
+        if (opts.declareQueues) {
+            session.queueDeclare(arg::queue=opts.workQueue);
+            session.queueDeclare(arg::queue=opts.source);
+            session.queueDeclare(arg::queue=opts.dest);
+        }
+        for (uint i = 0; i < opts.jobs; ++i) {
+            Message job("transfer", opts.workQueue);
+            job.getHeaders().setString("src", opts.source);
+            job.getHeaders().setString("dest", opts.dest);
+            job.getHeaders().setInt("count", opts.messages);
+            async(session).messageTransfer(arg::content=job);
+        }
+
+        if (opts.quit) {
+            async(session).messageTransfer(arg::content=Message("quit", opts.workQueue));
+        }
+
+        // The transfers above were issued through async(); sync before closing.
+        session.sync();
+        session.close();
+
+        return 0;
+    } catch(const std::exception& e) {
+        // Report failures on stderr rather than stdout.
+        std::cerr << e.what() << std::endl;
+        return 1;
+    }
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/txjob.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/txjob.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/tests/txshift.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/txshift.cpp?rev=714065&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/txshift.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/txshift.cpp Fri Nov 14 08:40:22 2008
@@ -0,0 +1,185 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <iostream>
+#include <boost/bind.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
+
+#include "TestOptions.h"
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/FailoverManager.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/log/Statement.h"
+#include "qpid/sys/Thread.h"
+
+using namespace qpid::client;
+using namespace qpid::sys;
+
+/** Command-line options of the txshift worker, layered on qpid::TestOptions. */
+struct Args : qpid::TestOptions
+{
+    string workQueue;  // set by --work-queue
+    size_t workers;    // set by --workers
+
+    /** Install the defaults and register the txshift-specific options. */
+    Args()
+        : workQueue("txshift-control"),
+          workers(1)
+    {
+        addOptions()
+            ("workers",
+             qpid::optValue(workers, "N"),
+             "Number of separate worker sessions to start")
+            ("work-queue",
+             qpid::optValue(workQueue, "NAME"),
+             "work queue from which to take instructions");
+    }
+};
+
+/**
+ * Listener that executes "shift" jobs read from a control queue.
+ *
+ * A "transfer" control message names a source queue, a destination and a
+ * message count; the listener then subscribes to the source, re-publishes
+ * each message with the destination as routing key and, once the expected
+ * number has been handled, accepts the batch, commits and cancels the
+ * source subscription. A "quit" control message cancels the control
+ * subscription. The control subscription is given one message credit at a
+ * time, so credit has to be granted again whenever a control message has
+ * been dealt with.
+ */
+struct Transfer : MessageListener
+{
+    std::string control;      // name of the control (work) queue
+    std::string source;       // source queue of the current job
+    std::string destination;  // routing key applied to shifted messages
+    uint expected;            // number of messages in the current job
+    uint transfered;          // messages shifted so far in the current job
+    SubscriptionSettings controlSettings;
+    Subscription controlSubscription;
+    SubscriptionSettings sourceSettings;
+    Subscription sourceSubscription;
+
+    Transfer(const std::string& control_) : control(control_), expected(0), transfered(0) {}
+
+    /** Subscribe to the current source queue with credit for exactly `expected` messages. */
+    void subscribeToSource(SubscriptionManager& manager)
+    {
+        sourceSettings.autoAck = 0;//will accept once at the end of the batch
+        sourceSettings.flowControl = FlowControl::messageCredit(expected);
+        sourceSubscription = manager.subscribe(*this, source, sourceSettings);
+        QPID_LOG(info, "Subscribed to source: " << source << " expecting: " << expected);
+    }
+
+    /** Subscribe to the control queue with credit for a single message. */
+    void subscribeToControl(SubscriptionManager& manager)
+    {
+        controlSettings.flowControl = FlowControl::messageCredit(1);
+        controlSubscription = manager.subscribe(*this, control, controlSettings);
+        QPID_LOG(info, "Subscribed to job queue");
+    }
+
+    /** Dispatch on the message's destination: source queue, control queue, or unexpected. */
+    void received(Message& message)
+    {
+        QPID_LOG(debug, "received: " << message.getData() << " for " << message.getDestination());
+        if (message.getDestination() == source) {
+            receivedFromSource(message);
+        } else if (message.getDestination() == control) {
+            receivedFromControl(message);
+        } else {
+            QPID_LOG(error, "Unexpected message: " << message.getData() << " to " << message.getDestination());
+        }
+    }
+
+    /** Re-publish one source message; finish the job once `expected` messages have been shifted. */
+    void receivedFromSource(Message& message)
+    {
+        QPID_LOG(debug, "transfering " << (transfered+1) << " of " << expected);
+        message.getDeliveryProperties().setRoutingKey(destination);
+        async(sourceSubscription.getSession()).messageTransfer(arg::content=message);
+        if (++transfered == expected) {
+            QPID_LOG(info, "completed job: " << transfered << " messages shifted from " <<
+                     source << " to " << destination);
+            sourceSubscription.accept(sourceSubscription.getUnaccepted());
+            sourceSubscription.getSession().txCommit();
+            sourceSubscription.cancel();
+            //grant credit to allow broker to send us another control message
+            controlSubscription.grantMessageCredit(1);
+        }
+    }
+
+    /** Act on a control message: start a transfer job, quit, or reject anything else. */
+    void receivedFromControl(Message& message)
+    {
+        if (message.getData() == "transfer") {
+            source = message.getHeaders().getAsString("src");
+            destination = message.getHeaders().getAsString("dest");
+            expected = message.getHeaders().getAsInt("count");
+            transfered = 0;
+            QPID_LOG(info, "received transfer request: " << expected << " messages to be shifted from " <<
+                     source << " to " << destination);
+            if (expected == 0) {
+                // An empty job would subscribe with zero credit: no source
+                // message would ever arrive, receivedFromSource() would never
+                // complete the job and the control credit would never be
+                // granted again. Treat it as already complete instead.
+                QPID_LOG(info, "nothing to shift; waiting for next job");
+                controlSubscription.grantMessageCredit(1);
+            } else {
+                subscribeToSource(controlSubscription.getSubscriptionManager());
+            }
+        } else if (message.getData() == "quit") {
+            QPID_LOG(info, "received quit request");
+            controlSubscription.cancel();
+        } else {
+            std::cerr << "Rejecting invalid message: " << message.getData() << std::endl;
+            controlSubscription.getSession().messageReject(SequenceSet(message.getId()));
+            // The rejected message used up the single control credit; grant
+            // it again so that the next control message can be delivered.
+            controlSubscription.grantMessageCredit(1);
+        }
+    }
+
+};
+
+/**
+ * One txshift worker: a FailoverManager command that puts its session
+ * into transactional mode and runs a Transfer listener on the control
+ * queue. As a Runnable it can also be run on its own Thread via start().
+ */
+struct Worker : FailoverManager::Command, Runnable
+{
+    FailoverManager& connection;  // manager through which execute() is invoked
+    Transfer transfer;            // listener holding the job state
+    Thread runner;                // assigned by start(), joined by join()
+
+    Worker(FailoverManager& manager, const std::string& controlQueue)
+        : connection(manager),
+          transfer(controlQueue)
+    {
+    }
+
+    /** FailoverManager::Command hook: select tx mode, subscribe to the control queue, run the manager. */
+    void execute(AsyncSession& session, bool isRetry)
+    {
+        if (isRetry) {
+            QPID_LOG(info, "Retrying...");
+        }
+        session.txSelect();
+        SubscriptionManager manager(session);
+        transfer.subscribeToControl(manager);
+        manager.run();
+    }
+
+    /** Runnable hook: execute this command through the FailoverManager. */
+    void run()
+    {
+        connection.execute(*this);
+    }
+
+    /** Create a Thread for this Runnable and keep it in `runner`. */
+    void start()
+    {
+        runner = Thread(this);
+    }
+
+    /** Join the thread held in `runner`. */
+    void join()
+    {
+        runner.join();
+    }
+};
+
+/**
+ * Parse the options, connect a FailoverManager and run the requested
+ * number of workers: a single worker runs on the calling thread, any
+ * other number has every worker started on its own thread and then joined.
+ *
+ * @return 0 on success; 1 after reporting a std::exception on stderr.
+ */
+int main(int argc, char** argv)
+{
+    Args opts;
+    try {
+        opts.parse(argc, argv);
+        FailoverManager connection(opts.con);
+        connection.connect();
+        if (opts.workers == 1) {
+            Worker worker(connection, opts.workQueue);
+            worker.run();
+        } else {
+            typedef boost::ptr_vector<Worker> Workers;
+            Workers workers;
+            for (size_t i = 0; i < opts.workers; i++) {
+                workers.push_back(new Worker(connection, opts.workQueue));
+            }
+            // Explicit loops instead of an unqualified for_each(), which was
+            // only found through argument-dependent lookup and a transitive
+            // <algorithm> include. All workers are started before any join.
+            for (Workers::iterator w = workers.begin(); w != workers.end(); ++w) {
+                w->start();
+            }
+            for (Workers::iterator w = workers.begin(); w != workers.end(); ++w) {
+                w->join();
+            }
+        }
+
+        return 0;
+    } catch(const std::exception& e) {
+        // Report failures on stderr rather than stdout.
+        std::cerr << e.what() << std::endl;
+        return 1;
+    }
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/txshift.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/txshift.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date