You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2011/11/21 11:32:40 UTC

svn commit: r1204437 - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/cpp/lib/ hedwig-client/src/main/cpp/scripts/ hedwig-client/src/main/cpp/test/

Author: ivank
Date: Mon Nov 21 10:32:40 2011
New Revision: 1204437

URL: http://svn.apache.org/viewvc?rev=1204437&view=rev
Log:
BOOKKEEPER-79: randomly startDelivery/stopDelivery will core dump in c++ hedwig client (Sijie Guo via ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/server-control.sh
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1204437&r1=1204436&r2=1204437&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Nov 21 10:32:40 2011
@@ -106,6 +106,8 @@ BUGFIXES:
 
   BOOKKEEPER-87: TestHedwigHub exhausts direct buffer memory with netty 3.2.4.Final (ivank via fpj)  
 
+  BOOKKEEPER-79: randomly startDelivery/stopDelivery will core dump in c++ hedwig client (Sijie Guo via ivank)
+
 IMPROVEMENTS:
 
  BOOKKEEPER-28: Create useful startup scripts for bookkeeper and hedwig (ivank)

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp?rev=1204437&r1=1204436&r2=1204437&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp Mon Nov 21 10:32:40 2011
@@ -52,7 +52,7 @@ DuplexChannel::DuplexChannel(EventDispat
 			     const Configuration& cfg, const ChannelHandlerPtr& handler)
   : dispatcher(dispatcher), address(addr), handler(handler), 
     socket(dispatcher.getService()), instream(&in_buf), copy_buf(NULL), copy_buf_length(0),
-    state(UNINITIALISED), receiving(false), sending(false)
+    state(UNINITIALISED), receiving(false), reading(false), sending(false)
 {
   LOG4CXX_DEBUG(logger, "Creating DuplexChannel(" << this << ")");
 }
@@ -140,6 +140,21 @@ void DuplexChannel::connect() {  
       h = channel->handler;
     }
   }
+
+  // the channel has stopped receiving, so we must not call #messageReceived now:
+  // park this response in outstanding_response and stop reading; the next
+  // #startReceiving call will deliver it before issuing a new read.
+  {
+    boost::lock_guard<boost::mutex> lock(channel->receiving_lock);
+    if (!channel->isReceiving()) {
+      // queue the response
+      channel->outstanding_response = response;
+      channel->reading = false;
+      return;
+    }
+  }
+
+  // channel is still in receiving status
   if (h.get()) {
     h->messageReceived(channel, response);
   }
@@ -188,10 +203,6 @@ void DuplexChannel::connect() {  
 }
 
 /*static*/ void DuplexChannel::readSize(DuplexChannelPtr channel) {
-  if (!channel->isReceiving()) {
-    return;
-  }
-
   int toread = sizeof(uint32_t) - channel->in_buf.size();
   LOG4CXX_DEBUG(logger, " size of incoming message " << sizeof(uint32_t) 
 		<< ", currently in buffer " << channel->in_buf.size() 
@@ -212,14 +223,56 @@ void DuplexChannel::connect() {  
 
 void DuplexChannel::startReceiving() {
   LOG4CXX_DEBUG(logger, "DuplexChannel::startReceiving channel(" << this << ") currently receiving = " << receiving);
-  
-  boost::lock_guard<boost::mutex> lock(receiving_lock);
-  if (receiving) {
-    return;
-  } 
-  receiving = true;
-  
-  DuplexChannel::readSize(shared_from_this());
+
+  PubSubResponsePtr response;
+  bool inReadingState;
+  {
+    boost::lock_guard<boost::mutex> lock(receiving_lock);
+    // already receiving: nothing to do
+    if (receiving) {
+      return;
+    } 
+    receiving = true;
+
+    // if a response was parked while receiving was stopped, take it here and
+    // deliver it below, OUTSIDE receiving_lock; delivering it while holding
+    // receiving_lock could deadlock:
+    // subscriber#startDelivery(subscriber#queue_lock) =>
+    // channel#startReceiving(channel#receiving_lock) =>
+    // subscriber#messageReceived(subscriber#queue_lock)
+    if (outstanding_response.get()) {
+      response = outstanding_response;
+      outstanding_response = PubSubResponsePtr();
+    }
+
+    // if a read is already outstanding (waiting for data from the server), do
+    // not issue another #readSize; otherwise mark that a read is now pending
+    inReadingState = reading;
+    if (!reading) {
+      reading = true;
+    }
+  }
+
+  // deliver the parked response, if any. At most one response can be parked,
+  // since the read callback stops reading (reading = false) right after
+  // parking it.
+  if (response.get()) {
+    ChannelHandlerPtr h;
+    {
+      boost::shared_lock<boost::shared_mutex> lock(this->destruction_lock);
+      if (this->handler.get()) {
+        h = this->handler;
+      }
+    }
+    if (h.get()) {
+      h->messageReceived(shared_from_this(), response);
+    }
+  }
+
+  // no read was outstanding: restart the read loop with #readSize
+  if (!inReadingState) {
+    DuplexChannel::readSize(shared_from_this());
+  }
 }
 
 bool DuplexChannel::isReceiving() {
@@ -320,9 +373,19 @@ void DuplexChannel::kill() {
   if (connected) {
     setState(DEAD);
     
-    socket.cancel();
-    socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
-    socket.close();
+    boost::system::error_code ec;
+    socket.cancel(ec);
+    if (ec) {
+      LOG4CXX_WARN(logger, "Channel " << this << " canceling io error : " << ec.message().c_str());
+    }
+    socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
+    if (ec) {
+      LOG4CXX_WARN(logger, "Channel " << this << " shutdown error : " << ec.message().c_str());
+    }
+    socket.close(ec);
+    if (ec) {
+      LOG4CXX_WARN(logger, "Channel " << this << " close error : " << ec.message().c_str());
+    }
   }
   handler = ChannelHandlerPtr(); // clear the handler in case it ever referenced the channel*/
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h?rev=1204437&r1=1204436&r2=1204437&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h Mon Nov 21 10:32:40 2011
@@ -139,6 +139,8 @@ namespace Hedwig {
     boost::shared_mutex state_lock;
 
     bool receiving;
+    bool reading;
+    PubSubResponsePtr outstanding_response;
     boost::mutex receiving_lock;
     
     bool sending;

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp?rev=1204437&r1=1204436&r2=1204437&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp Mon Nov 21 10:32:40 2011
@@ -235,24 +235,31 @@ void SubscriberClientChannelHandler::cha
 }
 
 void SubscriberClientChannelHandler::startDelivery(const MessageHandlerCallbackPtr& handler) {
-  boost::lock_guard<boost::shared_mutex> lock(queue_lock);
+  {
+    boost::lock_guard<boost::shared_mutex> lock(queue_lock);
 
-  this->handler = handler;
+    this->handler = handler;
 
-  if (!(this->handler.get())) {
-    // no message handler callback
-    LOG4CXX_WARN(logger, "Handler " << this << " try to start an empty message handler");
-    return;
-  }
+    if (!(this->handler.get())) {
+      // no message handler callback
+      LOG4CXX_WARN(logger, "Handler " << this << " try to start an empty message handler");
+      return;
+    }
 
-  while (!queue.empty()) {    
-    PubSubResponsePtr m = queue.front();
-    queue.pop_front();
+    while (!queue.empty()) {    
+      PubSubResponsePtr m = queue.front();
+      queue.pop_front();
 
-    OperationCallbackPtr callback(new SubscriberConsumeCallback(client, shared_from_this(), origData, m));
+      OperationCallbackPtr callback(new SubscriberConsumeCallback(client, shared_from_this(), origData, m));
 
-    this->handler->consume(origData->getTopic(), origData->getSubscriberId(), m->message(), callback);
+      this->handler->consume(origData->getTopic(), origData->getSubscriberId(), m->message(), callback);
+    }
   }
+
+  // call channel#startReceiving only after subscriber#queue_lock is released;
+  // otherwise we deadlock:
+  // subscriber#startDelivery(subscriber#queue_lock) =>
+  // channel#startReceiving(channel#receiving_lock) => subscriber#messageReceived(subscriber#queue_lock)
   channel->startReceiving();
 }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/server-control.sh
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/server-control.sh?rev=1204437&r1=1204436&r2=1204437&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/server-control.sh (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/server-control.sh Mon Nov 21 10:32:40 2011
@@ -95,6 +95,25 @@ start_hw_server () {
     COUNT=$2
     PORT=$((4080+$COUNT))
 
+    export HEDWIG_LOG_CONF=/tmp/hw-log4j-$COUNT.properties
+    cat > $HEDWIG_LOG_CONF <<EOF
+log4j.rootLogger=INFO, ROLLINGFILE
+#
+# Add ROLLINGFILE to rootLogger to get log file output
+#    Log DEBUG level and above messages to a log file
+log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.ROLLINGFILE.Threshold=DEBUG
+log4j.appender.ROLLINGFILE.File=/tmp/hedwig-server-$COUNT.log
+log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+# NOTE: DailyRollingFileAppender has no MaxFileSize property (log4j ignores it with a warning)
+log4j.appender.ROLLINGFILE.MaxFileSize=10MB
+# MaxBackupIndex is likewise unsupported by DailyRollingFileAppender
+#log4j.appender.ROLLINGFILE.MaxBackupIndex=10
+log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n
+EOF
+
     export HEDWIG_SERVER_CONF=/tmp/hw-server-$COUNT.conf
     cat > $HEDWIG_SERVER_CONF <<EOF
 zk_host=localhost:2181

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp?rev=1204437&r1=1204436&r2=1204437&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp Mon Nov 21 10:32:40 2011
@@ -42,6 +42,7 @@ class PubSubTestSuite : public CppUnit::
 private:
   CPPUNIT_TEST_SUITE( PubSubTestSuite );
   CPPUNIT_TEST(testPubSubOrderChecking);
+  CPPUNIT_TEST(testRandomDelivery);
   CPPUNIT_TEST(testPubSubContinuousOverClose);
   //  CPPUNIT_TEST(testPubSubContinuousOverServerDown);
   CPPUNIT_TEST(testMultiTopic);
@@ -118,13 +119,17 @@ public:
         int newMsgId = atoi(msg.body().c_str());
         // checking msgId
         LOG4CXX_DEBUG(logger, "received message " << newMsgId);
-        if (isInOrder) {
-          if (newMsgId != startMsgId + 1) {
-            LOG4CXX_ERROR(logger, "received out-of-order message : expected " << (startMsgId + 1) << ", actual " << newMsgId);
-            isInOrder = false;
-          } else {
-            startMsgId = newMsgId;
+        if (startMsgId >= 0) { // need to check ordering if start msg id is larger than 0
+          if (isInOrder) {
+            if (newMsgId != startMsgId + 1) {
+              LOG4CXX_ERROR(logger, "received out-of-order message : expected " << (startMsgId + 1) << ", actual " << newMsgId);
+              isInOrder = false;
+            } else {
+              startMsgId = newMsgId;
+            }
           }
+        } else { // we set first msg id as startMsgId when startMsgId is -1
+          startMsgId = newMsgId;
         }
         callback->operationComplete();
         sleep(sleepTimeInConsume);
@@ -152,22 +157,48 @@ public:
     int sleepTimeInConsume;
   };
 
-  class PubForOrderChecking {
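+  // Publishes startMsgId+1, startMsgId+2, ... until numMsgs are sent (if numMsgs > 0) or runTime ms have elapsed (checked every 100 messages)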
+  class IntegerPublisher {
   public:
-    PubForOrderChecking(std::string &topic, int startMsgId, int numMsgs, int sleepTime, Hedwig::Publisher &pub)
-      : topic(topic), startMsgId(startMsgId), numMsgs(numMsgs), sleepTime(sleepTime), pub(pub) {
+    IntegerPublisher(std::string &topic, int startMsgId, int numMsgs, int sleepTime, Hedwig::Publisher &pub, long runTime)
+      : topic(topic), startMsgId(startMsgId), numMsgs(numMsgs), sleepTime(sleepTime), pub(pub), running(true), runTime(runTime) {
     }
 
     void operator()() {
-      for (int i=0; i<numMsgs; i++) {
-        int msg = startMsgId + i;
-        std::stringstream ss;
-        ss << msg;
-        pub.publish(topic, ss.str());
-        sleep(sleepTime);
-      }
+      int i = 1;
+      long beginTime = curTime();
+      long elapsedTime = 0;
+
+      while (running) {
+        try {
+          int msg = startMsgId + i;
+          std::stringstream ss;
+          ss << msg;
+          pub.publish(topic, ss.str());
+          sleep(sleepTime);
+          if (numMsgs > 0 && i >= numMsgs) {
+            running = false;
+          } else {
+            if (i % 100 == 0 &&
+                (elapsedTime = (curTime() - beginTime)) >= runTime) {
+              LOG4CXX_DEBUG(logger, "Elapsed time : " << elapsedTime);
+              running = false;
+            }
+          }
+          ++i;
+        } catch (std::exception &e) {
+          LOG4CXX_WARN(logger, "Exception when publishing messages : " << e.what());
+        }
+      } 
     }
 
+    long curTime() {
+      struct timeval tv;
+      long mtime;
+      gettimeofday(&tv, NULL);
+      mtime = tv.tv_sec * 1000 + tv.tv_usec / 1000.0 + 0.5;
+      return mtime;
+    }
 
   private:
     std::string topic;
@@ -175,8 +206,52 @@ public:
     int numMsgs;
     int sleepTime;
     Hedwig::Publisher& pub;
+    bool running;
+    long runTime;
   };
 
+  // repeatedly start and stop delivery while a publisher thread runs, checking message order after each cycle
+  void testRandomDelivery() {
+    std::string topic = "randomDeliveryTopic";
+    std::string subscriber = "mysub-randomDelivery";
+
+    int nLoops = 300;
+    int sleepTimePerLoop = 1;
+    int syncTimeout = 10000;
+
+    Hedwig::Configuration* conf = new TestServerConfiguration(syncTimeout);
+    std::auto_ptr<Hedwig::Configuration> confptr(conf);
+
+    Hedwig::Client* client = new Hedwig::Client(*conf);
+    std::auto_ptr<Hedwig::Client> clientptr(client);
+
+    Hedwig::Subscriber& sub = client->getSubscriber();
+    Hedwig::Publisher& pub = client->getPublisher();
+
+    // subscribe topic
+    sub.subscribe(topic, subscriber, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+
+    // start thread to publish message
+    IntegerPublisher intPublisher = IntegerPublisher(topic, 0, 0, 0, pub, nLoops * sleepTimePerLoop * 1000);
+    boost::thread pubThread(intPublisher);
+
+    // start random delivery
+    MyOrderCheckingMessageHandlerCallback* cb =
+      new MyOrderCheckingMessageHandlerCallback(topic, subscriber, 0, 0);
+    Hedwig::MessageHandlerCallbackPtr handler(cb);
+
+    for (int i = 0; i < nLoops; i++) {
+      LOG4CXX_DEBUG(logger, "Randomly Delivery : " << i);
+      sub.startDelivery(topic, subscriber, handler);
+      // keep delivery running for sleepTimePerLoop seconds
+      sleep(sleepTimePerLoop);
+      sub.stopDelivery(topic, subscriber);
+      CPPUNIT_ASSERT(cb->inOrder());
+    }
+    
+    pubThread.join();
+  }
+
   // check message ordering
   void testPubSubOrderChecking() {
     std::string topic = "orderCheckingTopic";
@@ -204,17 +279,17 @@ public:
     
     // we don't start delivery first, so the message will be queued
     // publish ${numMessages} messages, so the messages will be queued
-    for (int i=0; i<numMessages; i++) {
+    for (int i=1; i<=numMessages; i++) {
       std::stringstream ss;
       ss << i;
       pub.publish(topic, ss.str()); 
     }
 
-    MyOrderCheckingMessageHandlerCallback* cb = new MyOrderCheckingMessageHandlerCallback(topic, sid, -1, sleepTimeInConsume);
+    MyOrderCheckingMessageHandlerCallback* cb = new MyOrderCheckingMessageHandlerCallback(topic, sid, 0, sleepTimeInConsume);
     Hedwig::MessageHandlerCallbackPtr handler(cb);
 
     // create a thread to publish another ${numMessages} messages
-    boost::thread pubThread(PubForOrderChecking(topic, numMessages, numMessages, sleepTimeInConsume, pub));
+    boost::thread pubThread(IntegerPublisher(topic, numMessages, numMessages, sleepTimeInConsume, pub, 0));
 
     // start delivery will consumed the queued messages
     // new message will recevied and the queued message should be consumed