You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2011/11/21 11:32:40 UTC
svn commit: r1204437 - in /zookeeper/bookkeeper/trunk: ./
hedwig-client/src/main/cpp/lib/ hedwig-client/src/main/cpp/scripts/
hedwig-client/src/main/cpp/test/
Author: ivank
Date: Mon Nov 21 10:32:40 2011
New Revision: 1204437
URL: http://svn.apache.org/viewvc?rev=1204437&view=rev
Log:
BOOKKEEPER-79: randomly startDelivery/stopDelivery will core dump in c++ hedwig client (Sijie Guo via ivank)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/server-control.sh
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1204437&r1=1204436&r2=1204437&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Nov 21 10:32:40 2011
@@ -106,6 +106,8 @@ BUGFIXES:
BOOKKEEPER-87: TestHedwigHub exhausts direct buffer memory with netty 3.2.4.Final (ivank via fpj)
+ BOOKKEEPER-79: randomly startDelivery/stopDelivery will core dump in c++ hedwig client (Sijie Guo via ivank)
+
IMPROVEMENTS:
BOOKKEEPER-28: Create useful startup scripts for bookkeeper and hedwig (ivank)
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp?rev=1204437&r1=1204436&r2=1204437&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp Mon Nov 21 10:32:40 2011
@@ -52,7 +52,7 @@ DuplexChannel::DuplexChannel(EventDispat
const Configuration& cfg, const ChannelHandlerPtr& handler)
: dispatcher(dispatcher), address(addr), handler(handler),
socket(dispatcher.getService()), instream(&in_buf), copy_buf(NULL), copy_buf_length(0),
- state(UNINITIALISED), receiving(false), sending(false)
+ state(UNINITIALISED), receiving(false), reading(false), sending(false)
{
LOG4CXX_DEBUG(logger, "Creating DuplexChannel(" << this << ")");
}
@@ -140,6 +140,21 @@ void DuplexChannel::connect() {
h = channel->handler;
}
}
+
+ // The channel has stopped receiving, so we must not call #messageReceived.
+ // Store this response in outstanding_response and stop reading;
+ // when startReceiving is called again, this last response is processed.
+ {
+ boost::lock_guard<boost::mutex> lock(channel->receiving_lock);
+ if (!channel->isReceiving()) {
+ // queue the response
+ channel->outstanding_response = response;
+ channel->reading = false;
+ return;
+ }
+ }
+
+ // channel is still in receiving status
if (h.get()) {
h->messageReceived(channel, response);
}
@@ -188,10 +203,6 @@ void DuplexChannel::connect() {
}
/*static*/ void DuplexChannel::readSize(DuplexChannelPtr channel) {
- if (!channel->isReceiving()) {
- return;
- }
-
int toread = sizeof(uint32_t) - channel->in_buf.size();
LOG4CXX_DEBUG(logger, " size of incoming message " << sizeof(uint32_t)
<< ", currently in buffer " << channel->in_buf.size()
@@ -212,14 +223,56 @@ void DuplexChannel::connect() {
void DuplexChannel::startReceiving() {
LOG4CXX_DEBUG(logger, "DuplexChannel::startReceiving channel(" << this << ") currently receiving = " << receiving);
-
- boost::lock_guard<boost::mutex> lock(receiving_lock);
- if (receiving) {
- return;
- }
- receiving = true;
-
- DuplexChannel::readSize(shared_from_this());
+
+ PubSubResponsePtr response;
+ bool inReadingState;
+ {
+ boost::lock_guard<boost::mutex> lock(receiving_lock);
+ // already receiving, so just return
+ if (receiving) {
+ return;
+ }
+ receiving = true;
+
+ // if a response was held back while receiving was stopped,
+ // we need to process it, but we must not process it under receiving_lock,
+ // otherwise we could deadlock:
+ // subscriber#startDelivery(subscriber#queue_lock) =>
+ // channel#startReceiving(channel#receiving_lock) =>
+ // subscriber#messageReceived(subscriber#queue_lock)
+ if (outstanding_response.get()) {
+ response = outstanding_response;
+ outstanding_response = PubSubResponsePtr();
+ }
+
+ // if the channel is already in reading state, waiting for data from the remote server,
+ // we don't need to issue another readSize op
+ inReadingState = reading;
+ if (!reading) {
+ reading = true;
+ }
+ }
+
+ // consume message buffered in receiving queue
+ // there is at most one message buffered when we
+ // stopReceiving between #readSize and #readMsgBody
+ if (response.get()) {
+ ChannelHandlerPtr h;
+ {
+ boost::shared_lock<boost::shared_mutex> lock(this->destruction_lock);
+ if (this->handler.get()) {
+ h = this->handler;
+ }
+ }
+ if (h.get()) {
+ h->messageReceived(shared_from_this(), response);
+ }
+ }
+
+ // if channel is not in reading state, #readSize
+ if (!inReadingState) {
+ DuplexChannel::readSize(shared_from_this());
+ }
}
bool DuplexChannel::isReceiving() {
@@ -320,9 +373,19 @@ void DuplexChannel::kill() {
if (connected) {
setState(DEAD);
- socket.cancel();
- socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
- socket.close();
+ boost::system::error_code ec;
+ socket.cancel(ec);
+ if (ec) {
+ LOG4CXX_WARN(logger, "Channel " << this << " canceling io error : " << ec.message().c_str());
+ }
+ socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
+ if (ec) {
+ LOG4CXX_WARN(logger, "Channel " << this << " shutdown error : " << ec.message().c_str());
+ }
+ socket.close(ec);
+ if (ec) {
+ LOG4CXX_WARN(logger, "Channel " << this << " close error : " << ec.message().c_str());
+ }
}
handler = ChannelHandlerPtr(); // clear the handler in case it ever referenced the channel*/
}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h?rev=1204437&r1=1204436&r2=1204437&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h Mon Nov 21 10:32:40 2011
@@ -139,6 +139,8 @@ namespace Hedwig {
boost::shared_mutex state_lock;
bool receiving;
+ bool reading;
+ PubSubResponsePtr outstanding_response;
boost::mutex receiving_lock;
bool sending;
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp?rev=1204437&r1=1204436&r2=1204437&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp Mon Nov 21 10:32:40 2011
@@ -235,24 +235,31 @@ void SubscriberClientChannelHandler::cha
}
void SubscriberClientChannelHandler::startDelivery(const MessageHandlerCallbackPtr& handler) {
- boost::lock_guard<boost::shared_mutex> lock(queue_lock);
+ {
+ boost::lock_guard<boost::shared_mutex> lock(queue_lock);
- this->handler = handler;
+ this->handler = handler;
- if (!(this->handler.get())) {
- // no message handler callback
- LOG4CXX_WARN(logger, "Handler " << this << " try to start an empty message handler");
- return;
- }
+ if (!(this->handler.get())) {
+ // no message handler callback
+ LOG4CXX_WARN(logger, "Handler " << this << " try to start an empty message handler");
+ return;
+ }
- while (!queue.empty()) {
- PubSubResponsePtr m = queue.front();
- queue.pop_front();
+ while (!queue.empty()) {
+ PubSubResponsePtr m = queue.front();
+ queue.pop_front();
- OperationCallbackPtr callback(new SubscriberConsumeCallback(client, shared_from_this(), origData, m));
+ OperationCallbackPtr callback(new SubscriberConsumeCallback(client, shared_from_this(), origData, m));
- this->handler->consume(origData->getTopic(), origData->getSubscriberId(), m->message(), callback);
+ this->handler->consume(origData->getTopic(), origData->getSubscriberId(), m->message(), callback);
+ }
}
+
+ // call channel#startReceiving outside of subscriber#queue_lock,
+ // otherwise we could deadlock:
+ // subscriber#startDelivery(subscriber#queue_lock) =>
+ // channel#startReceiving(channel#receiving_lock) => subscriber#messageReceived(subscriber#queue_lock)
channel->startReceiving();
}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/server-control.sh
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/server-control.sh?rev=1204437&r1=1204436&r2=1204437&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/server-control.sh (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/server-control.sh Mon Nov 21 10:32:40 2011
@@ -95,6 +95,25 @@ start_hw_server () {
COUNT=$2
PORT=$((4080+$COUNT))
+ export HEDWIG_LOG_CONF=/tmp/hw-log4j-$COUNT.properties
+ cat > $HEDWIG_LOG_CONF <<EOF
+log4j.rootLogger=INFO, ROLLINGFILE
+#
+# Add ROLLINGFILE to rootLogger to get log file output
+# Log DEBUG level and above messages to a log file
+log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.ROLLINGFILE.Threshold=DEBUG
+log4j.appender.ROLLINGFILE.File=/tmp/hedwig-server-$COUNT.log
+log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+# Max log file size of 10MB
+log4j.appender.ROLLINGFILE.MaxFileSize=10MB
+# uncomment the next line to limit number of backup files
+#log4j.appender.ROLLINGFILE.MaxBackupIndex=10
+log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n
+EOF
+
export HEDWIG_SERVER_CONF=/tmp/hw-server-$COUNT.conf
cat > $HEDWIG_SERVER_CONF <<EOF
zk_host=localhost:2181
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp?rev=1204437&r1=1204436&r2=1204437&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp Mon Nov 21 10:32:40 2011
@@ -42,6 +42,7 @@ class PubSubTestSuite : public CppUnit::
private:
CPPUNIT_TEST_SUITE( PubSubTestSuite );
CPPUNIT_TEST(testPubSubOrderChecking);
+ CPPUNIT_TEST(testRandomDelivery);
CPPUNIT_TEST(testPubSubContinuousOverClose);
// CPPUNIT_TEST(testPubSubContinuousOverServerDown);
CPPUNIT_TEST(testMultiTopic);
@@ -118,13 +119,17 @@ public:
int newMsgId = atoi(msg.body().c_str());
// checking msgId
LOG4CXX_DEBUG(logger, "received message " << newMsgId);
- if (isInOrder) {
- if (newMsgId != startMsgId + 1) {
- LOG4CXX_ERROR(logger, "received out-of-order message : expected " << (startMsgId + 1) << ", actual " << newMsgId);
- isInOrder = false;
- } else {
- startMsgId = newMsgId;
+ if (startMsgId >= 0) { // need to check ordering if start msg id is non-negative
+ if (isInOrder) {
+ if (newMsgId != startMsgId + 1) {
+ LOG4CXX_ERROR(logger, "received out-of-order message : expected " << (startMsgId + 1) << ", actual " << newMsgId);
+ isInOrder = false;
+ } else {
+ startMsgId = newMsgId;
+ }
}
+ } else { // we set first msg id as startMsgId when startMsgId is -1
+ startMsgId = newMsgId;
}
callback->operationComplete();
sleep(sleepTimeInConsume);
@@ -152,22 +157,48 @@ public:
int sleepTimeInConsume;
};
- class PubForOrderChecking {
+ // Publishes integers until finished
+ class IntegerPublisher {
public:
- PubForOrderChecking(std::string &topic, int startMsgId, int numMsgs, int sleepTime, Hedwig::Publisher &pub)
- : topic(topic), startMsgId(startMsgId), numMsgs(numMsgs), sleepTime(sleepTime), pub(pub) {
+ IntegerPublisher(std::string &topic, int startMsgId, int numMsgs, int sleepTime, Hedwig::Publisher &pub, long runTime)
+ : topic(topic), startMsgId(startMsgId), numMsgs(numMsgs), sleepTime(sleepTime), pub(pub), running(true), runTime(runTime) {
}
void operator()() {
- for (int i=0; i<numMsgs; i++) {
- int msg = startMsgId + i;
- std::stringstream ss;
- ss << msg;
- pub.publish(topic, ss.str());
- sleep(sleepTime);
- }
+ int i = 1;
+ long beginTime = curTime();
+ long elapsedTime = 0;
+
+ while (running) {
+ try {
+ int msg = startMsgId + i;
+ std::stringstream ss;
+ ss << msg;
+ pub.publish(topic, ss.str());
+ sleep(sleepTime);
+ if (numMsgs > 0 && i >= numMsgs) {
+ running = false;
+ } else {
+ if (i % 100 == 0 &&
+ (elapsedTime = (curTime() - beginTime)) >= runTime) {
+ LOG4CXX_DEBUG(logger, "Elapsed time : " << elapsedTime);
+ running = false;
+ }
+ }
+ ++i;
+ } catch (std::exception &e) {
+ LOG4CXX_WARN(logger, "Exception when publishing messages : " << e.what());
+ }
+ }
}
+ long curTime() {
+ struct timeval tv;
+ long mtime;
+ gettimeofday(&tv, NULL);
+ mtime = tv.tv_sec * 1000 + tv.tv_usec / 1000.0 + 0.5;
+ return mtime;
+ }
private:
std::string topic;
@@ -175,8 +206,52 @@ public:
int numMsgs;
int sleepTime;
Hedwig::Publisher& pub;
+ bool running;
+ long runTime;
};
+ // test startDelivery / stopDelivery randomly
+ void testRandomDelivery() {
+ std::string topic = "randomDeliveryTopic";
+ std::string subscriber = "mysub-randomDelivery";
+
+ int nLoops = 300;
+ int sleepTimePerLoop = 1;
+ int syncTimeout = 10000;
+
+ Hedwig::Configuration* conf = new TestServerConfiguration(syncTimeout);
+ std::auto_ptr<Hedwig::Configuration> confptr(conf);
+
+ Hedwig::Client* client = new Hedwig::Client(*conf);
+ std::auto_ptr<Hedwig::Client> clientptr(client);
+
+ Hedwig::Subscriber& sub = client->getSubscriber();
+ Hedwig::Publisher& pub = client->getPublisher();
+
+ // subscribe topic
+ sub.subscribe(topic, subscriber, Hedwig::SubscribeRequest::CREATE_OR_ATTACH);
+
+ // start thread to publish message
+ IntegerPublisher intPublisher = IntegerPublisher(topic, 0, 0, 0, pub, nLoops * sleepTimePerLoop * 1000);
+ boost::thread pubThread(intPublisher);
+
+ // start random delivery
+ MyOrderCheckingMessageHandlerCallback* cb =
+ new MyOrderCheckingMessageHandlerCallback(topic, subscriber, 0, 0);
+ Hedwig::MessageHandlerCallbackPtr handler(cb);
+
+ for (int i = 0; i < nLoops; i++) {
+ LOG4CXX_DEBUG(logger, "Randomly Delivery : " << i);
+ sub.startDelivery(topic, subscriber, handler);
+ // sleep for a fixed interval between start and stop
+ sleep(sleepTimePerLoop);
+ sub.stopDelivery(topic, subscriber);
+ CPPUNIT_ASSERT(cb->inOrder());
+ }
+
+ pubThread.join();
+ }
+
// check message ordering
void testPubSubOrderChecking() {
std::string topic = "orderCheckingTopic";
@@ -204,17 +279,17 @@ public:
// we don't start delivery first, so the message will be queued
// publish ${numMessages} messages, so the messages will be queued
- for (int i=0; i<numMessages; i++) {
+ for (int i=1; i<=numMessages; i++) {
std::stringstream ss;
ss << i;
pub.publish(topic, ss.str());
}
- MyOrderCheckingMessageHandlerCallback* cb = new MyOrderCheckingMessageHandlerCallback(topic, sid, -1, sleepTimeInConsume);
+ MyOrderCheckingMessageHandlerCallback* cb = new MyOrderCheckingMessageHandlerCallback(topic, sid, 0, sleepTimeInConsume);
Hedwig::MessageHandlerCallbackPtr handler(cb);
// create a thread to publish another ${numMessages} messages
- boost::thread pubThread(PubForOrderChecking(topic, numMessages, numMessages, sleepTimeInConsume, pub));
+ boost::thread pubThread(IntegerPublisher(topic, numMessages, numMessages, sleepTimeInConsume, pub, 0));
// start delivery will consumed the queued messages
// new message will recevied and the queued message should be consumed