You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/01 15:36:32 UTC
svn commit: r1392319 [2/2] - in /zookeeper/bookkeeper/trunk: ./
hedwig-client/src/main/cpp/ hedwig-client/src/main/cpp/inc/hedwig/
hedwig-client/src/main/cpp/lib/ hedwig-client/src/main/cpp/scripts/
hedwig-client/src/main/cpp/test/
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp Mon Oct 1 13:36:31 2012
@@ -75,23 +75,29 @@ protected:
class PubSubOrderCheckingMessageHandlerCallback : public Hedwig::MessageHandlerCallback {
public:
PubSubOrderCheckingMessageHandlerCallback(const std::string& topic, const std::string& subscriberId, const int startMsgId, const int sleepTimeInConsume)
- : messagesReceived(0), topic(topic), subscriberId(subscriberId), startMsgId(startMsgId),
- isInOrder(true), sleepTimeInConsume(sleepTimeInConsume) {
+ : topic(topic), subscriberId(subscriberId), startMsgId(startMsgId),
+ nextMsgId(startMsgId), isInOrder(true), sleepTimeInConsume(sleepTimeInConsume) {
}
virtual void consume(const std::string& topic, const std::string& subscriberId,
const Hedwig::Message& msg, Hedwig::OperationCallbackPtr& callback) {
if (topic == this->topic && subscriberId == this->subscriberId) {
boost::lock_guard<boost::mutex> lock(mutex);
-
- messagesReceived++;
int newMsgId = atoi(msg.body().c_str());
+ if (newMsgId == nextMsgId + 1) {
+ // only calculate unduplicated entries
+ ++nextMsgId;
+ }
+
// checking msgId
LOG4CXX_DEBUG(logger, "received message " << newMsgId);
if (startMsgId >= 0) { // need to check ordering if start msg id is larger than 0
if (isInOrder) {
- if (newMsgId != startMsgId + 1) {
+ // in some environments, ssl channel encountering error like Bad File Descriptor.
+ // the channel would disconnect and reconnect. A duplicated message would be received.
+ // so just checking we received a larger out-of-order message.
+ if (newMsgId > startMsgId + 1) {
LOG4CXX_ERROR(logger, "received out-of-order message : expected " << (startMsgId + 1) << ", actual " << newMsgId);
isInOrder = false;
} else {
@@ -106,10 +112,9 @@ public:
}
}
- int numMessagesReceived() {
+ int nextExpectedMsgId() {
boost::lock_guard<boost::mutex> lock(mutex);
- int i = messagesReceived;
- return i;
+ return nextMsgId;
}
bool inOrder() {
@@ -119,10 +124,10 @@ public:
protected:
boost::mutex mutex;
- int messagesReceived;
std::string topic;
std::string subscriberId;
int startMsgId;
+ int nextMsgId;
bool isInOrder;
int sleepTimeInConsume;
};
@@ -271,7 +276,7 @@ TEST(PubSubTest, testRandomDelivery) {
for (int i = 0; i < 10; i++) {
sleep(3);
- if (cb->numMessagesReceived() == 2 * numMessages) {
+ if (cb->nextExpectedMsgId() == 2 * numMessages) {
break;
}
}
@@ -329,7 +334,7 @@ TEST(PubSubTest, testRandomDelivery) {
PubSubOrderCheckingMessageHandlerCallback *cb =
(PubSubOrderCheckingMessageHandlerCallback *)(callbacks[j].get());
for (int i = 0; i < 10; i++) {
- if (cb->numMessagesReceived() == numMessages) {
+ if (cb->nextExpectedMsgId() == numMessages) {
break;
}
sleep(3);
@@ -339,7 +344,6 @@ TEST(PubSubTest, testRandomDelivery) {
callbacks.clear();
}
-
TEST(PubSubTest, testPubSubContinuousOverClose) {
std::string topic = "pubSubTopic";
std::string sid = "MySubscriberid-1";
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h Mon Oct 1 13:36:31 2012
@@ -136,9 +136,11 @@ private:
class TestServerConfiguration : public Hedwig::Configuration {
public:
- TestServerConfiguration() : address("localhost:4081"), syncTimeout(10000), numThreads(2) {}
+ TestServerConfiguration() : address("localhost:4081:9877"),
+ syncTimeout(10000), numThreads(2) {}
- TestServerConfiguration(int syncTimeout, int numThreads = 2) : address("localhost:4081"), syncTimeout(syncTimeout), numThreads(numThreads) {}
+ TestServerConfiguration(int syncTimeout, int numThreads = 2)
+ : address("localhost:4081:9877"), syncTimeout(syncTimeout), numThreads(numThreads) {}
virtual int getInt(const std::string& key, int defaultVal) const {
if (key == Configuration::SYNC_REQUEST_TIMEOUT) {
@@ -152,15 +154,23 @@ public:
virtual const std::string get(const std::string& key, const std::string& defaultVal) const {
if (key == Configuration::DEFAULT_SERVER) {
return address;
+ } else if (key == Configuration::SSL_PEM_FILE) {
+ return certFile;
} else {
return defaultVal;
}
}
- virtual bool getBool(const std::string& /*key*/, bool defaultVal) const {
+ virtual bool getBool(const std::string& key, bool defaultVal) const {
+ if (key == Configuration::RUN_AS_SSL_MODE) {
+ return isSSL;
+ }
return defaultVal;
}
-
+public:
+ // for testing
+ static bool isSSL;
+ static std::string certFile;
private:
const std::string address;
const int syncTimeout;