You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2011/11/08 19:09:49 UTC
svn commit: r1199372 - in /zookeeper/bookkeeper/trunk: CHANGES.txt
hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
hedwig-client/src/main/cpp/lib/subscriberimpl.h
hedwig-client/src/main/cpp/test/pubsubtest.cpp
hedwig-client/src/main/cpp/test/util.h
Author: ivank
Date: Tue Nov 8 18:09:49 2011
New Revision: 1199372
URL: http://svn.apache.org/viewvc?rev=1199372&view=rev
Log:
BOOKKEEPER-80: subscription msg queue race condition in hedwig c++ client (Sijie Guo via ivank)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1199372&r1=1199371&r2=1199372&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Nov 8 18:09:49 2011
@@ -81,6 +81,8 @@ BUGFIXES:
BOOKKEEPER-71: hedwig c++ client does not build . (ivank)
BOOKKEEPER-107: memory leak in HostAddress of hedwig c++ client (Sijie Guo via ivank)
+
+ BOOKKEEPER-80: subscription msg queue race condition in hedwig c++ client (Sijie Guo via ivank)
IMPROVEMENTS:
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp?rev=1199372&r1=1199371&r2=1199372&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp Tue Nov 8 18:09:49 2011
@@ -155,6 +155,7 @@ SubscriberClientChannelHandler::~Subscri
void SubscriberClientChannelHandler::messageReceived(const DuplexChannelPtr& channel, const PubSubResponsePtr& m) {
if (m->has_message()) {
+ boost::lock_guard<boost::shared_mutex> lock(queue_lock);
LOG4CXX_DEBUG(logger, "Message received (topic:" << origData->getTopic() << ", subscriberId:" << origData->getSubscriberId() << ")");
if (this->handler.get()) {
@@ -234,8 +235,16 @@ void SubscriberClientChannelHandler::cha
}
void SubscriberClientChannelHandler::startDelivery(const MessageHandlerCallbackPtr& handler) {
+ boost::lock_guard<boost::shared_mutex> lock(queue_lock);
+
this->handler = handler;
-
+
+ if (!(this->handler.get())) {
+ // no message handler callback
+ LOG4CXX_WARN(logger, "Handler " << this << " try to start an empty message handler");
+ return;
+ }
+
while (!queue.empty()) {
PubSubResponsePtr m = queue.front();
queue.pop_front();
@@ -250,6 +259,7 @@ void SubscriberClientChannelHandler::sta
void SubscriberClientChannelHandler::stopDelivery() {
channel->stopReceiving();
+ boost::lock_guard<boost::shared_mutex> lock(queue_lock);
this->handler = MessageHandlerCallbackPtr();
}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h?rev=1199372&r1=1199371&r2=1199372&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h Tue Nov 8 18:09:49 2011
@@ -128,8 +128,9 @@ namespace Hedwig {
private:
SubscriberImpl& subscriber;
+ boost::shared_mutex queue_lock;
std::deque<PubSubResponsePtr> queue;
-
+
MessageHandlerCallbackPtr handler;
PubSubDataPtr origData;
DuplexChannelPtr channel;
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp?rev=1199372&r1=1199371&r2=1199372&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp Tue Nov 8 18:09:49 2011
@@ -19,6 +19,8 @@
#include <config.h>
#endif
+#include <sstream>
+
#include <cppunit/Test.h>
#include <cppunit/TestSuite.h>
#include <cppunit/extensions/HelperMacros.h>
@@ -39,6 +41,7 @@ static log4cxx::LoggerPtr logger(log4cxx
class PubSubTestSuite : public CppUnit::TestFixture {
private:
CPPUNIT_TEST_SUITE( PubSubTestSuite );
+ CPPUNIT_TEST(testPubSubOrderChecking);
CPPUNIT_TEST(testPubSubContinuousOverClose);
// CPPUNIT_TEST(testPubSubContinuousOverServerDown);
CPPUNIT_TEST(testMultiTopic);
@@ -96,7 +99,140 @@ public:
std::string topic;
std::string subscriberId;
};
-
+
+ // order checking callback
+ class MyOrderCheckingMessageHandlerCallback : public Hedwig::MessageHandlerCallback {
+ public:
+ MyOrderCheckingMessageHandlerCallback(const std::string& topic, const std::string& subscriberId, const int startMsgId, const int sleepTimeInConsume)
+ : messagesReceived(0), topic(topic), subscriberId(subscriberId), startMsgId(startMsgId),
+ isInOrder(true), sleepTimeInConsume(sleepTimeInConsume) {
+ }
+
+ virtual void consume(const std::string& topic, const std::string& subscriberId,
+ const Hedwig::Message& msg, Hedwig::OperationCallbackPtr& callback) {
+ if (topic == this->topic && subscriberId == this->subscriberId) {
+ boost::lock_guard<boost::mutex> lock(mutex);
+
+ messagesReceived++;
+
+ int newMsgId = atoi(msg.body().c_str());
+ // checking msgId
+ LOG4CXX_DEBUG(logger, "received message " << newMsgId);
+ if (isInOrder) {
+ if (newMsgId != startMsgId + 1) {
+ LOG4CXX_ERROR(logger, "received out-of-order message : expected " << (startMsgId + 1) << ", actual " << newMsgId);
+ isInOrder = false;
+ } else {
+ startMsgId = newMsgId;
+ }
+ }
+ callback->operationComplete();
+ sleep(sleepTimeInConsume);
+ }
+ }
+
+ int numMessagesReceived() {
+ boost::lock_guard<boost::mutex> lock(mutex);
+ int i = messagesReceived;
+ return i;
+ }
+
+ bool inOrder() {
+ boost::lock_guard<boost::mutex> lock(mutex);
+ return isInOrder;
+ }
+
+ protected:
+ boost::mutex mutex;
+ int messagesReceived;
+ std::string topic;
+ std::string subscriberId;
+ int startMsgId;
+ bool isInOrder;
+ int sleepTimeInConsume;
+ };
+
+ class PubForOrderChecking {
+ public:
+ PubForOrderChecking(std::string &topic, int startMsgId, int numMsgs, int sleepTime, Hedwig::Publisher &pub)
+ : topic(topic), startMsgId(startMsgId), numMsgs(numMsgs), sleepTime(sleepTime), pub(pub) {
+ }
+
+ void operator()() {
+ for (int i=0; i<numMsgs; i++) {
+ int msg = startMsgId + i;
+ std::stringstream ss;
+ ss << msg;
+ pub.publish(topic, ss.str());
+ sleep(sleepTime);
+ }
+ }
+
+
+ private:
+ std::string topic;
+ int startMsgId;
+ int numMsgs;
+ int sleepTime;
+ Hedwig::Publisher& pub;
+ };
+
+ // check message ordering
+ void testPubSubOrderChecking() {
+ std::string topic = "orderCheckingTopic";
+ std::string sid = "mysub-0";
+
+ int numMessages = 5;
+ int sleepTimeInConsume = 1;
+ // sync timeout
+ int syncTimeout = 10000;
+
+ // in order to guarantee message order, message queue should be locked
+ // so message received in io thread would be blocked, which also block
+ // sent operations (publish). because we have only one io thread now
+ // so increase sync timeout to 10s, which is more than numMessages * sleepTimeInConsume
+ Hedwig::Configuration* conf = new TestServerConfiguration(syncTimeout);
+ std::auto_ptr<Hedwig::Configuration> confptr(conf);
+
+ Hedwig::Client* client = new Hedwig::Client(*conf);
+ std::auto_ptr<Hedwig::Client> clientptr(client);
+
+ Hedwig::Subscriber& sub = client->getSubscriber();
+ Hedwig::Publisher& pub = client->getPublisher();
+
+ sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+
+ // we don't start delivery first, so the message will be queued
+ // publish ${numMessages} messages, so the messages will be queued
+ for (int i=0; i<numMessages; i++) {
+ std::stringstream ss;
+ ss << i;
+ pub.publish(topic, ss.str());
+ }
+
+ MyOrderCheckingMessageHandlerCallback* cb = new MyOrderCheckingMessageHandlerCallback(topic, sid, -1, sleepTimeInConsume);
+ Hedwig::MessageHandlerCallbackPtr handler(cb);
+
+ // create a thread to publish another ${numMessages} messages
+ boost::thread pubThread(PubForOrderChecking(topic, numMessages, numMessages, sleepTimeInConsume, pub));
+
+ // start delivery will consumed the queued messages
+ // new message will recevied and the queued message should be consumed
+ // hedwig should ensure the message are received in order
+ sub.startDelivery(topic, sid, handler);
+
+ // wait until message are all published
+ pubThread.join();
+
+ for (int i = 0; i < 10; i++) {
+ sleep(3);
+ if (cb->numMessagesReceived() == 2 * numMessages) {
+ break;
+ }
+ }
+ CPPUNIT_ASSERT(cb->inOrder());
+ }
+
void testPubSubContinuousOverClose() {
std::string topic = "pubSubTopic";
std::string sid = "MySubscriberid-1";
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h?rev=1199372&r1=1199371&r2=1199372&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h Tue Nov 8 18:09:49 2011
@@ -95,9 +95,14 @@ private:
class TestServerConfiguration : public Hedwig::Configuration {
public:
- TestServerConfiguration() : address("localhost:4081") {}
+ TestServerConfiguration() : address("localhost:4081"), syncTimeout(10000) {}
+
+ TestServerConfiguration(int syncTimeout) : address("localhost:4081"), syncTimeout(syncTimeout) {}
- virtual int getInt(const std::string& /*key*/, int defaultVal) const {
+ virtual int getInt(const std::string& key, int defaultVal) const {
+ if (key == Configuration::SYNC_REQUEST_TIMEOUT) {
+ return syncTimeout;
+ }
return defaultVal;
}
@@ -115,6 +120,7 @@ public:
private:
const std::string address;
+ const int syncTimeout;
};