You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/01 15:36:32 UTC

svn commit: r1392319 [1/2] - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/cpp/ hedwig-client/src/main/cpp/inc/hedwig/ hedwig-client/src/main/cpp/lib/ hedwig-client/src/main/cpp/scripts/ hedwig-client/src/main/cpp/test/

Author: ivank
Date: Mon Oct  1 13:36:31 2012
New Revision: 1392319

URL: http://svn.apache.org/viewvc?rev=1392319&view=rev
Log:
BOOKKEEPER-143: Add SSL support for hedwig cpp client (sijie via ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/Makefile.am
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/configure.ac
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/client.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/client.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/log4cxx.conf
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/server-control.sh
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/tester.sh
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/main.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messageboundtest.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messagefiltertest.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Oct  1 13:36:31 2012
@@ -176,6 +176,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-364: re-factor hedwig java client to support both one-subscription-per-channel and multiplex-subscriptions-per-channel. (sijie via ivank)
 
+        BOOKKEEPER-143: Add SSL support for hedwig cpp client (sijie via ivank)
+
 Release 4.1.0 - 2012-06-07
 
   Non-backward compatible changes:

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/Makefile.am
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/Makefile.am?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/Makefile.am (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/Makefile.am Mon Oct  1 13:36:31 2012
@@ -27,3 +27,12 @@ pkgconfigdir = $(libdir)/pkgconfig
 nodist_pkgconfig_DATA = hedwig-0.1.pc
 
 EXTRA_DIST = $(DX_CONFIG) doc/html
+
+check:
+	cd test; make check
+
+sslcheck:
+	cd test; make sslcheck
+
+simplecheck:
+	cd test; make simplecheck

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/configure.ac
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/configure.ac?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/configure.ac (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/configure.ac Mon Oct  1 13:36:31 2012
@@ -26,7 +26,7 @@ AC_LANG([C++])
 AC_CONFIG_FILES([Makefile lib/Makefile test/Makefile hedwig-0.1.pc])
 AC_PROG_LIBTOOL
 AC_CONFIG_MACRO_DIR([m4])
-PKG_CHECK_MODULES([DEPS], [liblog4cxx protobuf])
+PKG_CHECK_MODULES([DEPS], [liblog4cxx protobuf openssl])
 
 GTEST_LIB_CHECK([1.5.0], [AC_MSG_RESULT([GoogleTest found, Tests Enabled])],
                          [AC_MSG_WARN([GoogleTest not found, Tests disabled])])

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/client.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/client.h?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/client.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/client.h Mon Oct  1 13:36:31 2012
@@ -47,6 +47,8 @@ namespace Hedwig {
     static const std::string SYNC_REQUEST_TIMEOUT;
     static const std::string SUBSCRIBER_AUTOCONSUME;
     static const std::string NUM_DISPATCH_THREADS;
+    static const std::string RUN_AS_SSL_MODE;
+    static const std::string SSL_PEM_FILE;
     /**
      * The maximum number of messages the hub will queue for subscriptions
      * created using this configuration. The hub will always queue the most

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp Mon Oct  1 13:36:31 2012
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-	
 
 #ifdef HAVE_CONFIG_H
 #include <config.h>
@@ -48,63 +47,109 @@ static log4cxx::LoggerPtr logger(log4cxx
 
 using namespace Hedwig;
 
-DuplexChannel::DuplexChannel(EventDispatcher& dispatcher, const HostAddress& addr, 
-			     const Configuration& cfg, const ChannelHandlerPtr& handler)
-  : dispatcher(dispatcher), address(addr), handler(handler), service(dispatcher.getService()),
-    socket(service), instream(&in_buf), copy_buf(NULL), copy_buf_length(0),
-    state(UNINITIALISED), receiving(false), reading(false), sending(false)
-{
-  LOG4CXX_DEBUG(logger, "Creating DuplexChannel(" << this << ")");
+const bool DEFAULT_SSL_ENABLED = false;
+const std::string DEFAULT_SSL_PEM_FILE = "";
+
+AbstractDuplexChannel::AbstractDuplexChannel(IOServicePtr& service,
+                                             const HostAddress& addr, 
+                                             const ChannelHandlerPtr& handler)
+  : address(addr), handler(handler), service(service->getService()),
+    instream(&in_buf), copy_buf(NULL), copy_buf_length(0),
+    state(UNINITIALISED), receiving(false), reading(false), sending(false),
+    closed(false)
+{}
+
+AbstractDuplexChannel::~AbstractDuplexChannel() {
+  free(copy_buf);
+  copy_buf = NULL;
+  copy_buf_length = 0;
+
+  LOG4CXX_INFO(logger, "Destroying DuplexChannel(" << this << ")");
 }
 
-/*static*/ void DuplexChannel::connectCallbackHandler(DuplexChannelPtr channel,
-						      const boost::system::error_code& error) {
-  LOG4CXX_DEBUG(logger,"DuplexChannel::connectCallbackHandler error(" << error 
-		<< ") channel(" << channel.get() << ")");
+/*static*/ void AbstractDuplexChannel::connectCallbackHandler(
+                  AbstractDuplexChannelPtr channel,
+                  OperationCallbackPtr callback,
+                  const boost::system::error_code& error) {
+  channel->doAfterConnect(callback, error);
+}
 
+void AbstractDuplexChannel::connect() {
+  connect(OperationCallbackPtr());
+}
+
+void AbstractDuplexChannel::connect(const OperationCallbackPtr& callback) {  
+  setState(CONNECTING);
+  doConnect(callback);
+}
+
+void AbstractDuplexChannel::doAfterConnect(const OperationCallbackPtr& callback,
+                                           const boost::system::error_code& error) {
   if (error) {
-    channel->channelDisconnected(ChannelConnectException());
-    channel->setState(DEAD);
+    LOG4CXX_ERROR(logger, "Channel " << this << " connect error : " << error.message().c_str());
+    channelConnectFailed(ChannelConnectException(), callback);
     return;
   }
 
-  channel->setState(CONNECTED);
-
+  // set no delay option
   boost::system::error_code ec;
-  boost::asio::ip::tcp::no_delay option(true);
-
-  channel->socket.set_option(option, ec);
+  setSocketOption(ec);
   if (ec) {
-    channel->channelDisconnected(ChannelSetupException());
-    channel->setState(DEAD);
+    LOG4CXX_WARN(logger, "Channel " << this << " set up socket error : " << ec.message().c_str());
+    channelConnectFailed(ChannelSetupException(), callback);
     return;
   } 
-  
-  channel->startSending();
-  channel->startReceiving();
-}
 
-void DuplexChannel::connect() {  
-  setState(CONNECTING);
+  boost::asio::ip::tcp::endpoint localEp;
+  boost::asio::ip::tcp::endpoint remoteEp;
+  localEp = getLocalAddress(ec);
+  remoteEp = getRemoteAddress(ec);
+
+  if (!ec) {
+    LOG4CXX_INFO(logger, "Channel " << this << " connected :"
+                         << localEp.address().to_string() << ":" << localEp.port() << "=>"
+                         << remoteEp.address().to_string() << ":" << remoteEp.port());
+    // update ip address since if might connect to VIP
+    address.updateIP(remoteEp.address().to_v4().to_ulong());
+  }
+  // the channel is connected
+  channelConnected(callback);
+}
+
+void AbstractDuplexChannel::channelConnectFailed(const std::exception& e,
+                                                 const OperationCallbackPtr& callback) {
+  channelDisconnected(e);
+  setState(DEAD);
+  if (callback.get()) {
+    callback->operationFailed(e);
+  }
+}
 
-  boost::asio::ip::tcp::endpoint endp(boost::asio::ip::address_v4(address.ip()), address.port());
-  boost::system::error_code error = boost::asio::error::host_not_found;
+void AbstractDuplexChannel::channelConnected(const OperationCallbackPtr& callback) {
+  // for normal channel, we have done here
+  setState(CONNECTED);
+  if (callback.get()) {
+    callback->operationComplete();
+  }
 
-  socket.async_connect(endp, boost::bind(&DuplexChannel::connectCallbackHandler, 
-					 shared_from_this(), 
-					 boost::asio::placeholders::error)); 
+  // enable sending & receiving
+  startSending();
+  startReceiving();
 }
 
-/*static*/ void DuplexChannel::messageReadCallbackHandler(DuplexChannelPtr channel, 
-							  std::size_t message_size,
-							  const boost::system::error_code& error, 
-							  std::size_t bytes_transferred) {
+/*static*/ void AbstractDuplexChannel::messageReadCallbackHandler(
+                AbstractDuplexChannelPtr channel, 
+                std::size_t message_size,
+                const boost::system::error_code& error, 
+                std::size_t bytes_transferred) {
   LOG4CXX_DEBUG(logger, "DuplexChannel::messageReadCallbackHandler " << error << ", " 
-		<< bytes_transferred << " channel(" << channel.get() << ")");
-		  
+                        << bytes_transferred << " channel(" << channel.get() << ")");
+
   if (error) {
-    LOG4CXX_ERROR(logger, "Invalid read error (" << error << ") bytes_transferred (" 
-		  << bytes_transferred << ") channel(" << channel.get() << ")");
+    if (!channel->isClosed()) {
+      LOG4CXX_INFO(logger, "Invalid read error (" << error << ") bytes_transferred (" 
+                           << bytes_transferred << ") channel(" << channel.get() << ")");
+    }
     channel->channelDisconnected(ChannelReadException());
     return;
   }
@@ -114,6 +159,10 @@ void DuplexChannel::connect() {  
     channel->copy_buf = (char*)realloc(channel->copy_buf, channel->copy_buf_length);
     if (channel->copy_buf == NULL) {
       LOG4CXX_ERROR(logger, "Error allocating buffer. channel(" << channel.get() << ")");
+      // if failed to realloc memory, we should disconnect the channel.
+      // then it would enter disconnect logic, which would close channel and release
+      // its resources includes the copy_buf memory.
+      channel->channelDisconnected(ChannelOutOfMemoryException());
       return;
     }
   }
@@ -122,15 +171,13 @@ void DuplexChannel::connect() {  
   PubSubResponsePtr response(new PubSubResponse());
   bool err = response->ParseFromArray(channel->copy_buf, message_size);
 
-
   if (!err) {
     LOG4CXX_ERROR(logger, "Error parsing message. channel(" << channel.get() << ")");
-
     channel->channelDisconnected(ChannelReadException());
     return;
   } else {
     LOG4CXX_DEBUG(logger,  "channel(" << channel.get() << ") : " << channel->in_buf.size() 
-		  << " bytes left in buffer");
+                           << " bytes left in buffer");
   }
 
   ChannelHandlerPtr h;
@@ -159,25 +206,28 @@ void DuplexChannel::connect() {  
     h->messageReceived(channel, response);
   }
 
-  DuplexChannel::readSize(channel);
+  AbstractDuplexChannel::readSize(channel);
 }
 
-/*static*/ void DuplexChannel::sizeReadCallbackHandler(DuplexChannelPtr channel, 
-						       const boost::system::error_code& error, 
-						       std::size_t bytes_transferred) {
+/*static*/ void AbstractDuplexChannel::sizeReadCallbackHandler(
+                   AbstractDuplexChannelPtr channel, 
+                   const boost::system::error_code& error, 
+                   std::size_t bytes_transferred) {
   LOG4CXX_DEBUG(logger, "DuplexChannel::sizeReadCallbackHandler " << error << ", " 
-		<< bytes_transferred << " channel(" << channel.get() << ")");
+                        << bytes_transferred << " channel(" << channel.get() << ")");
 
   if (error) {
-    LOG4CXX_ERROR(logger, "Invalid read error (" << error << ") bytes_transferred (" 
-		  << bytes_transferred << ") channel(" << channel.get() << ")");
+    if (!channel->isClosed()) {
+      LOG4CXX_INFO(logger, "Invalid read error (" << error << ") bytes_transferred (" 
+                           << bytes_transferred << ") channel(" << channel.get() << ")");
+    }
     channel->channelDisconnected(ChannelReadException());
     return;
   }
   
   if (channel->in_buf.size() < sizeof(uint32_t)) {
     LOG4CXX_ERROR(logger, "Not enough data in stream. Must have been an error reading. " 
-		  << " Closing channel(" << channel.get() << ")");
+                          << " Closing channel(" << channel.get() << ")");
     channel->channelDisconnected(ChannelReadException());
     return;
   }
@@ -189,40 +239,30 @@ void DuplexChannel::connect() {  
 
   int toread = size - channel->in_buf.size();
   LOG4CXX_DEBUG(logger, " size of incoming message " << size << ", currently in buffer " 
-		<< channel->in_buf.size() << " channel(" << channel.get() << ")");
+                        << channel->in_buf.size() << " channel(" << channel.get() << ")");
   if (toread <= 0) {
-    DuplexChannel::messageReadCallbackHandler(channel, size, error, 0);
+    AbstractDuplexChannel::messageReadCallbackHandler(channel, size, error, 0);
   } else {
-    boost::asio::async_read(channel->socket, channel->in_buf,
-			    boost::asio::transfer_at_least(toread),
-			    boost::bind(&DuplexChannel::messageReadCallbackHandler, 
-					channel, size,
-					boost::asio::placeholders::error, 
-					boost::asio::placeholders::bytes_transferred));
+    channel->readMsgBody(channel->in_buf, toread, size);
   }
 }
 
-/*static*/ void DuplexChannel::readSize(DuplexChannelPtr channel) {
+/*static*/ void AbstractDuplexChannel::readSize(AbstractDuplexChannelPtr channel) {
   int toread = sizeof(uint32_t) - channel->in_buf.size();
   LOG4CXX_DEBUG(logger, " size of incoming message " << sizeof(uint32_t) 
-		<< ", currently in buffer " << channel->in_buf.size() 
-		<< " channel(" << channel.get() << ")");
+                        << ", currently in buffer " << channel->in_buf.size() 
+                        << " channel(" << channel.get() << ")");
 
   if (toread < 0) {
-    DuplexChannel::sizeReadCallbackHandler(channel, boost::system::error_code(), 0);
+    AbstractDuplexChannel::sizeReadCallbackHandler(channel, boost::system::error_code(), 0);
   } else {
-    //  in_buf_size.prepare(sizeof(uint32_t));
-    boost::asio::async_read(channel->socket, channel->in_buf, 
-			    boost::asio::transfer_at_least(sizeof(uint32_t)),
-			    boost::bind(&DuplexChannel::sizeReadCallbackHandler, 
-					channel, 
-					boost::asio::placeholders::error, 
-					boost::asio::placeholders::bytes_transferred));
+    channel->readMsgSize(channel->in_buf);
   }
 }
 
-void DuplexChannel::startReceiving() {
-  LOG4CXX_DEBUG(logger, "DuplexChannel::startReceiving channel(" << this << ") currently receiving = " << receiving);
+void AbstractDuplexChannel::startReceiving() {
+  LOG4CXX_DEBUG(logger, "DuplexChannel::startReceiving channel(" << this
+                        << ") currently receiving = " << receiving);
 
   PubSubResponsePtr response;
   bool inReadingState;
@@ -271,22 +311,26 @@ void DuplexChannel::startReceiving() {
 
   // if channel is not in reading state, #readSize
   if (!inReadingState) {
-    DuplexChannel::readSize(shared_from_this());
+    AbstractDuplexChannel::readSize(shared_from_this());
   }
 }
 
-bool DuplexChannel::isReceiving() {
+bool AbstractDuplexChannel::isReceiving() {
   return receiving;
 }
 
-void DuplexChannel::stopReceiving() {
+bool AbstractDuplexChannel::isClosed() {
+  return closed;
+}
+
+void AbstractDuplexChannel::stopReceiving() {
   LOG4CXX_DEBUG(logger, "DuplexChannel::stopReceiving channel(" << this << ")");
   
   boost::lock_guard<boost::mutex> lock(receiving_lock);
   receiving = false;
 }
 
-void DuplexChannel::startSending() {
+void AbstractDuplexChannel::startSending() {
   {
     boost::shared_lock<boost::shared_mutex> lock(state_lock);
     if (state != CONNECTED) {
@@ -298,7 +342,7 @@ void DuplexChannel::startSending() {
   if (sending) {
     return;
   }
-  LOG4CXX_DEBUG(logger, "DuplexChannel::startSending channel(" << this << ")");
+  LOG4CXX_DEBUG(logger, "AbstractDuplexChannel::startSending channel(" << this << ")");
   
   WriteRequest w;
   { 
@@ -323,20 +367,14 @@ void DuplexChannel::startSending() {
     return;
   }
 
-  boost::asio::async_write(socket, out_buf, 
-			   boost::bind(&DuplexChannel::writeCallbackHandler, 
-				       shared_from_this(), 
-				       w.second,
-				       boost::asio::placeholders::error, 
-				       boost::asio::placeholders::bytes_transferred));
+  writeBuffer(out_buf, w.second);
 }
 
-
-const HostAddress& DuplexChannel::getHostAddress() const {
+const HostAddress& AbstractDuplexChannel::getHostAddress() const {
   return address;
 }
 
-void DuplexChannel::channelDisconnected(const std::exception& e) {
+void AbstractDuplexChannel::channelDisconnected(const std::exception& e) {
   setState(DEAD);
   
   {
@@ -360,54 +398,39 @@ void DuplexChannel::channelDisconnected(
   }
 }
 
-void DuplexChannel::kill() {
-  LOG4CXX_DEBUG(logger, "Killing duplex channel (" << this << ")");
-    
-  bool connected = false;
+void AbstractDuplexChannel::close() {
   {
     boost::shared_lock<boost::shared_mutex> statelock(state_lock);
-    connected = (state == CONNECTING || state == CONNECTED);
+    state = DEAD;
   }
 
-  boost::lock_guard<boost::shared_mutex> lock(destruction_lock);
-  if (connected) {
-    setState(DEAD);
-    
-    boost::system::error_code ec;
-    socket.cancel(ec);
-    if (ec) {
-      LOG4CXX_WARN(logger, "Channel " << this << " canceling io error : " << ec.message().c_str());
-    }
-    socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
-    if (ec) {
-      LOG4CXX_WARN(logger, "Channel " << this << " shutdown error : " << ec.message().c_str());
-    }
-    socket.close(ec);
-    if (ec) {
-      LOG4CXX_WARN(logger, "Channel " << this << " close error : " << ec.message().c_str());
+  {
+    boost::lock_guard<boost::shared_mutex> lock(destruction_lock);
+    if (closed) {
+      // some one has closed the socket.
+      return;
     }
+    closed = true;
+    handler = ChannelHandlerPtr(); // clear the handler in case it ever referenced the channel*/
   }
-  handler = ChannelHandlerPtr(); // clear the handler in case it ever referenced the channel*/
-}
 
-DuplexChannel::~DuplexChannel() {
-  /** If we are going away, fail all transactions that haven't been completed */
-  failAllTransactions();
-  kill();
-  free(copy_buf);
-  copy_buf = NULL;
-  copy_buf_length = 0;
+  LOG4CXX_INFO(logger, "Killing duplex channel (" << this << ")");
 
-  LOG4CXX_DEBUG(logger, "Destroying DuplexChannel(" << this << ")");
+  // If we are going away, fail all transactions that haven't been completed
+  failAllTransactions();
+  closeSocket();  
 }
 
-/*static*/ void DuplexChannel::writeCallbackHandler(DuplexChannelPtr channel, OperationCallbackPtr callback,
-						    const boost::system::error_code& error, 
-						    std::size_t bytes_transferred) {
-  LOG4CXX_DEBUG(logger, "DuplexChannel::writeCallbackHandler " << error << ", " 
-		<< bytes_transferred << " channel(" << channel.get() << ")");
-
+/*static*/ void AbstractDuplexChannel::writeCallbackHandler(
+                  AbstractDuplexChannelPtr channel,
+                  OperationCallbackPtr callback,
+                  const boost::system::error_code& error, 
+                  std::size_t bytes_transferred) {
   if (error) {
+    if (!channel->isClosed()) {
+      LOG4CXX_DEBUG(logger, "AbstractDuplexChannel::writeCallbackHandler " << error << ", " 
+                            << bytes_transferred << " channel(" << channel.get() << ")");
+    }
     callback->operationFailed(ChannelWriteException());
     channel->channelDisconnected(ChannelWriteException());
     return;
@@ -425,17 +448,15 @@ DuplexChannel::~DuplexChannel() {
   channel->startSending();
 }
 
-void DuplexChannel::writeRequest(const PubSubRequestPtr& m, const OperationCallbackPtr& callback) {
-  LOG4CXX_DEBUG(logger, "DuplexChannel::writeRequest channel(" << this << ") txnid(" 
-		<< m->txnid() << ") shouldClaim("<< m->has_shouldclaim() << ", " 
-		<< m->shouldclaim() << ")");
-
+void AbstractDuplexChannel::writeRequest(const PubSubRequestPtr& m,
+                                         const OperationCallbackPtr& callback) {
   {
     boost::shared_lock<boost::shared_mutex> lock(state_lock);
     if (state != CONNECTED && state != CONNECTING) {
       LOG4CXX_ERROR(logger,"Tried to write transaction [" << m->txnid() << "] to a channel [" 
-		    << this << "] which is " << (state == DEAD ? "DEAD" : "UNINITIALISED"));
+                           << this << "] which is " << (state == DEAD ? "DEAD" : "UNINITIALISED"));
       callback->operationFailed(UninitialisedChannelException());
+      return;
     }
   }
 
@@ -448,10 +469,14 @@ void DuplexChannel::writeRequest(const P
   startSending();
 }
 
+//
+// Transaction operations
+//
+
 /**
    Store the transaction data for a request.
 */
-void DuplexChannel::storeTransaction(const PubSubDataPtr& data) {
+void AbstractDuplexChannel::storeTransaction(const PubSubDataPtr& data) {
   LOG4CXX_DEBUG(logger, "Storing txnid(" << data->getTxnId() << ") for channel(" << this << ")");
 
   boost::lock_guard<boost::mutex> lock(txnid2data_lock);
@@ -461,20 +486,20 @@ void DuplexChannel::storeTransaction(con
 /**
    Give the transaction back to the caller. 
 */
-PubSubDataPtr DuplexChannel::retrieveTransaction(long txnid) {
+PubSubDataPtr AbstractDuplexChannel::retrieveTransaction(long txnid) {
   boost::lock_guard<boost::mutex> lock(txnid2data_lock);
 
   PubSubDataPtr data = txnid2data[txnid];
   txnid2data.erase(txnid);
   if (data == NULL) {
     LOG4CXX_ERROR(logger, "Transaction txnid(" << txnid 
-		  << ") doesn't exist in channel (" << this << ")");
+                          << ") doesn't exist in channel (" << this << ")");
   }
 
   return data;
 }
 
-void DuplexChannel::failAllTransactions() {
+void AbstractDuplexChannel::failAllTransactions() {
   boost::lock_guard<boost::mutex> lock(txnid2data_lock);
   for (TransactionMap::iterator iter = txnid2data.begin(); iter != txnid2data.end(); ++iter) {
     PubSubDataPtr& data = (*iter).second;
@@ -483,7 +508,322 @@ void DuplexChannel::failAllTransactions(
   txnid2data.clear();
 }
 
-void DuplexChannel::setState(State s) {
+// Set state for the channel
+void AbstractDuplexChannel::setState(State s) {
   boost::lock_guard<boost::shared_mutex> lock(state_lock);
   state = s;
 }
+
+//
+// Basic Asio Channel Implementation
+//
+
+AsioDuplexChannel::AsioDuplexChannel(IOServicePtr& service,
+                                     const HostAddress& addr, 
+                                     const ChannelHandlerPtr& handler)
+  : AbstractDuplexChannel(service, addr, handler) {
+  this->socket = boost_socket_ptr(new boost_socket(getService()));
+  LOG4CXX_DEBUG(logger, "Creating DuplexChannel(" << this << ")");
+}
+
+AsioDuplexChannel::~AsioDuplexChannel() {
+}
+
+void AsioDuplexChannel::doConnect(const OperationCallbackPtr& callback) {
+  boost::system::error_code error = boost::asio::error::host_not_found;
+  uint32_t ip2conn = address.ip();
+  uint16_t port2conn = address.port();
+  boost::asio::ip::tcp::endpoint endp(boost::asio::ip::address_v4(ip2conn), port2conn);
+
+  socket->async_connect(endp, boost::bind(&AbstractDuplexChannel::connectCallbackHandler, 
+                                          shared_from_this(), callback,
+                                          boost::asio::placeholders::error));
+  LOG4CXX_INFO(logger, "Channel (" << this << ") fire connect operation to ip (" 
+                                   << ip2conn << ") port (" << port2conn << ")");
+}
+
+void AsioDuplexChannel::setSocketOption(boost::system::error_code& ec) {
+  boost::asio::ip::tcp::no_delay option(true);
+  socket->set_option(option, ec);
+}
+
+boost::asio::ip::tcp::endpoint AsioDuplexChannel::getLocalAddress(
+    boost::system::error_code& ec) {
+  return socket->local_endpoint(ec);
+}
+
+boost::asio::ip::tcp::endpoint AsioDuplexChannel::getRemoteAddress(
+    boost::system::error_code& ec) {
+  return socket->remote_endpoint(ec);
+}
+
+void AsioDuplexChannel::writeBuffer(boost::asio::streambuf& buffer,
+                                    const OperationCallbackPtr& callback) {
+  boost::asio::async_write(*socket, buffer,
+    boost::bind(&AbstractDuplexChannel::writeCallbackHandler, 
+    shared_from_this(), callback,
+    boost::asio::placeholders::error, 
+    boost::asio::placeholders::bytes_transferred)
+  );
+}
+
+void AsioDuplexChannel::readMsgSize(boost::asio::streambuf& buffer) {
+  boost::asio::async_read(*socket, buffer, boost::asio::transfer_at_least(sizeof(uint32_t)),
+                          boost::bind(&AbstractDuplexChannel::sizeReadCallbackHandler,
+                                      shared_from_this(),
+                                      boost::asio::placeholders::error,
+                                      boost::asio::placeholders::bytes_transferred));
+}
+
+void AsioDuplexChannel::readMsgBody(boost::asio::streambuf& buffer,
+                                    int toread, uint32_t msgSize) {
+  boost::asio::async_read(*socket, buffer, boost::asio::transfer_at_least(toread),
+                          boost::bind(&AbstractDuplexChannel::messageReadCallbackHandler,
+                                      shared_from_this(), msgSize,
+                                      boost::asio::placeholders::error,
+                                      boost::asio::placeholders::bytes_transferred));
+}
+
+void AsioDuplexChannel::closeSocket() {
+  boost::system::error_code ec;
+
+  socket->cancel(ec);
+  if (ec) {
+    LOG4CXX_WARN(logger, "Channel " << this << " canceling io error : " << ec.message().c_str());
+  }
+
+  socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
+  if (ec) {
+    LOG4CXX_WARN(logger, "Channel " << this << " shutdown error : " << ec.message().c_str());
+  }
+
+  socket->close(ec);
+  if (ec) {
+    LOG4CXX_WARN(logger, "Channel " << this << " close error : " << ec.message().c_str());
+  }
+}
+
+// SSL Context Factory
+
+SSLContextFactory::SSLContextFactory(const Configuration& conf)
+  : conf(conf),
+    sslPemFile(conf.get(Configuration::SSL_PEM_FILE,
+                        DEFAULT_SSL_PEM_FILE)) {
+}
+
+SSLContextFactory::~SSLContextFactory() {}
+
+boost_ssl_context_ptr SSLContextFactory::createSSLContext(boost::asio::io_service& service) {
+  boost_ssl_context_ptr sslCtx(new boost_ssl_context(service,
+                               boost::asio::ssl::context::sslv23_client));
+  sslCtx->set_verify_mode(boost::asio::ssl::context::verify_none);
+  if (!sslPemFile.empty()) {
+    boost::system::error_code err;
+    sslCtx->load_verify_file(sslPemFile, err);
+
+    if (err) {
+      LOG4CXX_ERROR(logger, "Failed to load verify ssl pem file : "
+                            << sslPemFile);
+      throw InvalidSSLPermFileException();
+    }
+  }
+  return sslCtx;
+}
+
+//
+// SSL Channl Implementation
+//
+
+#ifndef __APPLE__
+AsioSSLDuplexChannel::AsioSSLDuplexChannel(IOServicePtr& service,
+                                           const boost_ssl_context_ptr& sslCtx,
+                                           const HostAddress& addr,
+                                           const ChannelHandlerPtr& handler)
+  : AbstractDuplexChannel(service, addr, handler), ssl_ctx(sslCtx),
+    sslclosed(false) {
+#else
+AsioSSLDuplexChannel::AsioSSLDuplexChannel(IOServicePtr& service,
+                                           const boost_ssl_context_ptr& sslCtx,
+                                           const HostAddress& addr,
+                                           const ChannelHandlerPtr& handler)
+  : AbstractDuplexChannel(service, addr, handler), ssl_ctx(sslCtx) {
+#endif
+  ssl_socket = boost_ssl_socket_ptr(new boost_ssl_socket(getService(), *ssl_ctx));
+  LOG4CXX_DEBUG(logger, "Created SSL DuplexChannel(" << this << ")");
+}
+
+AsioSSLDuplexChannel::~AsioSSLDuplexChannel() {
+}
+
+void AsioSSLDuplexChannel::doConnect(const OperationCallbackPtr& callback) {
+  boost::system::error_code error = boost::asio::error::host_not_found;
+  uint32_t ip2conn = address.ip();
+  uint16_t port2conn = address.sslPort();
+  boost::asio::ip::tcp::endpoint endp(boost::asio::ip::address_v4(ip2conn), port2conn);
+
+  ssl_socket->lowest_layer().async_connect(endp,
+      boost::bind(&AbstractDuplexChannel::connectCallbackHandler, 
+                  shared_from_this(), callback,
+                  boost::asio::placeholders::error));
+  LOG4CXX_INFO(logger, "SSL Channel (" << this << ") fire connect operation to ip (" 
+                                       << ip2conn << ") port (" << port2conn << ")");
+}
+
+void AsioSSLDuplexChannel::setSocketOption(boost::system::error_code& ec) {
+  boost::asio::ip::tcp::no_delay option(true);
+  ssl_socket->lowest_layer().set_option(option, ec);
+}
+
+boost::asio::ip::tcp::endpoint AsioSSLDuplexChannel::getLocalAddress(
+    boost::system::error_code& ec) {
+  return ssl_socket->lowest_layer().local_endpoint(ec);
+}
+
+boost::asio::ip::tcp::endpoint AsioSSLDuplexChannel::getRemoteAddress(
+    boost::system::error_code& ec) {
+  return ssl_socket->lowest_layer().remote_endpoint(ec);
+}
+
+void AsioSSLDuplexChannel::channelConnected(const OperationCallbackPtr& callback) {
+  // for SSL channel, we had to do SSL hand shake
+  startHandShake(callback);
+  LOG4CXX_INFO(logger, "SSL Channel " << this << " fire hand shake operation");
+}
+
+void AsioSSLDuplexChannel::sslChannelConnected(const OperationCallbackPtr& callback) {
+  LOG4CXX_INFO(logger, "SSL Channel " << this << " hand shake finish!!");
+  AbstractDuplexChannel::channelConnected(callback);
+}
+
+void AsioSSLDuplexChannel::startHandShake(const OperationCallbackPtr& callback) {
+  ssl_socket->async_handshake(boost::asio::ssl::stream_base::client,
+                              boost::bind(&AsioSSLDuplexChannel::handleHandshake,
+                                          boost::shared_dynamic_cast<AsioSSLDuplexChannel>(shared_from_this()),
+                                          callback, boost::asio::placeholders::error));
+}
+
+void AsioSSLDuplexChannel::handleHandshake(AsioSSLDuplexChannelPtr channel,
+                                           OperationCallbackPtr callback,
+                                           const boost::system::error_code& error) {
+  if (error) {
+    LOG4CXX_ERROR(logger, "SSL Channel " << channel.get() << " hand shake error : "
+                          << error.message().c_str());
+    channel->channelConnectFailed(ChannelConnectException(), callback);
+    return;
+  }
+  channel->sslChannelConnected(callback);
+}
+
+void AsioSSLDuplexChannel::writeBuffer(boost::asio::streambuf& buffer,
+                                       const OperationCallbackPtr& callback) {
+  boost::asio::async_write(*ssl_socket, buffer,
+    boost::bind(&AbstractDuplexChannel::writeCallbackHandler, 
+    shared_from_this(), callback,
+    boost::asio::placeholders::error, 
+    boost::asio::placeholders::bytes_transferred)
+  );
+}
+
+void AsioSSLDuplexChannel::readMsgSize(boost::asio::streambuf& buffer) {
+  boost::asio::async_read(*ssl_socket, buffer, boost::asio::transfer_at_least(sizeof(uint32_t)),
+                          boost::bind(&AbstractDuplexChannel::sizeReadCallbackHandler, 
+                                      shared_from_this(),
+                                      boost::asio::placeholders::error, 
+                                      boost::asio::placeholders::bytes_transferred));
+}
+
+void AsioSSLDuplexChannel::readMsgBody(boost::asio::streambuf& buffer,
+                                       int toread, uint32_t msgSize) {
+  boost::asio::async_read(*ssl_socket, buffer, boost::asio::transfer_at_least(toread),
+                          boost::bind(&AbstractDuplexChannel::messageReadCallbackHandler, 
+                                      shared_from_this(), msgSize,
+                                      boost::asio::placeholders::error, 
+                                      boost::asio::placeholders::bytes_transferred));
+}
+
+#ifndef __APPLE__
+// boost asio doesn't provide time out mechanism to shutdown ssl
+void AsioSSLDuplexChannel::sslShutdown() {
+  ssl_socket->async_shutdown(boost::bind(&AsioSSLDuplexChannel::handleSSLShutdown,
+                                         boost::shared_dynamic_cast<AsioSSLDuplexChannel>(shared_from_this()),
+                                         boost::asio::placeholders::error));
+}
+
+void AsioSSLDuplexChannel::handleSSLShutdown(const boost::system::error_code& error) {
+  if (error) {
+    LOG4CXX_ERROR(logger, "SSL Channel " << this << " shutdown error : "
+                          << error.message().c_str());
+  }
+  {
+    boost::lock_guard<boost::mutex> lock(sslclosed_lock);
+    sslclosed = true;
+  }
+  sslclosed_cond.notify_all();
+}
+#endif
+
+void AsioSSLDuplexChannel::closeSocket() {
+#ifndef __APPLE__
+  // Shutdown ssl
+  sslShutdown();
+  // time wait 
+  {
+    boost::mutex::scoped_lock lock(sslclosed_lock);
+    if (!sslclosed) {
+      sslclosed_cond.timed_wait(lock, boost::posix_time::milliseconds(1000)); 
+    }
+  }
+#endif
+  closeLowestLayer();
+}
+
+void AsioSSLDuplexChannel::closeLowestLayer() {
+  boost::system::error_code ec;
+
+  ssl_socket->lowest_layer().cancel(ec);
+  if (ec) {
+    LOG4CXX_WARN(logger, "Channel " << this << " canceling io error : " << ec.message().c_str());
+  }
+
+  ssl_socket->lowest_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
+  if (ec) {
+    LOG4CXX_WARN(logger, "Channel " << this << " shutdown error : " << ec.message().c_str());
+  }
+
+  ssl_socket->lowest_layer().close(ec);
+  if (ec) {
+    LOG4CXX_WARN(logger, "Channel " << this << " close error : " << ec.message().c_str());
+  }
+}
+
+DuplexChannelManagerPtr DuplexChannelManager::create(const Configuration& conf,
+                                                     EventDispatcher& dispatcher) {
+  DuplexChannelManagerPtr factory(new DuplexChannelManager(conf, dispatcher));
+  LOG4CXX_DEBUG(logger, "Created DuplexChannelManager " << factory);
+  return factory;
+}
+
+DuplexChannelManager::DuplexChannelManager(const Configuration& conf,
+                                           EventDispatcher& dispatcher)
+  : conf(conf), dispatcher(dispatcher) {
+  sslEnabled = conf.getBool(Configuration::RUN_AS_SSL_MODE, DEFAULT_SSL_ENABLED); 
+  if (sslEnabled) {
+    sslCtxFactory = SSLContextFactoryPtr(new SSLContextFactory(conf));
+  }
+}
+
+DuplexChannelManager::~DuplexChannelManager() {
+}
+
+DuplexChannelPtr DuplexChannelManager::createChannel(const HostAddress& addr,
+                                                     const ChannelHandlerPtr& handler) {
+  LOG4CXX_DEBUG(logger, "Creating channel with handler " << handler.get());
+  IOServicePtr& service = dispatcher.getService();
+  if (sslEnabled) {
+    boost_ssl_context_ptr sslCtx =
+      sslCtxFactory->createSSLContext(service->getService());
+    return DuplexChannelPtr(new AsioSSLDuplexChannel(service, sslCtx, addr, handler));
+  } else {
+    return DuplexChannelPtr(new AsioDuplexChannel(service, addr, handler));
+  }
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h Mon Oct  1 13:36:31 2012
@@ -40,6 +40,8 @@
 
 #include <boost/asio.hpp>
 #include <boost/asio/ip/tcp.hpp>
+#include <boost/asio/ssl.hpp>
+#include <boost/function.hpp>
 #include <boost/thread/mutex.hpp>
 #include <boost/thread/shared_mutex.hpp>
 
@@ -57,9 +59,16 @@ namespace Hedwig {
   class ChannelWriteException : public ChannelException {};
   class ChannelReadException : public ChannelException {};
   class ChannelThreadException : public ChannelException {};
+  class ChannelOutOfMemoryException : public ChannelException {};
+
+  class InvalidSSLPermFileException : public std::exception {};
 
   class DuplexChannel;
   typedef boost::shared_ptr<DuplexChannel> DuplexChannelPtr;
+  typedef boost::asio::ip::tcp::socket boost_socket;
+  typedef boost::shared_ptr<boost_socket> boost_socket_ptr;
+  typedef boost::asio::ssl::stream<boost_socket> boost_ssl_socket;
+  typedef boost::shared_ptr<boost_ssl_socket> boost_ssl_socket_ptr;
 
   class ChannelHandler {
   public:
@@ -74,90 +83,364 @@ namespace Hedwig {
 
   typedef boost::shared_ptr<ChannelHandler> ChannelHandlerPtr;
 
+  // A channel interface to send requests
+  class DuplexChannel {
+  public:
+    virtual ~DuplexChannel() {}
+
+    // Issues a connect request to the target host
+    // User could writeRequest after issued connect request, those requests should
+    // be buffered and written until the channel is connected.
+    virtual void connect() = 0;
+
+    // Issues a connect request to the target host
+    // User could writeRequest after issued connect request, those requests should
+    // be buffered and written until the channel is connected.
+    // The provided callback would be triggered after connected.
+    virtual void connect(const OperationCallbackPtr& callback) = 0;
+
+    // Write the request to underlying channel
+    // If the channel is not established, all write requests would be buffered
+    // until channel is connected.
+    virtual void writeRequest(const PubSubRequestPtr& m,
+                              const OperationCallbackPtr& callback) = 0; 
+
+    // Returns the remote address where this channel is connected to.
+    virtual const HostAddress& getHostAddress() const = 0;
+
+    // Resumes the read operation of this channel asynchronously
+    virtual void startReceiving() = 0;
+
+    // Suspends the read operation of this channel asynchronously
+    virtual void stopReceiving() = 0;
+
+    // Returns if and only if the channel will read a message
+    virtual bool isReceiving() = 0;
+
+    //
+    // Transaction operations
+    //
+
+    // Store a pub/sub request
+    virtual void storeTransaction(const PubSubDataPtr& data) = 0;
+
+    // Remove a pub/sub request
+    virtual PubSubDataPtr retrieveTransaction(long txnid) = 0;
+
+    // Fail all transactions
+    virtual void failAllTransactions() = 0;
+
+    // Handle the case that the channel is disconnected due issues found
+    // when reading or writing
+    virtual void channelDisconnected(const std::exception& e) = 0;
+
+    // Close the channel to release the resources
+    // Once a channel is closed, it can not be open again. Calling this
+    // method on a closed channel has no efffect.
+    virtual void close() = 0;
+  };
 
-  class DuplexChannel : public boost::enable_shared_from_this<DuplexChannel> {
+  typedef boost::asio::ssl::context boost_ssl_context;
+  typedef boost::shared_ptr<boost_ssl_context> boost_ssl_context_ptr;
+
+  class SSLContextFactory {
   public:
-    DuplexChannel(EventDispatcher& dispatcher, const HostAddress& addr, 
-		  const Configuration& cfg, const ChannelHandlerPtr& handler);
-    static void connectCallbackHandler(DuplexChannelPtr channel, 
-				       const boost::system::error_code& error);
-    void connect();
-
-    static void writeCallbackHandler(DuplexChannelPtr channel, OperationCallbackPtr callback, 
-				     const boost::system::error_code& error, 
-				     std::size_t bytes_transferred);
-    void writeRequest(const PubSubRequestPtr& m, const OperationCallbackPtr& callback);
-    
-    const HostAddress& getHostAddress() const;
-
-    void storeTransaction(const PubSubDataPtr& data);
-    PubSubDataPtr retrieveTransaction(long txnid);
-    void failAllTransactions();
-
-    static void sizeReadCallbackHandler(DuplexChannelPtr channel, 
-					const boost::system::error_code& error, 
-					std::size_t bytes_transferred);
-    static void messageReadCallbackHandler(DuplexChannelPtr channel, std::size_t messagesize, 
-					   const boost::system::error_code& error, 
-					   std::size_t bytes_transferred);
-    static void readSize(DuplexChannelPtr channel);
-
-    void startReceiving();
-    bool isReceiving();
-    void stopReceiving();
-    
-    void startSending();
+    SSLContextFactory(const Configuration& conf);
+    ~SSLContextFactory();
+
+    boost_ssl_context_ptr createSSLContext(boost::asio::io_service& service);
+  private:
+    const Configuration& conf;
+    std::string sslPemFile;
+  };
+
+  typedef boost::shared_ptr<SSLContextFactory> SSLContextFactoryPtr;
+
+  class DuplexChannelManager;
+  typedef boost::shared_ptr<DuplexChannelManager> DuplexChannelManagerPtr;
 
-    void channelDisconnected(const std::exception& e);
-    virtual void kill();
+  class DuplexChannelManager : public boost::enable_shared_from_this<DuplexChannelManager> {
+  public:
+    static DuplexChannelManagerPtr create(const Configuration& conf,
+                                          EventDispatcher& dispatcher);
+    ~DuplexChannelManager();
+
+    DuplexChannelPtr createChannel(const HostAddress& addr, const ChannelHandlerPtr& handler);
+  private:
+    DuplexChannelManager(const Configuration& conf, EventDispatcher& dispatcher);
 
-    inline boost::asio::io_service & getService() {
+    const Configuration& conf;
+    EventDispatcher& dispatcher;
+    bool sslEnabled;
+    SSLContextFactoryPtr sslCtxFactory;
+  };
+
+  class AbstractDuplexChannel;
+  typedef boost::shared_ptr<AbstractDuplexChannel> AbstractDuplexChannelPtr;
+
+  class AbstractDuplexChannel : public DuplexChannel,
+                                public boost::enable_shared_from_this<AbstractDuplexChannel> {
+  public:
+    AbstractDuplexChannel(IOServicePtr& service,
+                          const HostAddress& addr, 
+                          const ChannelHandlerPtr& handler);
+    virtual ~AbstractDuplexChannel();
+
+    //
+    // Connect Operation
+    //
+
+    // Asio Connect Callback Handler
+    static void connectCallbackHandler(AbstractDuplexChannelPtr channel, 
+                                       OperationCallbackPtr callback,
+                                       const boost::system::error_code& error);
+    virtual void connect();
+    virtual void connect(const OperationCallbackPtr& callback);
+
+    //
+    // Write Operation
+    //
+
+    // Asio Write Callback Handler
+    static void writeCallbackHandler(AbstractDuplexChannelPtr channel,
+                                     OperationCallbackPtr callback, 
+                                     const boost::system::error_code& error, 
+                                     std::size_t bytes_transferred);
+    // Write request
+    virtual void writeRequest(const PubSubRequestPtr& m,
+                              const OperationCallbackPtr& callback);
+
+    // get the target host
+    virtual const HostAddress& getHostAddress() const;
+
+    static void sizeReadCallbackHandler(AbstractDuplexChannelPtr channel, 
+                                        const boost::system::error_code& error, 
+                                        std::size_t bytes_transferred);
+    static void messageReadCallbackHandler(AbstractDuplexChannelPtr channel,
+                                           std::size_t messagesize, 
+                                           const boost::system::error_code& error, 
+                                           std::size_t bytes_transferred);
+    static void readSize(AbstractDuplexChannelPtr channel);
+
+    // start receiving responses from underlying channel
+    virtual void startReceiving();
+    // is the underlying channel in receiving state
+    virtual bool isReceiving();
+    // stop receiving responses from underlying channel
+    virtual void stopReceiving();
+
+    // Store a pub/sub request
+    virtual void storeTransaction(const PubSubDataPtr& data);
+
+    // Remove a pub/sub request
+    virtual PubSubDataPtr retrieveTransaction(long txnid);
+
+    // Fail all transactions
+    virtual void failAllTransactions();
+
+    // channel is disconnected for a specified exception
+    virtual void channelDisconnected(const std::exception& e);
+
+    // close the channel
+    virtual void close();
+
+    inline boost::asio::io_service & getService() const {
       return service;
     }
 
-    virtual ~DuplexChannel();
-  private:
-    enum State { UNINITIALISED, CONNECTING, CONNECTED, DEAD };
+  protected:
+    // execute the connect operation
+    virtual void doConnect(const OperationCallbackPtr& callback) = 0;
+
+    virtual void doAfterConnect(const OperationCallbackPtr& callback,
+                                const boost::system::error_code& error);
+
+    // Execute the action after channel connect
+    // It would be executed in asio connect callback handler
+    virtual void setSocketOption(boost::system::error_code& ec) = 0;
+    virtual boost::asio::ip::tcp::endpoint
+            getRemoteAddress(boost::system::error_code& ec) = 0;
+    virtual boost::asio::ip::tcp::endpoint
+            getLocalAddress(boost::system::error_code& ec) = 0;
+
+    // Channel failed to connect
+    virtual void channelConnectFailed(const std::exception& e,
+                                      const OperationCallbackPtr& callback);
+    // Channel connected
+    virtual void channelConnected(const OperationCallbackPtr& callback);
 
-    void setState(State s);
+    // Start sending buffered requests to target host
+    void startSending();
 
-    EventDispatcher& dispatcher;
+    // Write a buffer to underlying socket
+    virtual void writeBuffer(boost::asio::streambuf& buffer,
+                             const OperationCallbackPtr& callback) = 0;
+
+    // Read a message from underlying socket
+    virtual void readMsgSize(boost::asio::streambuf& buffer) = 0;
+    virtual void readMsgBody(boost::asio::streambuf& buffer,
+                             int toread, uint32_t msgSize) = 0;
 
+    // is the channel under closing
+    bool isClosed();
+
+    // close the underlying socket to release resource 
+    virtual void closeSocket() = 0;
+
+    enum State { UNINITIALISED, CONNECTING, CONNECTED, DEAD };
+    void setState(State s);
+
+    // Address
     HostAddress address;
+  private:
     ChannelHandlerPtr handler;
 
     boost::asio::io_service &service;
-    boost::asio::ip::tcp::socket socket;
+
+    // buffers for input stream
     boost::asio::streambuf in_buf;
     std::istream instream;
-    
-    // only exists because protobufs can't play nice with streams (if there's more than message len in it, it tries to read all)
+
+    // only exists because protobufs can't play nice with streams
+    // (if there's more than message len in it, it tries to read all)
     char* copy_buf;
     std::size_t copy_buf_length;
 
+    // buffers for output stream
     boost::asio::streambuf out_buf;
-    
+    // write requests queue 
     typedef std::pair<PubSubRequestPtr, OperationCallbackPtr> WriteRequest;
     boost::mutex write_lock;
     std::deque<WriteRequest> write_queue;
 
+    // channel state
     State state;
     boost::shared_mutex state_lock;
 
+    // reading state
     bool receiving;
     bool reading;
     PubSubResponsePtr outstanding_response;
     boost::mutex receiving_lock;
-    
+
+    // sending state
     bool sending;
     boost::mutex sending_lock;
 
+    // flag indicates the channel is closed
+    // some callback might return when closing
+    bool closed;
+
+    // transactions
     typedef std::tr1::unordered_map<long, PubSubDataPtr> TransactionMap;
 
     TransactionMap txnid2data;
     boost::mutex txnid2data_lock;
     boost::shared_mutex destruction_lock;
   };
+
+  class AsioDuplexChannel : public AbstractDuplexChannel {
+  public:
+    AsioDuplexChannel(IOServicePtr& service,
+                      const HostAddress& addr, 
+                      const ChannelHandlerPtr& handler);
+    virtual ~AsioDuplexChannel();
+  protected:
+    // execute the connect operation
+    virtual void doConnect(const OperationCallbackPtr& callback);
+
+    // Execute the action after channel connect
+    // It would be executed in asio connect callback handler
+    virtual void setSocketOption(boost::system::error_code& ec);
+    virtual boost::asio::ip::tcp::endpoint
+            getRemoteAddress(boost::system::error_code& ec);
+    virtual boost::asio::ip::tcp::endpoint
+            getLocalAddress(boost::system::error_code& ec);
+
+    // Write a buffer to underlying socket
+    virtual void writeBuffer(boost::asio::streambuf& buffer,
+                             const OperationCallbackPtr& callback);
+
+    // Read a message from underlying socket
+    virtual void readMsgSize(boost::asio::streambuf& buffer);
+    virtual void readMsgBody(boost::asio::streambuf& buffer,
+                             int toread, uint32_t msgSize);
+
+    // close the underlying socket to release resource 
+    virtual void closeSocket();
+  private:
+    // underlying socket
+    boost_socket_ptr socket;
+  };
+
+  typedef boost::shared_ptr<AsioDuplexChannel> AsioDuplexChannelPtr;
+
+  class AsioSSLDuplexChannel;
+  typedef boost::shared_ptr<AsioSSLDuplexChannel> AsioSSLDuplexChannelPtr;
+
+  class AsioSSLDuplexChannel : public AbstractDuplexChannel {
+  public:
+    AsioSSLDuplexChannel(IOServicePtr& service,
+                         const boost_ssl_context_ptr& sslCtx,
+                         const HostAddress& addr, 
+                         const ChannelHandlerPtr& handler);
+    virtual ~AsioSSLDuplexChannel();
+  protected:
+    // execute the connect operation
+    virtual void doConnect(const OperationCallbackPtr& callback);
+    // Execute the action after channel connect
+    // It would be executed in asio connect callback handler
+    virtual void setSocketOption(boost::system::error_code& ec);
+    virtual boost::asio::ip::tcp::endpoint
+            getRemoteAddress(boost::system::error_code& ec);
+    virtual boost::asio::ip::tcp::endpoint
+            getLocalAddress(boost::system::error_code& ec);
+
+    virtual void channelConnected(const OperationCallbackPtr& callback);
+
+    // Start SSL Hand Shake after the channel is connected
+    void startHandShake(const OperationCallbackPtr& callback);
+    // Asio Callback After Hand Shake
+    static void handleHandshake(AsioSSLDuplexChannelPtr channel,
+                                OperationCallbackPtr callback,
+                                const boost::system::error_code& error);
+
+    void sslChannelConnected(const OperationCallbackPtr& callback);
+
+    // Write a buffer to underlying socket
+    virtual void writeBuffer(boost::asio::streambuf& buffer,
+                             const OperationCallbackPtr& callback);
+
+    // Read a message from underlying socket
+    virtual void readMsgSize(boost::asio::streambuf& buffer);
+    virtual void readMsgBody(boost::asio::streambuf& buffer,
+                             int toread, uint32_t msgSize);
+
+    // close the underlying socket to release resource 
+    virtual void closeSocket();
+
+  private:
+#ifndef __APPLE__
+    // Shutdown ssl
+    void sslShutdown();
+    // Handle ssl shutdown
+    void handleSSLShutdown(const boost::system::error_code& error);
+#endif
+    // Close lowest layer
+    void closeLowestLayer();
+
+    // underlying ssl socket
+    boost_ssl_socket_ptr ssl_socket;
+    // ssl context
+    boost_ssl_context_ptr ssl_ctx;
+
+#ifndef __APPLE__
+    // Flag indicated ssl is closed.
+    bool sslclosed;
+    boost::mutex sslclosed_lock;
+    boost::condition_variable sslclosed_cond;
+#endif
+  };
   
 
   struct DuplexChannelPtrHash : public std::unary_function<DuplexChannelPtr, size_t> {

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/client.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/client.cpp?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/client.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/client.cpp Mon Oct  1 13:36:31 2012
@@ -39,6 +39,8 @@ const std::string Configuration::SYNC_RE
 const std::string Configuration::SUBSCRIBER_AUTOCONSUME = "hedwig.cpp.subscriber_autoconsume";
 const std::string Configuration::NUM_DISPATCH_THREADS = "hedwig.cpp.num_dispatch_threads";
 const std::string Configuration::SUBSCRIPTION_MESSAGE_BOUND = "hedwig.cpp.subscription_message_bound";
+const std::string Configuration::RUN_AS_SSL_MODE = "hedwig.cpp.ssl_mode";
+const std::string Configuration::SSL_PEM_FILE = "hedwig.cpp.ssl_pem";
 
 Client::Client(const Configuration& conf) {
   LOG4CXX_DEBUG(logger, "Client::Client (" << this << ")");

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.cpp?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.cpp Mon Oct  1 13:36:31 2012
@@ -30,7 +30,6 @@ static log4cxx::LoggerPtr logger(log4cxx
 using namespace Hedwig;
 
 const std::string DEFAULT_SERVER_DEFAULT_VAL = "";
-const int DEFAULT_NUM_DISPATCH_THREADS = 1;
 
 void SyncOperationCallback::wait() {
   boost::unique_lock<boost::mutex> lock(mut);
@@ -182,25 +181,25 @@ long ClientTxnCounter::next() {  // woul
 ClientImplPtr ClientImpl::Create(const Configuration& conf) {
   ClientImplPtr impl(new ClientImpl(conf));
   LOG4CXX_DEBUG(logger, "Creating Clientimpl " << impl);
-
   impl->dispatcher->start();
-
   return impl;
 }
 
 void ClientImpl::Destroy() {
   LOG4CXX_DEBUG(logger, "destroying Clientimpl " << this);
 
-  dispatcher->stop();
   {
     boost::lock_guard<boost::shared_mutex> lock(allchannels_lock);
     
     shuttingDownFlag = true;
     for (ChannelMap::iterator iter = allchannels.begin(); iter != allchannels.end(); ++iter ) {
-      (*iter)->kill();
+      (*iter)->close();
     }  
     allchannels.clear();
   }
+  // SSL Channel shutdown needs to send packets to server
+  // so we only stop dispatcher after all channels are closed
+  dispatcher->stop();
 
   /* destruction of the maps will clean up any items they hold */
   
@@ -217,8 +216,10 @@ void ClientImpl::Destroy() {
 ClientImpl::ClientImpl(const Configuration& conf) 
   : conf(conf), publisher(NULL), subscriber(NULL), counterobj(), shuttingDownFlag(false)
 {
-  defaultHost = HostAddress::fromString(conf.get(Configuration::DEFAULT_SERVER, DEFAULT_SERVER_DEFAULT_VAL));
-  dispatcher = EventDispatcherPtr(new EventDispatcher(conf.getInt(Configuration::NUM_DISPATCH_THREADS, DEFAULT_NUM_DISPATCH_THREADS)));
+  defaultHost = HostAddress::fromString(conf.get(Configuration::DEFAULT_SERVER,
+                                                 DEFAULT_SERVER_DEFAULT_VAL));
+  dispatcher = EventDispatcherPtr(new EventDispatcher(conf));
+  channelManager = DuplexChannelManager::create(conf, *dispatcher);
 }
 
 Subscriber& ClientImpl::getSubscriber() {
@@ -275,15 +276,13 @@ void ClientImpl::redirectRequest(const D
     if (data->getType() == SUBSCRIBE) {
       // a redirect for subscription, kill old channel and remove old channel from all channels list
       // otherwise old channel will not be destroyed, caused lost of CLOSE_WAIT connections
-      channel->kill();
-      {
-        boost::lock_guard<boost::shared_mutex> aclock(allchannels_lock);
-        allchannels.erase(channel); // channel should be deleted here
-      }
+      removeAndCloseChannel(channel);
+
       SubscriberClientChannelHandlerPtr handler(new SubscriberClientChannelHandler(shared_from_this(), 
 										   this->getSubscriberImpl(), data));
       newchannel = createChannel(data->getTopic(), handler);
       handler->setChannel(newchannel);
+      newchannel->connect();
       getSubscriberImpl().doSubscribe(newchannel, data, handler);
     } else if (data->getType() == PUBLISH) {
       newchannel = getChannel(data->getTopic());
@@ -314,14 +313,15 @@ DuplexChannelPtr ClientImpl::createChann
     setHostForTopic(topic, addr);
   }
 
-  DuplexChannelPtr channel(new DuplexChannel(*dispatcher, addr, conf, handler));
+  DuplexChannelPtr channel = channelManager->createChannel(addr, handler);
 
   boost::lock_guard<boost::shared_mutex> lock(allchannels_lock);
   if (shuttingDownFlag) {
-    channel->kill();
+    channel->close();
     throw ShuttingDownException();
   }
-  channel->connect();
+  // Don't connect here, otherwise connect callback may be triggered before setChannel
+  // channel->connect();
 
   allchannels.insert(channel);
   LOG4CXX_DEBUG(logger, "(create) All channels size: " << allchannels.size());
@@ -345,6 +345,7 @@ DuplexChannelPtr ClientImpl::getChannel(
     LOG4CXX_DEBUG(logger, " No channel for topic, creating new channel.get() " << channel.get() << " addr " << addr.getAddressString());
     ChannelHandlerPtr handler(new HedwigClientChannelHandler(shared_from_this()));
     channel = createChannel(topic, handler);
+    channel->connect();
 
     boost::lock_guard<boost::shared_mutex> lock(host2channel_lock);
     host2channel[addr] = channel;
@@ -376,7 +377,6 @@ void ClientImpl::channelDied(const Duple
   boost::lock_guard<boost::shared_mutex> h2tlock(host2topics_lock);
   boost::lock_guard<boost::shared_mutex> h2clock(host2channel_lock);
   boost::lock_guard<boost::shared_mutex> t2hlock(topic2host_lock);
-  boost::lock_guard<boost::shared_mutex> aclock(allchannels_lock);
   // get host
   HostAddress addr = channel->getHostAddress();
   
@@ -385,8 +385,13 @@ void ClientImpl::channelDied(const Duple
   }
   host2topics.erase(addr);
   host2channel.erase(addr);
+  removeAndCloseChannel(channel);
+}
 
+void ClientImpl::removeAndCloseChannel(const DuplexChannelPtr& channel) {
+  boost::lock_guard<boost::shared_mutex> aclock(allchannels_lock);
   allchannels.erase(channel); // channel should be deleted here
+  channel->close(); // close channel
 }
 
 const Configuration& ClientImpl::getConfiguration() {
@@ -394,5 +399,5 @@ const Configuration& ClientImpl::getConf
 }
 
 boost::asio::io_service& ClientImpl::getService() {
-  return dispatcher->getService();
+  return dispatcher->getService()->getService();
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.h?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.h Mon Oct  1 13:36:31 2012
@@ -206,6 +206,7 @@ namespace Hedwig {
 
     void setChannelForHost(const HostAddress& address, const DuplexChannelPtr& channel);
     void channelDied(const DuplexChannelPtr& channel);
+    void removeAndCloseChannel(const DuplexChannelPtr& channel);
     bool shuttingDown() const;
     
     SubscriberImpl& getSubscriberImpl();
@@ -230,7 +231,8 @@ namespace Hedwig {
 
     typedef boost::shared_ptr<EventDispatcher> EventDispatcherPtr;
     EventDispatcherPtr dispatcher;
-    
+    DuplexChannelManagerPtr channelManager;
+
     typedef std::tr1::unordered_multimap<HostAddress, std::string, HostAddressHash > Host2TopicsMap;
     Host2TopicsMap host2topics;
     boost::shared_mutex host2topics_lock;

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp Mon Oct  1 13:36:31 2012
@@ -27,41 +27,71 @@ static log4cxx::LoggerPtr logger(log4cxx
 
 using namespace Hedwig;
 
-EventDispatcher::EventDispatcher(int numThreads)
-  : num_threads(numThreads), running(false), next_io_service(0) {
-  if (0 == num_threads) {
-    throw std::runtime_error("number of threads in dispatcher is zero");
-  }
-  for (size_t i = 0; i < num_threads; i++) {
-    io_service_ptr service(new boost::asio::io_service);
-    services.push_back(service);
+const int DEFAULT_NUM_DISPATCH_THREADS = 1;
+
+IOService::IOService() {
+}
+
+IOService::~IOService() {}
+
+void IOService::start() {
+  if (work.get()) {
+    return;
   }
+  work = work_ptr(new boost::asio::io_service::work(service));
 }
 
-void EventDispatcher::run_forever(io_service_ptr service, size_t idx) {
-  LOG4CXX_DEBUG(logger, "Starting event dispatcher " << idx);
+void IOService::stop() {
+  if (!work.get()) {
+    return;
+  }
+
+  work = work_ptr();
+  service.stop();
+}
 
+void IOService::run() {
   while (true) {
     try {
-      service->run();
+      service.run();
       break;
     } catch (std::exception &e) {
-    LOG4CXX_ERROR(logger, "Exception in dispatch handler " << idx << " : " << e.what());
+      LOG4CXX_ERROR(logger, "Exception in IO Service " << this << " : " << e.what());
     }
   }
-  LOG4CXX_DEBUG(logger, "Event dispatcher " << idx << " done");
+}
+
+EventDispatcher::EventDispatcher(const Configuration& conf)
+  : conf(conf), running(false), next_io_service(0) {
+  num_threads = conf.getInt(Configuration::NUM_DISPATCH_THREADS,
+                            DEFAULT_NUM_DISPATCH_THREADS);
+  if (0 == num_threads) {
+    throw std::runtime_error("number of threads in dispatcher is zero");
+  }
+  for (size_t i = 0; i < num_threads; i++) {
+    services.push_back(IOServicePtr(new IOService()));
+  }
+}
+
+void EventDispatcher::run_forever(IOServicePtr service, size_t idx) {
+  LOG4CXX_INFO(logger, "Starting event dispatcher " << idx);
+
+  service->run();
+
+  LOG4CXX_INFO(logger, "Event dispatcher " << idx << " done");
 }
 
 void EventDispatcher::start() {
   if (running) {
     return;
   }
+
   for (size_t i = 0; i < num_threads; i++) {
-    io_service_ptr service = services[i];
-    work_ptr work(new boost::asio::io_service::work(*service));
-    works.push_back(work);
+    IOServicePtr service = services[i];
+    service->start();
     // new thread
-    thread_ptr t(new boost::thread(boost::bind(&EventDispatcher::run_forever, this, service, i)));
+    thread_ptr t(new boost::thread(boost::bind(&EventDispatcher::run_forever,
+                                               this, service, i)));
     threads.push_back(t);
   }
   running = true;
@@ -72,8 +102,6 @@ void EventDispatcher::stop() {
     return;
   }
 
-  works.clear();
-
   for (size_t i = 0; i < num_threads; i++) {
     services[i]->stop();
   }
@@ -90,13 +118,12 @@ EventDispatcher::~EventDispatcher() {
   services.clear();
 }
 
-boost::asio::io_service& EventDispatcher::getService() {
+IOServicePtr& EventDispatcher::getService() {
   size_t next = 0;
   {
     boost::lock_guard<boost::mutex> lock(next_lock);
     next = next_io_service;
     next_io_service = (next_io_service + 1) % num_threads;
   }
-  boost::asio::io_service& service = *services[next];
-  return service;
+  return services[next];
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.h?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.h Mon Oct  1 13:36:31 2012
@@ -20,37 +20,62 @@
 
 #include <vector>
 
+#include <hedwig/client.h>
+
 #include <boost/asio.hpp>
 #include <boost/thread.hpp>
-#include <boost/thread/mutex.hpp>
 #include <boost/shared_ptr.hpp>
 
 namespace Hedwig {
-  typedef boost::shared_ptr<boost::asio::io_service> io_service_ptr;
   typedef boost::shared_ptr<boost::asio::io_service::work> work_ptr;
   typedef boost::shared_ptr<boost::thread> thread_ptr;
 
+  class IOService;
+  typedef boost::shared_ptr<IOService> IOServicePtr;
+
+  class IOService {
+  public:
+    IOService();
+    virtual ~IOService();
+
+    // start the io service
+    void start();
+    // stop the io service
+    void stop();
+    // run the io service
+    void run();
+
+    inline boost::asio::io_service& getService() {
+      return service;
+    }
+
+  private:
+    boost::asio::io_service service;  
+    work_ptr work;
+  };
+
   class EventDispatcher {
   public:  
-    EventDispatcher(int numThreads = 1);
+    EventDispatcher(const Configuration& conf);
     ~EventDispatcher();
     
     void start();
+
     void stop();
     
-    boost::asio::io_service& getService();
-    
+    IOServicePtr& getService();
+
   private:
-    void run_forever(io_service_ptr service, size_t idx);
+    void run_forever(IOServicePtr service, size_t idx);
+
+    const Configuration& conf;
 
     // number of threads
     size_t num_threads;
     // running flag
     bool running;
     // pool of io_services.
-    std::vector<io_service_ptr> services;
-    // pool of works
-    std::vector<work_ptr> works;
+    std::vector<IOServicePtr> services;
     // threads
     std::vector<thread_ptr> threads;
     // next io_service used for a connection

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp Mon Oct  1 13:36:31 2012
@@ -129,7 +129,8 @@ void SubscriberConsumeCallback::operatio
   LOG4CXX_ERROR(logger, "Error passing message to client transaction: " << data->getTxnId() << " error: " << exception.what() 
 		<< " retrying in " << retrywait << " Microseconds");
 
-  boost::asio::deadline_timer t(handler->getChannel()->getService(), boost::posix_time::milliseconds(retrywait));
+  AbstractDuplexChannelPtr chPtr = boost::dynamic_pointer_cast<AbstractDuplexChannel>(handler->getChannel());
+  boost::asio::deadline_timer t(chPtr->getService(), boost::posix_time::milliseconds(retrywait));
 
   t.async_wait(boost::bind(&SubscriberConsumeCallback::timerComplete, handler, m, boost::asio::placeholders::error));  
 }
@@ -179,8 +180,23 @@ void SubscriberClientChannelHandler::mes
 void SubscriberClientChannelHandler::close() {
   closed = true;
 
+  // cancel reconnect timer
+  if (reconnectTimer.get()) {
+    boost::system::error_code ec;
+    reconnectTimer->cancel(ec);
+    if (ec) {
+      LOG4CXX_WARN(logger, "Handler " << this << " cancel reconnect task " << reconnectTimer.get() << " error :" << ec.message().c_str());
+    }
+  }
+
+  LOG4CXX_INFO(logger, "close subscription handler " << this << " for (topic:" << origData->getTopic()
+                    << ", subscriberId:" << origData->getSubscriberId() << ").");
   if (channel.get()) {
-    channel->kill();
+    // need to ensure the channel is removed from allchannels list
+    // since it will be killed
+    client->removeAndCloseChannel(channel);
+    LOG4CXX_INFO(logger, "removed subscription channel " << channel.get() << " for (topic: " << origData->getTopic()
+                      << ", subscriberId:" << origData->getSubscriberId() << ").");
   }
 }
 
@@ -236,9 +252,10 @@ void SubscriberClientChannelHandler::rec
   if (should_wait) {
     int retrywait = client->getConfiguration().getInt(Configuration::RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME,
 						      DEFAULT_RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME);
-    
+    AbstractDuplexChannelPtr chPtr = boost::dynamic_pointer_cast<AbstractDuplexChannel>(channel);
     // set reconnect timer
-    reconnectTimer = ReconnectTimerPtr(new boost::asio::deadline_timer(channel->getService(), boost::posix_time::milliseconds(retrywait)));
+    reconnectTimer = ReconnectTimerPtr(new boost::asio::deadline_timer(chPtr->getService(),
+                                       boost::posix_time::milliseconds(retrywait)));
     reconnectTimer->async_wait(boost::bind(&SubscriberClientChannelHandler::reconnectTimerComplete, shared_from_this(),
 			     channel, e, boost::asio::placeholders::error));  
     return;
@@ -256,6 +273,8 @@ void SubscriberClientChannelHandler::rec
   
   DuplexChannelPtr newchannel = client->createChannel(origData->getTopic(), baseptr);
   newhandler->setChannel(newchannel);
+  newchannel->connect();
+  LOG4CXX_DEBUG(logger, "Create a new channel " << newchannel.get() << " to handover delivery to new handler " << newhandler.get());
   handoverDelivery(newhandler);
   
   // remove record of the failed channel from the subscriber
@@ -368,6 +387,10 @@ void SubscriberImpl::asyncSubscribe(cons
 
   DuplexChannelPtr channel = client->createChannel(topic, handler);
   handler->setChannel(channel);
+  channel->connect();
+  LOG4CXX_INFO(logger, "New handler " << handler.get() << " on channel " << channel.get()
+                    << " is created for (topic:" << topic << ", subscriber:" << subscriberId << ", txn:"
+                    << data->getTxnId() << ").");
   doSubscribe(channel, data, handler);
 }
 
@@ -377,15 +400,19 @@ void SubscriberImpl::doSubscribe(const D
   OperationCallbackPtr writecb(new SubscriberWriteCallback(client, data));
   channel->writeRequest(data->getRequest(), writecb);
 
-  boost::lock_guard<boost::shared_mutex> lock(topicsubscriber2handler_lock);
-  TopicSubscriber t(data->getTopic(), data->getSubscriberId());
-  SubscriberClientChannelHandlerPtr oldhandler = topicsubscriber2handler[t];
-  if (oldhandler != NULL) {
+  SubscriberClientChannelHandlerPtr oldhandler;
+  {
+    boost::lock_guard<boost::shared_mutex> lock(topicsubscriber2handler_lock);
+    TopicSubscriber t(data->getTopic(), data->getSubscriberId());
+    oldhandler = topicsubscriber2handler[t];
+    topicsubscriber2handler[t] = handler;
+  }
+  if (oldhandler.get() != NULL) {
+    LOG4CXX_DEBUG(logger, "(topic:" << data->getTopic() << ", subscriber:" << data->getSubscriberId()
+                          << ") handover delivery from old handler " << oldhandler.get()
+                          << " to new handler " << handler.get());
     oldhandler->handoverDelivery(handler);
   }
-  topicsubscriber2handler[t] = handler;
-  
-  LOG4CXX_DEBUG(logger, "Set topic subscriber for topic(" << data->getTopic() << ") subscriberId(" << data->getSubscriberId() << ") to " << handler.get() << " topicsubscriber2topic(" << &topicsubscriber2handler << ")");
 }
 
 void SubscriberImpl::unsubscribe(const std::string& topic, const std::string& subscriberId) {

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h Mon Oct  1 13:36:31 2012
@@ -107,7 +107,8 @@ namespace Hedwig {
   class SubscriberClientChannelHandler : public HedwigClientChannelHandler, 
 					 public boost::enable_shared_from_this<SubscriberClientChannelHandler> {
   public: 
-    SubscriberClientChannelHandler(const ClientImplPtr& client, SubscriberImpl& subscriber, const PubSubDataPtr& data);
+    SubscriberClientChannelHandler(const ClientImplPtr& client, SubscriberImpl& subscriber,
+                                   const PubSubDataPtr& data);
     ~SubscriberClientChannelHandler();
 
     void messageReceived(const DuplexChannelPtr& channel, const PubSubResponsePtr& m);

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.cpp?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.cpp Mon Oct  1 13:36:31 2012
@@ -39,7 +39,7 @@ const std::string UNITIALISED_HOST("UNIN
 const int DEFAULT_PORT = 4080;
 const int DEFAULT_SSL_PORT = 9876;
 
-HostAddress::HostAddress() : initialised(false), address_str() {
+HostAddress::HostAddress() : initialised(false), address_str(), ssl_host_port(0) {
 }
 
 HostAddress::~HostAddress() {
@@ -65,13 +65,23 @@ uint32_t HostAddress::ip() const {
   return host_ip;
 }
 
+void HostAddress::updateIP(uint32_t ip) {
+  this->host_ip = ip;
+}
+
 uint16_t HostAddress::port() const {
   return host_port;
 }
 
+uint16_t HostAddress::sslPort() const {
+  return ssl_host_port;
+}
+
 void HostAddress::parse_string() {
   char* url = strdup(address_str.c_str());
 
+  LOG4CXX_DEBUG(logger, "Parse address : " << url);
+
   if (url == NULL) {
     LOG4CXX_ERROR(logger, "You seems to be out of memory");
     throw OomException();
@@ -130,6 +140,7 @@ void HostAddress::parse_string() {
 
   host_ip = ntohl(socket_addr.sin_addr.s_addr);
   host_port = ntohs(socket_addr.sin_port);
+  ssl_host_port = sslport;
 
   freeaddrinfo(addr);
   free((void*)url);

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.h?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.h Mon Oct  1 13:36:31 2012
@@ -53,6 +53,11 @@ namespace Hedwig {
     const std::string& getAddressString() const;
     uint32_t ip() const;
     uint16_t port() const;
+    uint16_t sslPort() const;
+
+    // the real ip address is different from default server
+    // if default server is a VIP
+    void updateIP(uint32_t ip);
 
     static HostAddress fromString(std::string host);
 
@@ -64,6 +69,7 @@ namespace Hedwig {
     std::string address_str;
     uint32_t host_ip;
     uint16_t host_port;
+    uint16_t ssl_host_port;
   };
 
   /**

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/log4cxx.conf
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/log4cxx.conf?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/log4cxx.conf (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/log4cxx.conf Mon Oct  1 13:36:31 2012
@@ -22,20 +22,21 @@
 log4j.appender.rootAppender=org.apache.log4j.ConsoleAppender
 log4j.appender.rootAppender.layout=org.apache.log4j.BasicLayout
 
-#log4j.appender.hedwig=org.apache.log4j.RollingFileAppender
-log4j.appender.hedwig=org.apache.log4j.ConsoleAppender
-#log4j.appender.hedwig.fileName=./testLog.log
+log4j.appender.hedwig=org.apache.log4j.RollingFileAppender
+#log4j.appender.hedwig=org.apache.log4j.ConsoleAppender
+log4j.appender.hedwig.fileName=./testLog.log
 log4j.appender.hedwig.layout=org.apache.log4j.PatternLayout
 log4j.appender.hedwig.layout.ConversionPattern=[%d{%H:%M:%S.%l}] %t %p %c - %m%n
 
-log4j.appender.hedwigtest=org.apache.log4j.ConsoleAppender
-#log4j.appender.hedwig.fileName=./testLog.log
+log4j.appender.hedwig=org.apache.log4j.RollingFileAppender
+#log4j.appender.hedwigtest=org.apache.log4j.ConsoleAppender
+log4j.appender.hedwig.fileName=./testLog.log
 log4j.appender.hedwigtest.layout=org.apache.log4j.PatternLayout
 log4j.appender.hedwigtest.layout.ConversionPattern=[%d{%H:%M:%S.%l}] %t %p %c - %m%n
 
 # category
-log4j.category.hedwig=OFF, hedwig
-log4j.category.hedwigtest=OFF, hedwigtest
+log4j.category.hedwig=DEBUG, hedwig
+log4j.category.hedwigtest=DEBUG, hedwigtest
 
 log4j.rootCategory=OFF
 

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/server-control.sh
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/server-control.sh?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/server-control.sh (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/server-control.sh Mon Oct  1 13:36:31 2012
@@ -94,6 +94,7 @@ start_hw_server () {
     REGION=$1
     COUNT=$2
     PORT=$((4080+$COUNT))
+    SSL_PORT=$((9876+$COUNT))
 
     export HEDWIG_LOG_CONF=/tmp/hw-log4j-$COUNT.properties
     cat > $HEDWIG_LOG_CONF <<EOF
@@ -112,6 +113,8 @@ log4j.appender.ROLLINGFILE.MaxFileSize=1
 #log4j.appender.ROLLINGFILE.MaxBackupIndex=10
 log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
 log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n
+log4j.logger.org.apache.zookeeper=OFF,ROLLINGFILE
+log4j.logger.org.apache.hedwig.zookeeper=OFF,ROLLINGFILE
 EOF
 
     export HEDWIG_SERVER_CONF=/tmp/hw-server-$COUNT.conf
@@ -122,9 +125,11 @@ zk_timeout=2000
 # The port at which the clients will connect.
 server_port=$PORT
 # The SSL port at which the clients will connect (only if SSL is enabled).
-ssl_server_port=9876
+ssl_server_port=$SSL_PORT
 # Flag indicating if the server should also operate in SSL mode.
-ssl_enabled=false
+ssl_enabled=true
+cert_path=$PWD/../../../../../hedwig-server/src/main/resources/server.p12
+password=eUySvp2phM2Wk
 region=$REGION
 EOF
     sh $HWSCRIPT server 2>&1 > hwoutput.$COUNT.log &

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/tester.sh
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/tester.sh?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/tester.sh (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/tester.sh Mon Oct  1 13:36:31 2012
@@ -24,15 +24,19 @@ export LOG4CXX_CONF=`pwd`/log4cxx.conf
 source network-delays.sh
 source server-control.sh
 
-all() {
+runtest() {
     if [ "z$HEDWIG_NETWORK_DELAY" != "z" ]; then
 	setup_delays $HEDWIG_NETWORK_DELAY
     fi
 
     stop_cluster;
     start_cluster;
+    if [ "z$1" != "z" ]; then
+      ../test/hedwigtest -s true
+    else
+      ../test/hedwigtest
+    fi
 
-    ../test/hedwigtest 
     RESULT=$?
     stop_cluster;
 
@@ -101,6 +105,12 @@ case "$1" in
     stop-cluster)
 	stop_cluster 
 	;;
+    simple-test)
+        runtest
+        ;;
+    ssl-test)
+        runtest ssl
+        ;;
     setup-delays)
 	setup_delays $2
 	;;
@@ -108,7 +118,8 @@ case "$1" in
 	clear_delays
 	;;
     all)
-	all
+	runtest
+	runtest ssl
 	;;
     singletest)
 	singletest $2
@@ -118,7 +129,13 @@ case "$1" in
 Usage: tester.sh [command]
 
 tester.sh all
-   Run through the tests, setting up and cleaning up all prerequisites.
+   Run through the tests (both simple and ssl), setting up and cleaning up all prerequisites.
+
+tester.sh simple-test
+   Run through the tests (simple mode), setting up and cleaning up all prerequisites.
+
+tester.sh ssl-test
+   Run through the tests (ssl mode), setting up and cleaning up all prerequisites.
 
 tester.sh singletest <name>
    Run a single test

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am Mon Oct  1 13:36:31 2012
@@ -26,6 +26,13 @@ hedwigtest_LDFLAGS = -no-undefined $(BOO
 
 check: hedwigtest
 	bash ../scripts/tester.sh all
+
+sslcheck: hedwigtest
+	bash ../scripts/tester.sh ssl-test
+
+simplecheck: hedwigtest
+	bash ../scripts/tester.sh simple-test
+
 else
 check:
 	@echo "\n\nYou haven't configured with gtest. Run the ./configure command with --enable-gtest=<path_to_gtest>"

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/main.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/main.cpp?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/main.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/main.cpp Mon Oct  1 13:36:31 2012
@@ -34,6 +34,9 @@
 
 #include "gtest/gtest.h"
 
+bool TestServerConfiguration::isSSL = false;
+std::string TestServerConfiguration::certFile = "";
+
 int main( int argc, char **argv)
 {
   try {
@@ -48,6 +51,25 @@ int main( int argc, char **argv)
   } catch (...) {
     std::cerr << "unknown exception while configuring log4cpp vi'." << std::endl;
   }
+
+  // Enable SSL for testing
+  int opt;
+  while((opt = getopt(argc,argv,"s:c:")) > 0) {
+    switch(opt) {
+    case 's':
+      if (std::string(optarg) == "true") {
+        std::cout << "run in ssl mode...." << std::endl;
+        TestServerConfiguration::isSSL = true;
+      } else {
+        TestServerConfiguration::isSSL = false;
+      }
+      break;
+    case 'c':
+      std::cout << "use cert file :" << optarg << std::endl;
+      TestServerConfiguration::certFile = std::string(optarg);
+      break;
+    }//switch
+  }//while
   
   ::testing::InitGoogleTest(&argc, argv);
   int ret = RUN_ALL_TESTS();

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messageboundtest.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messageboundtest.cpp?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messageboundtest.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messageboundtest.cpp Mon Oct  1 13:36:31 2012
@@ -33,32 +33,16 @@
 
 static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__));
 
-
-class MessageBoundConfiguration : public Hedwig::Configuration {
+class MessageBoundConfiguration : public TestServerConfiguration {
 public:
-  MessageBoundConfiguration() : address("localhost:4081") {}
+  MessageBoundConfiguration() : TestServerConfiguration() {}
     
   virtual int getInt(const std::string& key, int defaultVal) const {
     if (key == Configuration::SUBSCRIPTION_MESSAGE_BOUND) {
       return 5;
     }
-    return defaultVal;
-  }
-
-  virtual const std::string get(const std::string& key, const std::string& defaultVal) const {
-    if (key == Configuration::DEFAULT_SERVER) {
-      return address;
-    } else {
-      return defaultVal;
-    }
+    return TestServerConfiguration::getInt(key, defaultVal);
   }
-    
-  virtual bool getBool(const std::string& /*key*/, bool defaultVal) const {
-    return defaultVal;
-  }
-
-protected:
-  const std::string address;
 };
     
 class MessageBoundOrderCheckingMessageHandlerCallback : public Hedwig::MessageHandlerCallback {
@@ -91,10 +75,16 @@ protected:
 
 void sendXExpectLastY(Hedwig::Publisher& pub, Hedwig::Subscriber& sub, const std::string& topic, 
 		      const std::string& subid, int X, int Y) {
-  for (int i = 0; i < X; i++) {
+  for (int i = 0; i < X;) {
     std::stringstream oss;
     oss << i;
-    pub.publish(topic, oss.str());
+    try {
+      pub.publish(topic, oss.str());
+      ++i;
+    } catch (std::exception &e) {
+      LOG4CXX_WARN(logger, "Exception when publishing message " << i << " : "
+                           << e.what());
+    }
   }
 
   sub.subscribe(topic, subid, Hedwig::SubscribeRequest::ATTACH);

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messagefiltertest.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messagefiltertest.cpp?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messagefiltertest.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messagefiltertest.cpp Mon Oct  1 13:36:31 2012
@@ -33,32 +33,17 @@
 
 static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__));
 
-class MessageFilterConfiguration : public Hedwig::Configuration {
+class MessageFilterConfiguration : public TestServerConfiguration {
 public:
-  MessageFilterConfiguration() : address("localhost:4081") {}
-  
-  virtual int getInt(const std::string& key, int defaultVal) const {
-    return defaultVal;
-  }
-
-  virtual const std::string get(const std::string& key, const std::string& defaultVal) const {
-    if (key == Configuration::DEFAULT_SERVER) {
-      return address;
-    } else {
-      return defaultVal;
-    }
-  }
+  MessageFilterConfiguration() : TestServerConfiguration() {}
   
   virtual bool getBool(const std::string& key, bool defaultVal) const {
     if (key == Configuration::SUBSCRIBER_AUTOCONSUME) {
       return false;
     } else {
-      return defaultVal;
+      return TestServerConfiguration::getBool(key, defaultVal);
     }
   }
-
-  protected:
-  const std::string address;
 };
     
 class ModMessageFilter : public Hedwig::ClientMessageFilter {