You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2011/11/08 19:09:49 UTC

svn commit: r1199372 - in /zookeeper/bookkeeper/trunk: CHANGES.txt hedwig-client/src/main/cpp/lib/subscriberimpl.cpp hedwig-client/src/main/cpp/lib/subscriberimpl.h hedwig-client/src/main/cpp/test/pubsubtest.cpp hedwig-client/src/main/cpp/test/util.h

Author: ivank
Date: Tue Nov  8 18:09:49 2011
New Revision: 1199372

URL: http://svn.apache.org/viewvc?rev=1199372&view=rev
Log:
BOOKKEEPER-80: subscription msg queue race condition in hedwig c++ client (Sijie Guo via ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1199372&r1=1199371&r2=1199372&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Nov  8 18:09:49 2011
@@ -81,6 +81,8 @@ BUGFIXES:
   BOOKKEEPER-71: hedwig c++ client does not build . (ivank)
 
   BOOKKEEPER-107: memory leak in HostAddress of hedwig c++ client (Sijie Guo via ivank)
+ 
+  BOOKKEEPER-80: subscription msg queue race condition in hedwig c++ client (Sijie Guo via ivank)
 
 IMPROVEMENTS:
 

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp?rev=1199372&r1=1199371&r2=1199372&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp Tue Nov  8 18:09:49 2011
@@ -155,6 +155,7 @@ SubscriberClientChannelHandler::~Subscri
 
 void SubscriberClientChannelHandler::messageReceived(const DuplexChannelPtr& channel, const PubSubResponsePtr& m) {
   if (m->has_message()) {
+    boost::lock_guard<boost::shared_mutex> lock(queue_lock);
     LOG4CXX_DEBUG(logger, "Message received (topic:" << origData->getTopic() << ", subscriberId:" << origData->getSubscriberId() << ")");
 
     if (this->handler.get()) {
@@ -234,8 +235,16 @@ void SubscriberClientChannelHandler::cha
 }
 
 void SubscriberClientChannelHandler::startDelivery(const MessageHandlerCallbackPtr& handler) {
+  boost::lock_guard<boost::shared_mutex> lock(queue_lock);
+
   this->handler = handler;
-  
+
+  if (!(this->handler.get())) {
+    // no message handler callback
+    LOG4CXX_WARN(logger, "Handler " << this << " try to start an empty message handler");
+    return;
+  }
+
   while (!queue.empty()) {    
     PubSubResponsePtr m = queue.front();
     queue.pop_front();
@@ -250,6 +259,7 @@ void SubscriberClientChannelHandler::sta
 void SubscriberClientChannelHandler::stopDelivery() {
   channel->stopReceiving();
 
+  boost::lock_guard<boost::shared_mutex> lock(queue_lock);
   this->handler = MessageHandlerCallbackPtr();
 }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h?rev=1199372&r1=1199371&r2=1199372&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h Tue Nov  8 18:09:49 2011
@@ -128,8 +128,9 @@ namespace Hedwig {
   private:
 
     SubscriberImpl& subscriber;
+    boost::shared_mutex queue_lock;
     std::deque<PubSubResponsePtr> queue;
-    
+
     MessageHandlerCallbackPtr handler;
     PubSubDataPtr origData;
     DuplexChannelPtr channel;

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp?rev=1199372&r1=1199371&r2=1199372&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp Tue Nov  8 18:09:49 2011
@@ -19,6 +19,8 @@
 #include <config.h>
 #endif
 
+#include <sstream>
+
 #include <cppunit/Test.h>
 #include <cppunit/TestSuite.h>
 #include <cppunit/extensions/HelperMacros.h>
@@ -39,6 +41,7 @@ static log4cxx::LoggerPtr logger(log4cxx
 class PubSubTestSuite : public CppUnit::TestFixture {
 private:
   CPPUNIT_TEST_SUITE( PubSubTestSuite );
+  CPPUNIT_TEST(testPubSubOrderChecking);
   CPPUNIT_TEST(testPubSubContinuousOverClose);
   //  CPPUNIT_TEST(testPubSubContinuousOverServerDown);
   CPPUNIT_TEST(testMultiTopic);
@@ -96,7 +99,140 @@ public:
     std::string topic;
     std::string subscriberId;
   };
- 
+
+  // message handler callback that checks messages are consumed in consecutive msgId order
+  class MyOrderCheckingMessageHandlerCallback : public Hedwig::MessageHandlerCallback {
+  public:
+    MyOrderCheckingMessageHandlerCallback(const std::string& topic, const std::string& subscriberId, const int startMsgId, const int sleepTimeInConsume)
+      : messagesReceived(0), topic(topic), subscriberId(subscriberId), startMsgId(startMsgId), 
+        isInOrder(true), sleepTimeInConsume(sleepTimeInConsume) {
+    }
+
+    virtual void consume(const std::string& topic, const std::string& subscriberId,
+                         const Hedwig::Message& msg, Hedwig::OperationCallbackPtr& callback) {
+      if (topic == this->topic && subscriberId == this->subscriberId) {
+        boost::lock_guard<boost::mutex> lock(mutex);
+            
+        messagesReceived++;
+
+        int newMsgId = atoi(msg.body().c_str());
+        // check that this message id immediately follows the last in-order id
+        LOG4CXX_DEBUG(logger, "received message " << newMsgId);
+        if (isInOrder) {
+          if (newMsgId != startMsgId + 1) {
+            LOG4CXX_ERROR(logger, "received out-of-order message : expected " << (startMsgId + 1) << ", actual " << newMsgId);
+            isInOrder = false;
+          } else {
+            startMsgId = newMsgId;
+          }
+        }
+        callback->operationComplete();
+        sleep(sleepTimeInConsume);
+      }
+    }
+    
+    int numMessagesReceived() {
+      boost::lock_guard<boost::mutex> lock(mutex);
+      int i = messagesReceived;
+      return i;
+    }    
+
+    bool inOrder() {
+      boost::lock_guard<boost::mutex> lock(mutex);
+      return isInOrder;
+    }
+    
+  protected:
+    boost::mutex mutex;
+    int messagesReceived;
+    std::string topic;
+    std::string subscriberId;
+    int startMsgId;
+    bool isInOrder;
+    int sleepTimeInConsume;
+  };
+
+  class PubForOrderChecking {
+  public:
+    PubForOrderChecking(std::string &topic, int startMsgId, int numMsgs, int sleepTime, Hedwig::Publisher &pub)
+      : topic(topic), startMsgId(startMsgId), numMsgs(numMsgs), sleepTime(sleepTime), pub(pub) {
+    }
+
+    void operator()() {
+      for (int i=0; i<numMsgs; i++) {
+        int msg = startMsgId + i;
+        std::stringstream ss;
+        ss << msg;
+        pub.publish(topic, ss.str());
+        sleep(sleepTime);
+      }
+    }
+
+
+  private:
+    std::string topic;
+    int startMsgId;
+    int numMsgs;
+    int sleepTime;
+    Hedwig::Publisher& pub;
+  };
+
+  // check message ordering
+  void testPubSubOrderChecking() {
+    std::string topic = "orderCheckingTopic";
+    std::string sid = "mysub-0";
+
+    int numMessages = 5;
+    int sleepTimeInConsume = 1;
+    // sync request timeout (10000 ms = 10s)
+    int syncTimeout = 10000;
+
+    // To guarantee message order, the message queue is locked during delivery,
+    // so messages received on the io thread are blocked, which in turn blocks
+    // send operations (publish) because there is currently only one io thread.
+    // Hence the sync timeout is raised to 10s, more than numMessages * sleepTimeInConsume.
+    Hedwig::Configuration* conf = new TestServerConfiguration(syncTimeout);
+    std::auto_ptr<Hedwig::Configuration> confptr(conf);
+
+    Hedwig::Client* client = new Hedwig::Client(*conf);
+    std::auto_ptr<Hedwig::Client> clientptr(client);
+
+    Hedwig::Subscriber& sub = client->getSubscriber();
+    Hedwig::Publisher& pub = client->getPublisher();
+
+    sub.subscribe(topic, sid, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+    
+    // delivery has not been started yet, so the ${numMessages} messages
+    // published here will be queued
+    for (int i=0; i<numMessages; i++) {
+      std::stringstream ss;
+      ss << i;
+      pub.publish(topic, ss.str()); 
+    }
+
+    MyOrderCheckingMessageHandlerCallback* cb = new MyOrderCheckingMessageHandlerCallback(topic, sid, -1, sleepTimeInConsume);
+    Hedwig::MessageHandlerCallbackPtr handler(cb);
+
+    // create a thread to publish another ${numMessages} messages
+    boost::thread pubThread(PubForOrderChecking(topic, numMessages, numMessages, sleepTimeInConsume, pub));
+
+    // starting delivery will consume the queued messages;
+    // new messages will be received while the queued messages are being consumed,
+    // and hedwig should ensure that all messages are received in order
+    sub.startDelivery(topic, sid, handler);
+
+    // wait until all messages are published
+    pubThread.join();
+
+    for (int i = 0; i < 10; i++) {
+      sleep(3);
+      if (cb->numMessagesReceived() == 2 * numMessages) {
+        break;
+      }
+    }
+    CPPUNIT_ASSERT(cb->inOrder());
+  }
+
   void testPubSubContinuousOverClose() {
     std::string topic = "pubSubTopic";
     std::string sid = "MySubscriberid-1";

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h?rev=1199372&r1=1199371&r2=1199372&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h Tue Nov  8 18:09:49 2011
@@ -95,9 +95,14 @@ private:
 
 class TestServerConfiguration : public Hedwig::Configuration {
 public:
-  TestServerConfiguration() : address("localhost:4081") {}
+  TestServerConfiguration() : address("localhost:4081"), syncTimeout(10000) {}
+
+  TestServerConfiguration(int syncTimeout) : address("localhost:4081"), syncTimeout(syncTimeout) {}
   
-  virtual int getInt(const std::string& /*key*/, int defaultVal) const {
+  virtual int getInt(const std::string& key, int defaultVal) const {
+    if (key == Configuration::SYNC_REQUEST_TIMEOUT) {
+      return syncTimeout;
+    }
     return defaultVal;
   }
 
@@ -115,6 +120,7 @@ public:
   
 private:
   const std::string address;
+  const int syncTimeout;
 };