You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/01 15:36:32 UTC

svn commit: r1392319 [2/2] - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/cpp/ hedwig-client/src/main/cpp/inc/hedwig/ hedwig-client/src/main/cpp/lib/ hedwig-client/src/main/cpp/scripts/ hedwig-client/src/main/cpp/test/

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp Mon Oct  1 13:36:31 2012
@@ -75,23 +75,29 @@ protected:
 class PubSubOrderCheckingMessageHandlerCallback : public Hedwig::MessageHandlerCallback {
 public:
   PubSubOrderCheckingMessageHandlerCallback(const std::string& topic, const std::string& subscriberId, const int startMsgId, const int sleepTimeInConsume)
-    : messagesReceived(0), topic(topic), subscriberId(subscriberId), startMsgId(startMsgId), 
-      isInOrder(true), sleepTimeInConsume(sleepTimeInConsume) {
+    : topic(topic), subscriberId(subscriberId), startMsgId(startMsgId),
+      nextMsgId(startMsgId), isInOrder(true), sleepTimeInConsume(sleepTimeInConsume) {
   }
 
   virtual void consume(const std::string& topic, const std::string& subscriberId,
 		       const Hedwig::Message& msg, Hedwig::OperationCallbackPtr& callback) {
     if (topic == this->topic && subscriberId == this->subscriberId) {
       boost::lock_guard<boost::mutex> lock(mutex);
-            
-      messagesReceived++;
 
       int newMsgId = atoi(msg.body().c_str());
+      if (newMsgId == nextMsgId + 1) {
+        // only calculate unduplicated entries
+        ++nextMsgId;
+      }
+
       // checking msgId
       LOG4CXX_DEBUG(logger, "received message " << newMsgId);
       if (startMsgId >= 0) { // need to check ordering if start msg id is larger than 0
 	if (isInOrder) {
-	  if (newMsgId != startMsgId + 1) {
+          // in some environments, ssl channel encountering error like Bad File Descriptor.
+          // the channel would disconnect and reconnect. A duplicated message would be received.
+          // so just checking we received a larger out-of-order message.
+	  if (newMsgId > startMsgId + 1) {
 	    LOG4CXX_ERROR(logger, "received out-of-order message : expected " << (startMsgId + 1) << ", actual " << newMsgId);
 	    isInOrder = false;
 	  } else {
@@ -106,10 +112,9 @@ public:
     }
   }
     
-  int numMessagesReceived() {
+  int nextExpectedMsgId() {
     boost::lock_guard<boost::mutex> lock(mutex);
-    int i = messagesReceived;
-    return i;
+    return nextMsgId;
   }    
 
   bool inOrder() {
@@ -119,10 +124,10 @@ public:
     
 protected:
   boost::mutex mutex;
-  int messagesReceived;
   std::string topic;
   std::string subscriberId;
   int startMsgId;
+  int nextMsgId;
   bool isInOrder;
   int sleepTimeInConsume;
 };
@@ -271,7 +276,7 @@ TEST(PubSubTest, testRandomDelivery) {
 
    for (int i = 0; i < 10; i++) {
      sleep(3);
-     if (cb->numMessagesReceived() == 2 * numMessages) {
+     if (cb->nextExpectedMsgId() == 2 * numMessages) {
        break;
      }
    }
@@ -329,7 +334,7 @@ TEST(PubSubTest, testRandomDelivery) {
      PubSubOrderCheckingMessageHandlerCallback *cb =
        (PubSubOrderCheckingMessageHandlerCallback *)(callbacks[j].get());
      for (int i = 0; i < 10; i++) {
-       if (cb->numMessagesReceived() == numMessages) {
+       if (cb->nextExpectedMsgId() == numMessages) {
 	 break;
        }
        sleep(3);
@@ -339,7 +344,6 @@ TEST(PubSubTest, testRandomDelivery) {
    callbacks.clear();
  }
 
-
  TEST(PubSubTest, testPubSubContinuousOverClose) {
    std::string topic = "pubSubTopic";
    std::string sid = "MySubscriberid-1";

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h Mon Oct  1 13:36:31 2012
@@ -136,9 +136,11 @@ private:
 
 class TestServerConfiguration : public Hedwig::Configuration {
 public:
-  TestServerConfiguration() : address("localhost:4081"), syncTimeout(10000), numThreads(2) {}
+  TestServerConfiguration() : address("localhost:4081:9877"),
+                              syncTimeout(10000), numThreads(2) {}
 
-  TestServerConfiguration(int syncTimeout, int numThreads = 2) : address("localhost:4081"), syncTimeout(syncTimeout), numThreads(numThreads) {}
+  TestServerConfiguration(int syncTimeout, int numThreads = 2)
+    : address("localhost:4081:9877"), syncTimeout(syncTimeout), numThreads(numThreads) {}
   
   virtual int getInt(const std::string& key, int defaultVal) const {
     if (key == Configuration::SYNC_REQUEST_TIMEOUT) {
@@ -152,15 +154,23 @@ public:
   virtual const std::string get(const std::string& key, const std::string& defaultVal) const {
     if (key == Configuration::DEFAULT_SERVER) {
       return address;
+    } else if (key == Configuration::SSL_PEM_FILE) {
+      return certFile;
     } else {
       return defaultVal;
     }
   }
 
-  virtual bool getBool(const std::string& /*key*/, bool defaultVal) const {
+  virtual bool getBool(const std::string& key, bool defaultVal) const {
+    if (key == Configuration::RUN_AS_SSL_MODE) {
+      return isSSL;
+    }
     return defaultVal;
   }
-  
+public:
+  // for testing
+  static bool isSSL;
+  static std::string certFile;
 private:
   const std::string address;
   const int syncTimeout;