You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/18 12:59:35 UTC

svn commit: r1399578 [1/2] - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/cpp/inc/hedwig/ hedwig-client/src/main/cpp/lib/ hedwig-client/src/main/cpp/test/

Author: ivank
Date: Thu Oct 18 10:59:34 2012
New Revision: 1399578

URL: http://svn.apache.org/viewvc?rev=1399578&view=rev
Log:
BOOKKEEPER-369: re-factor hedwig cpp client to provide better interface to support both one-subscription-per-channel and multiple-subscriptions-per-channel. (sijie via ivank)

Added:
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/simplesubscriberimpl.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/simplesubscriberimpl.h
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/Makefile.am
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.h
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messageboundtest.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/subscribetest.cpp
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1399578&r1=1399577&r2=1399578&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Oct 18 10:59:34 2012
@@ -198,6 +198,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-413: Hedwig C++ client: Rename RUN_AS_SSL_MODE to SSL_ENABLED (ivank via sijie)
 
+        BOOKKEEPER-369: re-factor hedwig cpp client to provide better interface to support both one-subscription-per-channel and multiple-subscriptions-per-channel. (sijie via ivank)
+
 Release 4.1.0 - 2012-06-07
 
   Non-backward compatible changes:

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h?rev=1399578&r1=1399577&r2=1399578&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h Thu Oct 18 10:59:34 2012
@@ -32,16 +32,20 @@ namespace Hedwig {
   class OomException : public ClientException {};
   class UnknownRequestException : public ClientException {};
   class InvalidRedirectException : public ClientException {};
+  class NoChannelHandlerException : public ClientException {};
 
   class PublisherException : public ClientException { };
   
-
   class SubscriberException : public ClientException { };
   class AlreadySubscribedException : public SubscriberException {};
   class NotSubscribedException : public SubscriberException {};
+  class ResubscribeException : public SubscriberException {};
   class NullMessageHandlerException : public SubscriberException {};
   class NullMessageFilterException : public SubscriberException {};
 
+  class AlreadyStartDeliveryException : public SubscriberException {};
+  class StartingDeliveryException : public SubscriberException {};
+
   class ConfigurationException : public ClientException { };
   class InvalidPortException : public ConfigurationException {};
   class HostResolutionException : public ClientException {};

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h?rev=1399578&r1=1399577&r2=1399578&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h Thu Oct 18 10:59:34 2012
@@ -51,7 +51,10 @@ namespace Hedwig {
 
     virtual void stopDelivery(const std::string& topic, const std::string& subscriberId) = 0;
 
+    virtual bool hasSubscription(const std::string& topic, const std::string& subscriberId) = 0;
     virtual void closeSubscription(const std::string& topic, const std::string& subscriberId) = 0;
+    virtual void asyncCloseSubscription(const std::string& topic, const std::string& subscriberId,
+                                        const OperationCallbackPtr& callback) = 0;
 
     //
     // API to register/unregister subscription listeners for receiving

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/Makefile.am
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/Makefile.am?rev=1399578&r1=1399577&r2=1399578&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/Makefile.am (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/Makefile.am Thu Oct 18 10:59:34 2012
@@ -19,7 +19,7 @@
 PROTODEF = ../../../../../hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
 
 lib_LTLIBRARIES = libhedwig01.la
-libhedwig01_la_SOURCES = protocol.cpp channel.cpp client.cpp util.cpp clientimpl.cpp publisherimpl.cpp subscriberimpl.cpp eventdispatcher.cpp data.cpp filterablemessagehandler.cpp
+libhedwig01_la_SOURCES = protocol.cpp channel.cpp client.cpp util.cpp clientimpl.cpp publisherimpl.cpp subscriberimpl.cpp eventdispatcher.cpp data.cpp filterablemessagehandler.cpp simplesubscriberimpl.cpp
 libhedwig01_la_CPPFLAGS = -I$(top_srcdir)/inc $(DEPS_CFLAGS)
 libhedwig01_la_LIBADD = $(DEPS_LIBS) $(BOOST_CPPFLAGS) 
 libhedwig01_la_LDFLAGS = -no-undefined $(BOOST_ASIO_LIB) $(BOOST_LDFLAGS) $(BOOST_THREAD_LIB)

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp?rev=1399578&r1=1399577&r2=1399578&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp Thu Oct 18 10:59:34 2012
@@ -47,7 +47,6 @@ static log4cxx::LoggerPtr logger(log4cxx
 
 using namespace Hedwig;
 
-const bool DEFAULT_SSL_ENABLED = false;
 const std::string DEFAULT_SSL_PEM_FILE = "";
 
 AbstractDuplexChannel::AbstractDuplexChannel(IOServicePtr& service,
@@ -67,6 +66,10 @@ AbstractDuplexChannel::~AbstractDuplexCh
   LOG4CXX_INFO(logger, "Destroying DuplexChannel(" << this << ")");
 }
 
+ChannelHandlerPtr AbstractDuplexChannel::getChannelHandler() {
+  return handler;
+}
+
 /*static*/ void AbstractDuplexChannel::connectCallbackHandler(
                   AbstractDuplexChannelPtr channel,
                   OperationCallbackPtr callback,
@@ -601,6 +604,7 @@ void AsioDuplexChannel::closeSocket() {
   if (ec) {
     LOG4CXX_WARN(logger, "Channel " << this << " close error : " << ec.message().c_str());
   }
+  LOG4CXX_DEBUG(logger, "Closed socket for channel " << this << ".");
 }
 
 // SSL Context Factory
@@ -795,35 +799,3 @@ void AsioSSLDuplexChannel::closeLowestLa
     LOG4CXX_WARN(logger, "Channel " << this << " close error : " << ec.message().c_str());
   }
 }
-
-DuplexChannelManagerPtr DuplexChannelManager::create(const Configuration& conf,
-                                                     EventDispatcher& dispatcher) {
-  DuplexChannelManagerPtr factory(new DuplexChannelManager(conf, dispatcher));
-  LOG4CXX_DEBUG(logger, "Created DuplexChannelManager " << factory);
-  return factory;
-}
-
-DuplexChannelManager::DuplexChannelManager(const Configuration& conf,
-                                           EventDispatcher& dispatcher)
-  : conf(conf), dispatcher(dispatcher) {
-  sslEnabled = conf.getBool(Configuration::SSL_ENABLED, DEFAULT_SSL_ENABLED);
-  if (sslEnabled) {
-    sslCtxFactory = SSLContextFactoryPtr(new SSLContextFactory(conf));
-  }
-}
-
-DuplexChannelManager::~DuplexChannelManager() {
-}
-
-DuplexChannelPtr DuplexChannelManager::createChannel(const HostAddress& addr,
-                                                     const ChannelHandlerPtr& handler) {
-  LOG4CXX_DEBUG(logger, "Creating channel with handler " << handler.get());
-  IOServicePtr& service = dispatcher.getService();
-  if (sslEnabled) {
-    boost_ssl_context_ptr sslCtx =
-      sslCtxFactory->createSSLContext(service->getService());
-    return DuplexChannelPtr(new AsioSSLDuplexChannel(service, sslCtx, addr, handler));
-  } else {
-    return DuplexChannelPtr(new AsioDuplexChannel(service, addr, handler));
-  }
-}

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h?rev=1399578&r1=1399577&r2=1399578&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h Thu Oct 18 10:59:34 2012
@@ -88,6 +88,9 @@ namespace Hedwig {
   public:
     virtual ~DuplexChannel() {}
 
+    // Return the channel handler bound with a channel
+    virtual ChannelHandlerPtr getChannelHandler() = 0;
+
     // Issues a connect request to the target host
     // User could writeRequest after issued connect request, those requests should
     // be buffered and written until the channel is connected.
@@ -156,25 +159,6 @@ namespace Hedwig {
 
   typedef boost::shared_ptr<SSLContextFactory> SSLContextFactoryPtr;
 
-  class DuplexChannelManager;
-  typedef boost::shared_ptr<DuplexChannelManager> DuplexChannelManagerPtr;
-
-  class DuplexChannelManager : public boost::enable_shared_from_this<DuplexChannelManager> {
-  public:
-    static DuplexChannelManagerPtr create(const Configuration& conf,
-                                          EventDispatcher& dispatcher);
-    ~DuplexChannelManager();
-
-    DuplexChannelPtr createChannel(const HostAddress& addr, const ChannelHandlerPtr& handler);
-  private:
-    DuplexChannelManager(const Configuration& conf, EventDispatcher& dispatcher);
-
-    const Configuration& conf;
-    EventDispatcher& dispatcher;
-    bool sslEnabled;
-    SSLContextFactoryPtr sslCtxFactory;
-  };
-
   class AbstractDuplexChannel;
   typedef boost::shared_ptr<AbstractDuplexChannel> AbstractDuplexChannelPtr;
 
@@ -186,6 +170,8 @@ namespace Hedwig {
                           const ChannelHandlerPtr& handler);
     virtual ~AbstractDuplexChannel();
 
+    virtual ChannelHandlerPtr getChannelHandler();
+
     //
     // Connect Operation
     //

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.cpp?rev=1399578&r1=1399577&r2=1399578&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.cpp Thu Oct 18 10:59:34 2012
@@ -23,13 +23,16 @@
 #include "channel.h"
 #include "publisherimpl.h"
 #include "subscriberimpl.h"
+#include "simplesubscriberimpl.h"
 #include <log4cxx/logger.h>
 
 static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__));
 
 using namespace Hedwig;
 
+const int DEFAULT_MESSAGE_FORCE_CONSUME_RETRY_WAIT_TIME = 5000;
 const std::string DEFAULT_SERVER_DEFAULT_VAL = "";
+const bool DEFAULT_SSL_ENABLED = false;
 
 void SyncOperationCallback::wait() {
   boost::unique_lock<boost::mutex> lock(mut);
@@ -104,8 +107,51 @@ void SyncOperationCallback::throwExcepti
   }
 }
 
-HedwigClientChannelHandler::HedwigClientChannelHandler(const ClientImplPtr& client) 
-  : client(client){
+ResponseHandler::ResponseHandler(const DuplexChannelManagerPtr& channelManager)
+  : channelManager(channelManager) {
+}
+
+void ResponseHandler::redirectRequest(const PubSubResponsePtr& response,
+                                      const PubSubDataPtr& data,
+                                      const DuplexChannelPtr& channel) {
+  HostAddress oldhost = channel->getHostAddress();
+  data->addTriedServer(oldhost);
+
+  HostAddress h;
+  bool redirectToDefaultHost = true;
+  if (response->has_statusmsg()) {
+    try {
+      h = HostAddress::fromString(response->statusmsg());
+      redirectToDefaultHost = false;
+    } catch (std::exception& e) {
+      h = channelManager->getDefaultHost();
+    }
+  } else {
+    h = channelManager->getDefaultHost();
+  }
+  if (data->hasTriedServer(h)) {
+    LOG4CXX_ERROR(logger, "We've been told to try request [" << data->getTxnId() << "] with [" 
+		                      << h.getAddressString()<< "] by " << oldhost.getAddressString() 
+		                      << " but we've already tried that. Failing operation");
+    data->getCallback()->operationFailed(InvalidRedirectException());
+    return;
+  }
+  LOG4CXX_INFO(logger, "We've been told  [" << data->getTopic() << "] is on [" << h.getAddressString() 
+		                   << "] by [" << oldhost.getAddressString() << "]. Redirecting request "
+                       << data->getTxnId());
+  data->setShouldClaim(true);
+
+  // submit the request again to the target host
+  if (redirectToDefaultHost) {
+    channelManager->submitOpToDefaultServer(data);
+  } else {
+    channelManager->redirectOpToHost(data, h);
+  }
+}
+
+HedwigClientChannelHandler::HedwigClientChannelHandler(const DuplexChannelManagerPtr& channelManager,
+                                                       ResponseHandlerMap& handlers)
+  : channelManager(channelManager), handlers(handlers), closed(false), disconnected(false) {
 }
 
 void HedwigClientChannelHandler::messageReceived(const DuplexChannelPtr& channel, const PubSubResponsePtr& m) {
@@ -120,247 +166,421 @@ void HedwigClientChannelHandler::message
   /* you now have ownership of data, don't leave this funciton without deleting it or 
      palming it off to someone else */
 
-  if (data == NULL) {
+  if (data.get() == 0) {
+    LOG4CXX_ERROR(logger, "No pub/sub request for txnid(" << m->txnid() << ").");
     return;
   }
 
-  if (m->statuscode() == NOT_RESPONSIBLE_FOR_TOPIC) {
-    client->redirectRequest(channel, data, m);
-    return;
+  // Store the topic2Host mapping if this wasn't a server redirect.
+  // TODO: add a specific response for failure to get topic ownership,
+  //       to distinguish SERVICE_DOWN from failure to get topic ownership
+  if (m->statuscode() != NOT_RESPONSIBLE_FOR_TOPIC) {
+    const HostAddress& host = channel->getHostAddress();
+    channelManager->setHostForTopic(data->getTopic(), host);
   }
 
-  switch (data->getType()) {
-  case PUBLISH:
-    client->getPublisherImpl().messageHandler(m, data);
-    break;
-  case SUBSCRIBE:
-  case UNSUBSCRIBE:
-    client->getSubscriberImpl().messageHandler(m, data);
-    break;
-  default:
-    LOG4CXX_ERROR(logger, "Unimplemented request type " << data->getType());
-    break;
+  const ResponseHandlerPtr& respHandler = handlers[data->getType()];
+  if (respHandler.get()) {
+    respHandler->handleResponse(m, data, channel);
+  } else {
+    LOG4CXX_ERROR(logger, "Unimplemented request type " << data->getType() << " : "
+                          << *data);
+    data->getCallback()->operationFailed(UnknownRequestException());
   }
 }
 
-
 void HedwigClientChannelHandler::channelConnected(const DuplexChannelPtr& channel) {
   // do nothing 
 }
 
-void HedwigClientChannelHandler::channelDisconnected(const DuplexChannelPtr& channel, const std::exception& e) {
-  LOG4CXX_ERROR(logger, "Channel disconnected");
+void HedwigClientChannelHandler::channelDisconnected(const DuplexChannelPtr& channel,
+                                                     const std::exception& e) {
+  if (channelManager->isClosed()) {
+    return;
+  }
 
-  client->channelDied(channel);
+  // If this channel was closed explicitly by the client code,
+  // we do not need to do any of this logic. This could happen
+  // for redundant Publish channels created or redirected subscribe
+  // channels that are not used anymore or when we shutdown the
+  // client and manually close all of the open channels.
+  // Also don't do any of the disconnect logic if the client has stopped.
+  {
+    boost::lock_guard<boost::shared_mutex> lock(close_lock);
+    if (closed) {
+      return;
+    }
+    if (disconnected) {
+      return;
+    }
+    disconnected = true;
+  }
+  LOG4CXX_INFO(logger, "Channel " << channel.get() << " was disconnected.");
+  // execute logic after channel disconnected
+  onChannelDisconnected(channel);
+}
+
+void HedwigClientChannelHandler::onChannelDisconnected(const DuplexChannelPtr& channel) {
+  // Clean up the channel from channel manager
+  channelManager->nonSubscriptionChannelDied(channel);
 }
 
 void HedwigClientChannelHandler::exceptionOccurred(const DuplexChannelPtr& channel, const std::exception& e) {
   LOG4CXX_ERROR(logger, "Exception occurred" << e.what());
 }
 
-ClientTxnCounter::ClientTxnCounter() : counter(0) 
-{
+void HedwigClientChannelHandler::close() {
+  {
+    boost::lock_guard<boost::shared_mutex> lock(close_lock);
+    if (closed) {
+      return;
+    }
+    closed = true;
+  }
+  // do close handle logic here
+  doClose();
 }
 
-ClientTxnCounter::~ClientTxnCounter() {
+void HedwigClientChannelHandler::doClose() {
+  // do nothing for generic client channel handler
 }
 
-/**
-Increment the transaction counter and return the new value.
+//
+// Pub/Sub Request Write Callback
+//
+PubSubWriteCallback::PubSubWriteCallback(const DuplexChannelPtr& channel,
+                                         const PubSubDataPtr& data)
+  : channel(channel), data(data) {
+}
 
-@returns the next transaction id
-*/
-long ClientTxnCounter::next() {  // would be nice to remove lock from here, look more into it
-  boost::lock_guard<boost::mutex> lock(mutex);
+void PubSubWriteCallback::operationComplete() {
+  LOG4CXX_INFO(logger, "Successfully wrote pubsub request : " << *data << " to channel "
+                       << channel.get());
+}
 
-  long next= ++counter; 
+void PubSubWriteCallback::operationFailed(const std::exception& exception) {
+  LOG4CXX_ERROR(logger, "Error writing pubsub request (" << *data << ") : " << exception.what());
 
-  return next;
+  // remove the transaction from channel if write failed
+  channel->retrieveTransaction(data->getTxnId());
+  data->getCallback()->operationFailed(exception);
 }
 
-ClientImplPtr ClientImpl::Create(const Configuration& conf) {
-  ClientImplPtr impl(new ClientImpl(conf));
-  LOG4CXX_DEBUG(logger, "Creating Clientimpl " << impl);
-  impl->dispatcher->start();
-  return impl;
+//
+// Default Server Connect Callback
+//
+DefaultServerConnectCallback::DefaultServerConnectCallback(const DuplexChannelManagerPtr& channelManager,
+                                                           const DuplexChannelPtr& channel,
+                                                           const PubSubDataPtr& data)
+  : channelManager(channelManager), channel(channel), data(data) {
 }
 
-void ClientImpl::Destroy() {
-  LOG4CXX_DEBUG(logger, "destroying Clientimpl " << this);
+void DefaultServerConnectCallback::operationComplete() {
+  LOG4CXX_DEBUG(logger, "Channel " << channel.get() << " is connected to host "
+                        << channel->getHostAddress() << ".");
+  // Once connected, we know the real IP of the target host,
+  // so we can submit the request right away
+  channelManager->submitOpThruChannel(data, channel);
+}
 
-  {
-    boost::lock_guard<boost::shared_mutex> lock(allchannels_lock);
-    
-    shuttingDownFlag = true;
-    for (ChannelMap::iterator iter = allchannels.begin(); iter != allchannels.end(); ++iter ) {
-      (*iter)->close();
-    }  
-    allchannels.clear();
-  }
-  // SSL Channel shutdown needs to send packets to server
-  // so we only stop dispatcher after all channels are closed
-  dispatcher->stop();
+void DefaultServerConnectCallback::operationFailed(const std::exception& exception) {
+  LOG4CXX_ERROR(logger, "Channel " << channel.get() << " failed to connect to host "
+                        << channel->getHostAddress() << " : " << exception.what());
+  data->getCallback()->operationFailed(exception);
+}
 
-  /* destruction of the maps will clean up any items they hold */
-  
-  if (subscriber != NULL) {
-    delete subscriber;
-    subscriber = NULL;
+//
+// Subscription Event Emitter
+//
+SubscriptionEventEmitter::SubscriptionEventEmitter() {}
+
+void SubscriptionEventEmitter::addSubscriptionListener(
+  SubscriptionListenerPtr& listener) {
+  boost::lock_guard<boost::shared_mutex> lock(listeners_lock);
+  listeners.insert(listener);
+}
+
+void SubscriptionEventEmitter::removeSubscriptionListener(
+  SubscriptionListenerPtr& listener) {
+  boost::lock_guard<boost::shared_mutex> lock(listeners_lock);
+  listeners.erase(listener);
+}
+
+void SubscriptionEventEmitter::emitSubscriptionEvent(
+  const std::string& topic, const std::string& subscriberId,
+  const SubscriptionEvent event) {
+  boost::shared_lock<boost::shared_mutex> lock(listeners_lock);
+  if (0 == listeners.size()) {
+    return;
   }
-  if (publisher != NULL) {
-    delete publisher;
-    publisher = NULL;
+  for (SubscriptionListenerSet::iterator iter = listeners.begin();
+       iter != listeners.end(); ++iter) {
+    (*iter)->processEvent(topic, subscriberId, event);
   }
 }
 
-ClientImpl::ClientImpl(const Configuration& conf) 
-  : conf(conf), publisher(NULL), subscriber(NULL), counterobj(), shuttingDownFlag(false)
-{
+//
+// Channel Manager Used to manage all established channels
+//
+
+DuplexChannelManagerPtr DuplexChannelManager::create(const Configuration& conf) {
+  DuplexChannelManagerPtr manager(new SimpleDuplexChannelManager(conf));
+  LOG4CXX_DEBUG(logger, "Created DuplexChannelManager " << manager);
+  return manager;
+}
+
+DuplexChannelManager::DuplexChannelManager(const Configuration& conf)
+  : dispatcher(new EventDispatcher(conf)), conf(conf), closed(false), counterobj() {
+  sslEnabled = conf.getBool(Configuration::SSL_ENABLED, DEFAULT_SSL_ENABLED); 
   defaultHost = HostAddress::fromString(conf.get(Configuration::DEFAULT_SERVER,
                                                  DEFAULT_SERVER_DEFAULT_VAL));
-  dispatcher = EventDispatcherPtr(new EventDispatcher(conf));
-  channelManager = DuplexChannelManager::create(conf, *dispatcher);
+  if (sslEnabled) {
+    sslCtxFactory = SSLContextFactoryPtr(new SSLContextFactory(conf));
+  }
+  LOG4CXX_DEBUG(logger, "Created DuplexChannelManager " << this << " with default server "
+                        << defaultHost);
 }
 
-Subscriber& ClientImpl::getSubscriber() {
-  return getSubscriberImpl();
+DuplexChannelManager::~DuplexChannelManager() {
+  LOG4CXX_DEBUG(logger, "Destroyed DuplexChannelManager " << this);
 }
 
-Publisher& ClientImpl::getPublisher() {
-  return getPublisherImpl();
+void DuplexChannelManager::submitTo(const PubSubDataPtr& op, const DuplexChannelPtr& channel) {
+  if (channel.get()) {
+    channel->storeTransaction(op);
+    OperationCallbackPtr writecb(new PubSubWriteCallback(channel, op));
+    LOG4CXX_DEBUG(logger, "Submit pub/sub request " << *op << " thru channel " << channel.get());
+    channel->writeRequest(op->getRequest(), writecb);
+  } else {
+    submitOpToDefaultServer(op);
+  }
 }
     
-SubscriberImpl& ClientImpl::getSubscriberImpl() {
-  if (subscriber == NULL) {
-    boost::lock_guard<boost::mutex> lock(subscribercreate_lock);
-    if (subscriber == NULL) {
-      subscriber = new SubscriberImpl(shared_from_this());
-    }
+// Submit a pub/sub request
+void DuplexChannelManager::submitOp(const PubSubDataPtr& op) {
+  DuplexChannelPtr channel;
+  switch (op->getType()) {
+  case PUBLISH:
+  case UNSUBSCRIBE:
+    channel = getNonSubscriptionChannel(op->getTopic());  
+    break;
+  default:
+    TopicSubscriber ts(op->getTopic(), op->getSubscriberId());
+    channel = getSubscriptionChannel(ts, op->isResubscribeRequest());
+    break;
   }
-  return *subscriber;
+  // write the pub/sub request
+  submitTo(op, channel);
 }
 
-PublisherImpl& ClientImpl::getPublisherImpl() {
-  if (publisher == NULL) { 
-    boost::lock_guard<boost::mutex> lock(publishercreate_lock);
-    if (publisher == NULL) {
-      publisher = new PublisherImpl(shared_from_this());
+// Submit a pub/sub request to target host
+void DuplexChannelManager::redirectOpToHost(const PubSubDataPtr& op, const HostAddress& addr) {
+  DuplexChannelPtr channel;
+  switch (op->getType()) {
+  case PUBLISH:
+  case UNSUBSCRIBE:
+    // check whether a channel already exists for non-subscription requests
+    channel = getNonSubscriptionChannel(addr);
+    if (!channel.get()) {
+      channel = createNonSubscriptionChannel(addr);
+      channel = storeNonSubscriptionChannel(channel, true);
+    }
+    break;
+  default:
+    channel = getSubscriptionChannel(addr);
+    if (!channel.get()) {
+      channel = createSubscriptionChannel(addr);
+      channel = storeSubscriptionChannel(channel, true);
     }
+    break;
   }
-  return *publisher;
+  // write the pub/sub request
+  submitTo(op, channel);
 }
 
-ClientTxnCounter& ClientImpl::counter() {
-  return counterobj;
+// Submit a pub/sub request through an already established channel
+void DuplexChannelManager::submitOpThruChannel(const PubSubDataPtr& op,
+                                               const DuplexChannelPtr& ch) {
+  DuplexChannelPtr channel;
+  switch (op->getType()) {
+  case PUBLISH:
+  case UNSUBSCRIBE:
+    channel = storeNonSubscriptionChannel(ch, false);
+    break;
+  default:
+    channel = storeSubscriptionChannel(ch, false);
+    break;
+  }
+  // write the pub/sub request
+  submitTo(op, channel);
 }
 
-void ClientImpl::redirectRequest(const DuplexChannelPtr& channel, PubSubDataPtr& data, const PubSubResponsePtr& response) {
-  HostAddress oldhost = channel->getHostAddress();
-  data->addTriedServer(oldhost);
-  
-  HostAddress h = HostAddress::fromString(response->statusmsg());
-  if (data->hasTriedServer(h)) {
-    LOG4CXX_ERROR(logger, "We've been told to try request [" << data->getTxnId() << "] with [" 
-		  << h.getAddressString()<< "] by " << oldhost.getAddressString() 
-		  << " but we've already tried that. Failing operation");
-    data->getCallback()->operationFailed(InvalidRedirectException());
-    return;
+// Submit a pub/sub request to default server
+void DuplexChannelManager::submitOpToDefaultServer(const PubSubDataPtr& op) {
+  DuplexChannelPtr channel;
+  switch (op->getType()) {
+  case PUBLISH:
+  case UNSUBSCRIBE:
+    channel = createNonSubscriptionChannel(defaultHost);
+    break;
+  default:
+    channel = createSubscriptionChannel(defaultHost);
+    break;
   }
-  LOG4CXX_DEBUG(logger, "We've been told  [" << data->getTopic() << "] is on [" << h.getAddressString() 
-		<< "] by [" << oldhost.getAddressString() << "]. Redirecting request " << data->getTxnId());
-  data->setShouldClaim(true);
+  OperationCallbackPtr connectCallback(new DefaultServerConnectCallback(shared_from_this(),
+                                                                        channel, op));
+  // Connect to the default server. The default server is usually a VIP, so we only learn
+  // the real IP address once connected; before that, the real target host is unknown.
+  // We therefore submit the request only after the channel is connected (its IP address is updated by then).
+  channel->connect(connectCallback);
+}
 
-  setHostForTopic(data->getTopic(), h);
-  DuplexChannelPtr newchannel;
-  try {
-    if (data->getType() == SUBSCRIBE) {
-      // a redirect for subscription, kill old channel and remove old channel from all channels list
-      // otherwise old channel will not be destroyed, caused lost of CLOSE_WAIT connections
-      removeAndCloseChannel(channel);
-
-      SubscriberClientChannelHandlerPtr handler(new SubscriberClientChannelHandler(shared_from_this(), 
-										   this->getSubscriberImpl(), data));
-      newchannel = createChannel(data->getTopic(), handler);
-      handler->setChannel(newchannel);
-      newchannel->connect();
-      getSubscriberImpl().doSubscribe(newchannel, data, handler);
-    } else if (data->getType() == PUBLISH) {
-      newchannel = getChannel(data->getTopic());
-      getPublisherImpl().doPublish(newchannel, data);
-    } else {
-      newchannel = getChannel(data->getTopic());
-      getSubscriberImpl().doUnsubscribe(newchannel, data);
+DuplexChannelPtr DuplexChannelManager::getNonSubscriptionChannel(const std::string& topic) {
+  HostAddress addr;
+  {
+    boost::shared_lock<boost::shared_mutex> lock(topic2host_lock);
+    addr = topic2host[topic];
+  }
+  if (addr.isNullHost()) {
+    return DuplexChannelPtr();
+  } else {
+    // we already know which hub server owns the topic
+    DuplexChannelPtr ch = getNonSubscriptionChannel(addr);
+    if (ch.get()) {
+      return ch;
     }
-  } catch (ShuttingDownException& e) {
-    return; // no point in redirecting if we're shutting down
+    ch = createNonSubscriptionChannel(addr);
+    return storeNonSubscriptionChannel(ch, true);
   }
 }
 
-ClientImpl::~ClientImpl() {
-  LOG4CXX_DEBUG(logger, "deleting Clientimpl " << this);
+DuplexChannelPtr DuplexChannelManager::getNonSubscriptionChannel(const HostAddress& addr) {
+  boost::shared_lock<boost::shared_mutex> lock(host2channel_lock);
+  return host2channel[addr];
 }
 
-DuplexChannelPtr ClientImpl::createChannel(const std::string& topic, const ChannelHandlerPtr& handler) {
-  // get the host address
-  // create a channel to the host
-  HostAddress addr;
+DuplexChannelPtr DuplexChannelManager::createNonSubscriptionChannel(const HostAddress& addr) {
+  // Create a non-subscription channel handler
+  ChannelHandlerPtr handler(new HedwigClientChannelHandler(shared_from_this(),
+                                                           nonSubscriptionHandlers));
+  // Create a non subscription channel
+  return createChannel(dispatcher->getService(), addr, handler);
+}
+
+DuplexChannelPtr DuplexChannelManager::storeNonSubscriptionChannel(const DuplexChannelPtr& ch,
+                                                                   bool doConnect) {
+  const HostAddress& host = ch->getHostAddress();
+
+  bool useOldCh;
+  DuplexChannelPtr oldCh;
   {
-    boost::lock_guard<boost::shared_mutex> lock(topic2host_lock);
-    addr = topic2host[topic];
-  }
-  if (addr.isNullHost()) {
-    addr = defaultHost;
-    setHostForTopic(topic, addr);
+    boost::lock_guard<boost::shared_mutex> lock(host2channel_lock);
+
+    oldCh = host2channel[host];
+    if (!oldCh.get()) {
+      host2channel[host] = ch;
+      useOldCh = false;
+    } else {
+      // If we've reached here, that means we already have a Channel
+      // mapping for the given host. This should ideally not happen
+      // and it means we are creating another Channel to a server host
+      // to publish on when we could have used an existing one. This could
+      // happen due to a race condition if initially multiple concurrent
+      // threads are publishing on the same topic and no Channel exists
+      // currently to the server. We are not synchronizing this initial
+      // creation of Channels to a given host for performance.
+      // Another possible way to have redundant Channels created is if
+      // a new topic is being published to, we connect to the default
+      // server host which should be a VIP that redirects to a "real"
+      // server host. Since we don't know beforehand what is the full
+      // set of server hosts, we could be redirected to a server that
+      // we already have a channel connection to from a prior existing
+      // topic. Close these redundant channels as they won't be used.
+      useOldCh = true;
+    }
+  } 
+  if (useOldCh) {
+    LOG4CXX_DEBUG(logger, "Channel " << oldCh.get() << " to host " << host
+                          << " already exists so close channel " << ch.get() << ".");
+    ch->close();
+    return oldCh;
+  } else {
+    if (doConnect) {
+      ch->connect();
+    }
+    LOG4CXX_DEBUG(logger, "Storing channel " << ch.get() << " for host " << host << ".");
+    return ch;
   }
+}
 
-  DuplexChannelPtr channel = channelManager->createChannel(addr, handler);
+DuplexChannelPtr DuplexChannelManager::createChannel(IOServicePtr& service,
+                                                     const HostAddress& addr,
+                                                     const ChannelHandlerPtr& handler) {
+  DuplexChannelPtr channel;
+  if (sslEnabled) {
+    boost_ssl_context_ptr sslCtx = sslCtxFactory->createSSLContext(service->getService());
+    channel = DuplexChannelPtr(new AsioSSLDuplexChannel(service, sslCtx, addr, handler));
+  } else {
+    channel = DuplexChannelPtr(new AsioDuplexChannel(service, addr, handler));
+  }
 
   boost::lock_guard<boost::shared_mutex> lock(allchannels_lock);
-  if (shuttingDownFlag) {
+  if (closed) {
     channel->close();
     throw ShuttingDownException();
   }
-  // Don't connect here, otherwise connect callback may be triggered before setChannel
-  // channel->connect();
-
   allchannels.insert(channel);
-  LOG4CXX_DEBUG(logger, "(create) All channels size: " << allchannels.size());
+  LOG4CXX_DEBUG(logger, "Created a channel to " << addr << ", all channels : " << allchannels.size());
 
   return channel;
 }
 
-DuplexChannelPtr ClientImpl::getChannel(const std::string& topic) {
-  HostAddress addr;
-  {
-    boost::lock_guard<boost::shared_mutex> lock(topic2host_lock);
-    addr = topic2host[topic];
-  }
-  if (addr.isNullHost()) {
-    addr = defaultHost;
-    setHostForTopic(topic, addr);
-  }  
-  DuplexChannelPtr channel = host2channel[addr];
-
-  if (channel.get() == 0) {
-    LOG4CXX_DEBUG(logger, " No channel for topic, creating new channel.get() " << channel.get() << " addr " << addr.getAddressString());
-    ChannelHandlerPtr handler(new HedwigClientChannelHandler(shared_from_this()));
-    channel = createChannel(topic, handler);
-    channel->connect();
+long DuplexChannelManager::nextTxnId() {
+  return counterobj.next();
+}
 
-    boost::lock_guard<boost::shared_mutex> lock(host2channel_lock);
-    host2channel[addr] = channel;
-  } 
+void DuplexChannelManager::setHostForTopic(const std::string& topic, const HostAddress& host) {
+  boost::lock_guard<boost::shared_mutex> h2clock(host2topics_lock);
+  boost::lock_guard<boost::shared_mutex> t2hlock(topic2host_lock);
+  topic2host[topic] = host;
+  TopicSetPtr ts = host2topics[host];
+  if (!ts.get()) {
+    ts = TopicSetPtr(new TopicSet());
+    host2topics[host] = ts;
+  }
+  ts->insert(topic);
+  LOG4CXX_DEBUG(logger, "Set ownership of topic " << topic << " to " << host << ".");
+}
 
-  return channel;
+const HostAddress& DuplexChannelManager::getHostForTopic(const std::string& topic) {
+  boost::shared_lock<boost::shared_mutex> t2hlock(topic2host_lock);
+  return topic2host[topic];
 }
 
-void ClientImpl::setHostForTopic(const std::string& topic, const HostAddress& host) {
-  boost::lock_guard<boost::shared_mutex> lock(topic2host_lock);
-  topic2host[topic] = host;
+void DuplexChannelManager::clearAllTopicsForHost(const HostAddress& addr) {
+  // remove topic mapping
+  boost::lock_guard<boost::shared_mutex> h2tlock(host2topics_lock);
+  boost::lock_guard<boost::shared_mutex> t2hlock(topic2host_lock);
+  Host2TopicsMap::iterator iter = host2topics.find(addr);
+  if (iter != host2topics.end()) {
+    for (TopicSet::iterator tsIter = iter->second->begin();
+         tsIter != iter->second->end(); ++tsIter) {
+      topic2host.erase(*tsIter);
+    }
+    host2topics.erase(iter);
+  }
 }
 
-bool ClientImpl::shuttingDown() const {
-  return shuttingDownFlag;
+void DuplexChannelManager::clearHostForTopic(const std::string& topic,
+                                             const HostAddress& addr) {
+  // remove topic mapping
+  boost::lock_guard<boost::shared_mutex> h2tlock(host2topics_lock);
+  boost::lock_guard<boost::shared_mutex> t2hlock(topic2host_lock);
+  Host2TopicsMap::iterator iter = host2topics.find(addr);
+  if (iter != host2topics.end()) {
+    iter->second->erase(topic);
+  }
+  topic2host.erase(topic);
 }
 
 /**
@@ -369,35 +589,120 @@ bool ClientImpl::shuttingDown() const {
    This does not delete the channel. Some publishers or subscribers will still hold it and will be errored
    when they try to do anything with it. 
 */
-void ClientImpl::channelDied(const DuplexChannelPtr& channel) {
-  if (shuttingDownFlag) {
-    return;
-  }
-
-  boost::lock_guard<boost::shared_mutex> h2tlock(host2topics_lock);
-  boost::lock_guard<boost::shared_mutex> h2clock(host2channel_lock);
-  boost::lock_guard<boost::shared_mutex> t2hlock(topic2host_lock);
+void DuplexChannelManager::nonSubscriptionChannelDied(const DuplexChannelPtr& channel) {
   // get host
   HostAddress addr = channel->getHostAddress();
-  
-  for (Host2TopicsMap::iterator iter = host2topics.find(addr); iter != host2topics.end(); ++iter) {
-    topic2host.erase((*iter).second);
+
+  // Clear the topic ownership when a non-subscription channel is disconnected
+  clearAllTopicsForHost(addr);
+
+  // remove channel mapping
+  {
+    boost::lock_guard<boost::shared_mutex> h2clock(host2channel_lock);
+    host2channel.erase(addr);
+  }
+  removeChannel(channel);
+}
+
+void DuplexChannelManager::removeChannel(const DuplexChannelPtr& channel) {
+  {
+    boost::lock_guard<boost::shared_mutex> aclock(allchannels_lock);
+    allchannels.erase(channel); // channel should be deleted here
+  }
+  channel->close();
+}
+
+void DuplexChannelManager::start() {
+  // add non-subscription response handlers
+  nonSubscriptionHandlers[PUBLISH] =
+    ResponseHandlerPtr(new PublishResponseHandler(shared_from_this()));
+  nonSubscriptionHandlers[UNSUBSCRIBE] =
+    ResponseHandlerPtr(new UnsubscribeResponseHandler(shared_from_this()));
+
+  // start the dispatcher
+  dispatcher->start();
+}
+
+bool DuplexChannelManager::isClosed() {
+  boost::shared_lock<boost::shared_mutex> lock(allchannels_lock);
+  return closed;
+}
+
+void DuplexChannelManager::close() {
+  // stop the dispatcher
+  dispatcher->stop();
+  {
+    boost::lock_guard<boost::shared_mutex> lock(allchannels_lock);
+    
+    closed = true;
+    for (ChannelMap::iterator iter = allchannels.begin(); iter != allchannels.end(); ++iter ) {
+      (*iter)->close();
+    }  
+    allchannels.clear();
+  }
+
+  // Unregister response handlers
+  nonSubscriptionHandlers.clear();
+  /* destruction of the maps will clean up any items they hold */
+}
+
+ClientImplPtr ClientImpl::Create(const Configuration& conf) {
+  ClientImplPtr impl(new ClientImpl(conf));
+  LOG4CXX_DEBUG(logger, "Creating Clientimpl " << impl);
+  impl->channelManager->start();
+  return impl;
+}
+
+void ClientImpl::Destroy() {
+  LOG4CXX_DEBUG(logger, "destroying Clientimpl " << this);
+
+  // close the channel manager
+  channelManager->close();
+
+  if (subscriber != NULL) {
+    delete subscriber;
+    subscriber = NULL;
+  }
+  if (publisher != NULL) {
+    delete publisher;
+    publisher = NULL;
   }
-  host2topics.erase(addr);
-  host2channel.erase(addr);
-  removeAndCloseChannel(channel);
 }
 
-void ClientImpl::removeAndCloseChannel(const DuplexChannelPtr& channel) {
-  boost::lock_guard<boost::shared_mutex> aclock(allchannels_lock);
-  allchannels.erase(channel); // channel should be deleted here
-  channel->close(); // close channel
+ClientImpl::ClientImpl(const Configuration& conf) 
+  : conf(conf), publisher(NULL), subscriber(NULL)
+{
+  channelManager = DuplexChannelManager::create(conf);
 }
 
-const Configuration& ClientImpl::getConfiguration() {
-  return conf;
+Subscriber& ClientImpl::getSubscriber() {
+  return getSubscriberImpl();
 }
 
-boost::asio::io_service& ClientImpl::getService() {
-  return dispatcher->getService()->getService();
+Publisher& ClientImpl::getPublisher() {
+  return getPublisherImpl();
+}
+    
+SubscriberImpl& ClientImpl::getSubscriberImpl() {
+  if (subscriber == NULL) {
+    boost::lock_guard<boost::mutex> lock(subscribercreate_lock);
+    if (subscriber == NULL) {
+      subscriber = new SubscriberImpl(channelManager);
+    }
+  }
+  return *subscriber;
+}
+
+PublisherImpl& ClientImpl::getPublisherImpl() {
+  if (publisher == NULL) { 
+    boost::lock_guard<boost::mutex> lock(publishercreate_lock);
+    if (publisher == NULL) {
+      publisher = new PublisherImpl(channelManager);
+    }
+  }
+  return *publisher;
+}
+
+ClientImpl::~ClientImpl() {
+  LOG4CXX_DEBUG(logger, "deleting Clientimpl " << this);
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.h?rev=1399578&r1=1399577&r2=1399578&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.h Thu Oct 18 10:59:34 2012
@@ -164,22 +164,301 @@ namespace Hedwig {
     int timeout;
   };
 
+  class DuplexChannelManager;
+  typedef boost::shared_ptr<DuplexChannelManager> DuplexChannelManagerPtr;
+
+  //
+  // Hedwig Response Handler
+  //
+
+  // Response Handler used to process response for different types of requests
+  class ResponseHandler {
+  public:
+    ResponseHandler(const DuplexChannelManagerPtr& channelManager); 
+    virtual ~ResponseHandler() {};
+
+    virtual void handleResponse(const PubSubResponsePtr& m, const PubSubDataPtr& txn,
+                                const DuplexChannelPtr& channel) = 0;
+  protected:
+    // common method used to redirect request
+    void redirectRequest(const PubSubResponsePtr& response, const PubSubDataPtr& data,
+                         const DuplexChannelPtr& channel);
+
+    // channel manager to manage all established channels
+    const DuplexChannelManagerPtr channelManager;
+  };
+
+  typedef boost::shared_ptr<ResponseHandler> ResponseHandlerPtr;
+  typedef std::tr1::unordered_map<OperationType, ResponseHandlerPtr, OperationTypeHash> ResponseHandlerMap;
+
+  class PubSubWriteCallback : public OperationCallback {
+  public:
+    PubSubWriteCallback(const DuplexChannelPtr& channel, const PubSubDataPtr& data); 
+    virtual void operationComplete();
+    virtual void operationFailed(const std::exception& exception);
+  private:
+    DuplexChannelPtr channel;
+    PubSubDataPtr data;
+  };
+
+  class DefaultServerConnectCallback : public OperationCallback {
+  public:
+    DefaultServerConnectCallback(const DuplexChannelManagerPtr& channelManager,
+                                 const DuplexChannelPtr& channel,
+                                 const PubSubDataPtr& data);
+    virtual void operationComplete();
+    virtual void operationFailed(const std::exception& exception);
+  private:
+    DuplexChannelManagerPtr channelManager;
+    DuplexChannelPtr channel;
+    PubSubDataPtr data;
+  };
+
+  struct SubscriptionListenerPtrHash : public std::unary_function<SubscriptionListenerPtr, size_t> {
+    size_t operator()(const Hedwig::SubscriptionListenerPtr& listener) const {
+      return reinterpret_cast<size_t>(listener.get());
+    }
+  };
+
+  // Subscription Event Emitter
+  class SubscriptionEventEmitter {
+  public:
+    SubscriptionEventEmitter();
+
+    void addSubscriptionListener(SubscriptionListenerPtr& listener);
+    void removeSubscriptionListener(SubscriptionListenerPtr& listener);
+    void emitSubscriptionEvent(const std::string& topic,
+                               const std::string& subscriberId,
+                               const SubscriptionEvent event);
+
+  private:
+    typedef std::tr1::unordered_set<SubscriptionListenerPtr, SubscriptionListenerPtrHash> SubscriptionListenerSet;
+    SubscriptionListenerSet listeners;
+    boost::shared_mutex listeners_lock;
+  };
+
+  class SubscriberClientChannelHandler;
+
+  //
+  // Duplex Channel Manager to manage all established channels
+  //
+
+  class DuplexChannelManager : public boost::enable_shared_from_this<DuplexChannelManager> {
+  public:
+    static DuplexChannelManagerPtr create(const Configuration& conf);
+    virtual ~DuplexChannelManager();
+
+    inline const Configuration& getConfiguration() const {
+      return conf;
+    }
+
+    // Submit a pub/sub request
+    void submitOp(const PubSubDataPtr& op);
+
+    // Submit a pub/sub request to default host
+    // It is called only when client doesn't have the knowledge of topic ownership
+    void submitOpToDefaultServer(const PubSubDataPtr& op);
+
+    // Redirect pub/sub request to a target host
+    void redirectOpToHost(const PubSubDataPtr& op, const HostAddress& host);
+
+    // Submit a pub/sub request thru established channel
+    // It is called when connecting to default server to establish a channel
+    void submitOpThruChannel(const PubSubDataPtr& op, const DuplexChannelPtr& channel);
+
+    // Generate next transaction id for pub/sub requests sent through this manager
+    long nextTxnId();
+
+    // return default host
+    inline const HostAddress& getDefaultHost() { return defaultHost; }
+
+    // set the owner host of a topic
+    void setHostForTopic(const std::string& topic, const HostAddress& host);
+
+    // clear all topics that are hosted by a hub server
+    void clearAllTopicsForHost(const HostAddress& host);
+
+    // clear host for a given topic
+    void clearHostForTopic(const std::string& topic, const HostAddress& host);
+
+    // Called when a channel is disconnected
+    void nonSubscriptionChannelDied(const DuplexChannelPtr& channel);
+
+    // Remove a channel from the set of all channels and close it
+    void removeChannel(const DuplexChannelPtr& channel);
+
+    // Get the subscription channel handler for a given subscription
+    virtual boost::shared_ptr<SubscriberClientChannelHandler>
+            getSubscriptionChannelHandler(const TopicSubscriber& ts) = 0;
+
+    // Close a given subscription
+    virtual void asyncCloseSubscription(const TopicSubscriber& ts,
+                                        const OperationCallbackPtr& callback) = 0;
+
+    virtual void handoverDelivery(const TopicSubscriber& ts,
+                                  const MessageHandlerCallbackPtr& handler,
+                                  const ClientMessageFilterPtr& filter) = 0;
+
+    // start the channel manager
+    virtual void start();
+    // close the channel manager
+    virtual void close();
+    // whether the channel manager is closed
+    bool isClosed();
+
+    // Return an available service
+    inline boost::asio::io_service & getService() const {
+      return dispatcher->getService()->getService();
+    }
+
+    // Return the event emitter
+    inline SubscriptionEventEmitter& getEventEmitter() {
+      return eventEmitter;
+    }
+
+  protected:
+    DuplexChannelManager(const Configuration& conf);
+
+    // Get the ownership for a given topic.
+    const HostAddress& getHostForTopic(const std::string& topic);
+
+    //
+    // Channel Management
+    //
+
+    // Non subscription channel management
+
+    // Get a non subscription channel for a given topic
+    // If the topic's owner is known, retrieve a non subscription channel to
+    // the target host (if no such channel exists, create one);
+    // If the topic's owner is unknown, return null
+    DuplexChannelPtr getNonSubscriptionChannel(const std::string& topic);
+
+    // Get an existing non subscription channel to a given host
+    DuplexChannelPtr getNonSubscriptionChannel(const HostAddress& addr);
+
+    // Create a non subscription channel to a given host
+    DuplexChannelPtr createNonSubscriptionChannel(const HostAddress& addr);
+
+    // Store the established non subscription channel
+    DuplexChannelPtr storeNonSubscriptionChannel(const DuplexChannelPtr& ch,
+                                                 bool doConnect); 
+
+    //
+    // Subscription Channel Management
+    //
+
+    // Get a subscription channel for a given subscription.
+    // If there is subscription channel established before, return it.
+    // Otherwise, check whether the topic's owner is known. If the topic owner
+    // is known, retrieve a subscription channel to target host (if no such
+    // channel exists, create one); If unknown, return null
+    virtual DuplexChannelPtr getSubscriptionChannel(const TopicSubscriber& ts,
+                                                    const bool isResubscribeRequest) = 0;
+
+    // Get an existing subscription channel to a given host
+    virtual DuplexChannelPtr getSubscriptionChannel(const HostAddress& addr) = 0;
+
+    // Create a subscription channel to a given host
+    // NOTE(review): this comment used to describe a 'store' flag (store the channel
+    // for future usage, or just return the new one), but no such parameter exists.
+    virtual DuplexChannelPtr createSubscriptionChannel(const HostAddress& addr) = 0;
+
+    // Store the established subscription channel
+    virtual DuplexChannelPtr storeSubscriptionChannel(const DuplexChannelPtr& ch,
+                                                      bool doConnect) = 0;
+
+    //
+    // Raw Channel Management
+    //
+
+    // Create a raw channel
+    DuplexChannelPtr createChannel(IOServicePtr& service,
+                                   const HostAddress& addr, const ChannelHandlerPtr& handler);
+
+    // event dispatcher running io threads
+    typedef boost::shared_ptr<EventDispatcher> EventDispatcherPtr;
+    EventDispatcherPtr dispatcher;
+
+    // topic2host mapping for topic ownership
+    std::tr1::unordered_map<std::string, HostAddress> topic2host;
+    boost::shared_mutex topic2host_lock;
+    typedef std::tr1::unordered_set<std::string> TopicSet;
+    typedef boost::shared_ptr<TopicSet> TopicSetPtr;
+    typedef std::tr1::unordered_map<HostAddress, TopicSetPtr, HostAddressHash> Host2TopicsMap;
+    Host2TopicsMap host2topics;
+    boost::shared_mutex host2topics_lock;
+  private:
+    // write the request to target channel
+    void submitTo(const PubSubDataPtr& op, const DuplexChannelPtr& channel);
+
+    const Configuration& conf;
+    bool sslEnabled;
+    SSLContextFactoryPtr sslCtxFactory;
+
+    // whether the channel manager is shutting down
+    bool closed;
+
+    // counter used for generating transaction ids
+    ClientTxnCounter counterobj;
+
+    // default host
+    HostAddress defaultHost;
+
+    // non-subscription channels
+    std::tr1::unordered_map<HostAddress, DuplexChannelPtr, HostAddressHash > host2channel;
+    boost::shared_mutex host2channel_lock;
+
+    // maintain all established channels
+    typedef std::tr1::unordered_set<DuplexChannelPtr, DuplexChannelPtrHash > ChannelMap;
+    ChannelMap allchannels;
+    boost::shared_mutex allchannels_lock;
+
+    // Response Handlers for non-subscription requests
+    ResponseHandlerMap nonSubscriptionHandlers;
+
+    // Subscription Event Emitter
+    SubscriptionEventEmitter eventEmitter;
+  };
+
+  //
+  // Hedwig Client Channel Handler to handle responses received from the channel
+  //
+
   class HedwigClientChannelHandler : public ChannelHandler {
   public:
-    HedwigClientChannelHandler(const ClientImplPtr& client);
+    HedwigClientChannelHandler(const DuplexChannelManagerPtr& channelManager,
+                               ResponseHandlerMap& handlers);
+    virtual ~HedwigClientChannelHandler() {}
     
     virtual void messageReceived(const DuplexChannelPtr& channel, const PubSubResponsePtr& m);
     virtual void channelConnected(const DuplexChannelPtr& channel);
     virtual void channelDisconnected(const DuplexChannelPtr& channel, const std::exception& e);
     virtual void exceptionOccurred(const DuplexChannelPtr& channel, const std::exception& e);
-    
+
+    void close();
   protected:
-    const ClientImplPtr client;
+    // real channel disconnected logic
+    virtual void onChannelDisconnected(const DuplexChannelPtr& channel);
+
+    // real close logic
+    virtual void doClose();
+
+    // channel manager to manage all established channels
+    const DuplexChannelManagerPtr channelManager;
+    ResponseHandlerMap& handlers;
+
+    boost::shared_mutex close_lock;
+    // Boolean indicating if we closed the handler explicitly or not.
+    // If so, we do not need to do the channel disconnected logic here.
+    bool closed;
+    // whether channel is disconnected.
+    bool disconnected;
   };
   
   class PublisherImpl;
   class SubscriberImpl;
-  
+
   /**
      Implementation of the hedwig client. This class takes care of globals such as the topic->host map and the transaction id counter.
   */
@@ -191,30 +470,9 @@ namespace Hedwig {
     Subscriber& getSubscriber();
     Publisher& getPublisher();
 
-    ClientTxnCounter& counter();
-
-    void redirectRequest(const DuplexChannelPtr& channel, PubSubDataPtr& data, const PubSubResponsePtr& response);
-
-    const HostAddress& getHostForTopic(const std::string& topic);
-
-    //DuplexChannelPtr getChannelForTopic(const std::string& topic, OperationCallback& callback);
-    //DuplexChannelPtr createChannelForTopic(const std::string& topic, ChannelHandlerPtr& handler, OperationCallback& callback);
-    DuplexChannelPtr createChannel(const std::string& topic, const ChannelHandlerPtr& handler);    
-    DuplexChannelPtr getChannel(const std::string& topic);
-
-    void setHostForTopic(const std::string& topic, const HostAddress& host);
-
-    void setChannelForHost(const HostAddress& address, const DuplexChannelPtr& channel);
-    void channelDied(const DuplexChannelPtr& channel);
-    void removeAndCloseChannel(const DuplexChannelPtr& channel);
-    bool shuttingDown() const;
-    
     SubscriberImpl& getSubscriberImpl();
     PublisherImpl& getPublisherImpl();
 
-    const Configuration& getConfiguration();
-    boost::asio::io_service& getService();
-
     ~ClientImpl();
   private:
     ClientImpl(const Configuration& conf);
@@ -227,27 +485,8 @@ namespace Hedwig {
     boost::mutex subscribercreate_lock;
     SubscriberImpl* subscriber;
 
-    ClientTxnCounter counterobj;
-
-    typedef boost::shared_ptr<EventDispatcher> EventDispatcherPtr;
-    EventDispatcherPtr dispatcher;
+    // channel manager that manages all channels for the client
     DuplexChannelManagerPtr channelManager;
-
-    typedef std::tr1::unordered_multimap<HostAddress, std::string, HostAddressHash > Host2TopicsMap;
-    Host2TopicsMap host2topics;
-    boost::shared_mutex host2topics_lock;
-    HostAddress defaultHost;
-
-    std::tr1::unordered_map<HostAddress, DuplexChannelPtr, HostAddressHash > host2channel;
-    boost::shared_mutex host2channel_lock;
-    std::tr1::unordered_map<std::string, HostAddress> topic2host;
-    boost::shared_mutex topic2host_lock;
-
-    typedef std::tr1::unordered_set<DuplexChannelPtr, DuplexChannelPtrHash > ChannelMap;
-    ChannelMap allchannels;
-    boost::shared_mutex allchannels_lock;
-
-    bool shuttingDownFlag;
   };
 };
 #endif

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp?rev=1399578&r1=1399577&r2=1399578&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp Thu Oct 18 10:59:34 2012
@@ -23,11 +23,23 @@
 #include "data.h"
 
 #include <log4cxx/logger.h>
+#include <iostream>
+
+#define stringify( name ) #name
 
 static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__));
 
 using namespace Hedwig;
 
+const char* OPERATION_TYPE_NAMES[] = {
+  stringify( PUBLISH ),
+  stringify( SUBSCRIBE ),
+  stringify( CONSUME ),
+  stringify( UNSUBSCRIBE ),
+  stringify( START_DELIVERY ),
+  stringify( STOP_DELIVERY )
+};
+
 PubSubDataPtr PubSubData::forPublishRequest(long txnid, const std::string& topic, const Message& body,
                                             const ResponseCallbackPtr& callback) {
   PubSubDataPtr ptr(new PubSubData());
@@ -186,3 +198,57 @@ const std::string& PubSubData::getSubscr
 const SubscriptionOptions& PubSubData::getSubscriptionOptions() const {
   return options;
 }
+
+void PubSubData::setOrigChannelForResubscribe(
+  boost::shared_ptr<DuplexChannel>& channel) {
+  this->origChannel = channel;
+}
+
+boost::shared_ptr<DuplexChannel>& PubSubData::getOrigChannelForResubscribe() {
+  return this->origChannel;
+}
+
+bool PubSubData::isResubscribeRequest() {
+  return 0 != this->origChannel.get();
+}
+
+ClientTxnCounter::ClientTxnCounter() : counter(0) 
+{
+}
+
+ClientTxnCounter::~ClientTxnCounter() {
+}
+
+/**
+Increment the transaction counter and return the new value.
+
+@returns the next transaction id
+*/
+long ClientTxnCounter::next() {  // would be nice to remove lock from here, look more into it
+  boost::lock_guard<boost::mutex> lock(mutex);
+
+  long next= ++counter; 
+
+  return next;
+}
+
+std::ostream& Hedwig::operator<<(std::ostream& os, const PubSubData& data) {
+  OperationType type = data.getType();
+  os << "[" << OPERATION_TYPE_NAMES[type] << " request (txn:" << data.getTxnId()
+     << ") for (topic:" << data.getTopic();
+  switch (type) {
+  case SUBSCRIBE:
+  case UNSUBSCRIBE:
+    os << ", subscriber:" << data.getSubscriberId() << ")";
+    break;
+  case CONSUME:
+    os << ", subscriber:" << data.getSubscriberId() << ", seq:"
+    << data.getMessageSeqId().localcomponent() << ")";
+    break;
+  case PUBLISH:
+  default:
+    os << ")";
+    break;
+  }
+  return os;
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h?rev=1399578&r1=1399577&r2=1399578&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h Thu Oct 18 10:59:34 2012
@@ -23,6 +23,7 @@
 #include <hedwig/callback.h>
 
 #include <pthread.h>
+#include <iostream>
 
 #ifdef USE_BOOST_TR1
 #include <boost/tr1/unordered_set.hpp>
@@ -57,6 +58,8 @@ namespace Hedwig {
   typedef boost::shared_ptr<PubSubRequest> PubSubRequestPtr;
   typedef boost::shared_ptr<PubSubResponse> PubSubResponsePtr;
 
+  class DuplexChannel;
+
   /**
      Data structure to hold information about requests and build request messages.
      Used to store requests which may need to be resent to another server. 
@@ -92,6 +95,12 @@ namespace Hedwig {
     void addTriedServer(HostAddress& h);
     bool hasTriedServer(HostAddress& h);
     void clearTriedServers();
+
+    void setOrigChannelForResubscribe(boost::shared_ptr<DuplexChannel>& channel);
+    bool isResubscribeRequest();
+    boost::shared_ptr<DuplexChannel>& getOrigChannelForResubscribe();
+
+    friend std::ostream& operator<<(std::ostream& os, const PubSubData& data);
   private:
 
     PubSubData();
@@ -110,7 +119,9 @@ namespace Hedwig {
     SubscriptionOptions options;
     MessageSeqId msgid;
     std::tr1::unordered_set<HostAddress, HostAddressHash > triedservers;
+    // record the origChannel for a resubscribe request
+    boost::shared_ptr<DuplexChannel> origChannel;
   };
-  
+
 };
 #endif

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp?rev=1399578&r1=1399577&r2=1399578&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp Thu Oct 18 10:59:34 2012
@@ -66,11 +66,13 @@ EventDispatcher::EventDispatcher(const C
   num_threads = conf.getInt(Configuration::NUM_DISPATCH_THREADS,
                             DEFAULT_NUM_DISPATCH_THREADS);
   if (0 == num_threads) {
+    LOG4CXX_ERROR(logger, "Number of threads in dispatcher is zero");
     throw std::runtime_error("number of threads in dispatcher is zero");
   }
   for (size_t i = 0; i < num_threads; i++) {
     services.push_back(IOServicePtr(new IOService()));
   }
+  LOG4CXX_DEBUG(logger, "Created EventDispatcher " << this);
 }
 
 void EventDispatcher::run_forever(IOServicePtr service, size_t idx) {

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.cpp?rev=1399578&r1=1399577&r2=1399578&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.cpp Thu Oct 18 10:59:34 2012
@@ -19,11 +19,12 @@
 #include <config.h>
 #endif
 
+#include <string>
+#include <log4cxx/logger.h>
+
 #include "publisherimpl.h"
 #include "channel.h"
 
-#include <log4cxx/logger.h>
-
 static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__));
 
 using namespace Hedwig;
@@ -48,31 +49,19 @@ void PublishResponseAdaptor::operationFa
   pubCallback->operationFailed(exception);
 }
 
-PublishWriteCallback::PublishWriteCallback(const ClientImplPtr& client, const PubSubDataPtr& data) : client(client), data(data) {}
-
-void PublishWriteCallback::operationComplete() {
-  LOG4CXX_DEBUG(logger, "Successfully wrote transaction: " << data->getTxnId());
-}
-
-void PublishWriteCallback::operationFailed(const std::exception& exception) {
-  LOG4CXX_ERROR(logger, "Error writing to publisher " << exception.what());
-  
-  data->getCallback()->operationFailed(exception);
-}
-
-PublisherImpl::PublisherImpl(const ClientImplPtr& client) 
-  : client(client) {
+PublisherImpl::PublisherImpl(const DuplexChannelManagerPtr& channelManager)
+  : channelManager(channelManager) {
 }
 
 PublishResponsePtr PublisherImpl::publish(const std::string& topic, const Message& message) {
-  SyncCallback<PublishResponsePtr>* cb =
-    new SyncCallback<PublishResponsePtr>(client->getConfiguration().getInt(Configuration::SYNC_REQUEST_TIMEOUT, 
-											                                                     DEFAULT_SYNC_REQUEST_TIMEOUT));
+  SyncCallback<PublishResponsePtr>* cb = new SyncCallback<PublishResponsePtr>(
+    channelManager->getConfiguration().getInt(Configuration::SYNC_REQUEST_TIMEOUT,
+                                              DEFAULT_SYNC_REQUEST_TIMEOUT));
   PublishResponseCallbackPtr callback(cb);
   asyncPublishWithResponse(topic, message, callback);
   cb->wait();
-  
-  cb->throwExceptionIfNeeded();  
+
+  cb->throwExceptionIfNeeded();
   return cb->getResult();
 }
 
@@ -82,13 +71,15 @@ PublishResponsePtr PublisherImpl::publis
   return publish(topic, msg);
 }
 
-void PublisherImpl::asyncPublish(const std::string& topic, const Message& message, const OperationCallbackPtr& callback) {
+void PublisherImpl::asyncPublish(const std::string& topic, const Message& message,
+                                 const OperationCallbackPtr& callback) {
   // use release after callback to release the channel after the callback is called
   ResponseCallbackPtr respCallback(new ResponseCallbackAdaptor(callback));
   doPublish(topic, message, respCallback);
 }
 
-void PublisherImpl::asyncPublish(const std::string& topic, const std::string& message, const OperationCallbackPtr& callback) {
+void PublisherImpl::asyncPublish(const std::string& topic, const std::string& message,
+                                 const OperationCallbackPtr& callback) {
   Message msg;
   msg.set_body(message);
   asyncPublish(topic, msg, callback);
@@ -100,22 +91,26 @@ void PublisherImpl::asyncPublishWithResp
   doPublish(topic, message, respCallback);
 }
 
-void PublisherImpl::doPublish(const std::string& topic, const Message& message, const ResponseCallbackPtr& callback) {
-  PubSubDataPtr data = PubSubData::forPublishRequest(client->counter().next(), topic, message, callback);
-  
-  DuplexChannelPtr channel = client->getChannel(topic);
-
-  doPublish(channel, data);
-}
-
-void PublisherImpl::doPublish(const DuplexChannelPtr& channel, const PubSubDataPtr& data) {
-  channel->storeTransaction(data);
-  
-  OperationCallbackPtr writecb(new PublishWriteCallback(client, data));
-  channel->writeRequest(data->getRequest(), writecb);
-}
-
-void PublisherImpl::messageHandler(const PubSubResponsePtr& m, const PubSubDataPtr& txn) {
+// Builds a publish request for the given topic and message, tagged with the
+// next transaction id handed out by the channel manager, logs it, and hands it
+// to the channel manager via submitOp().
+// The response (or failure) is reported through `callback`, which is stored in
+// the request data created here.
+void PublisherImpl::doPublish(const std::string& topic, const Message& message,
+                              const ResponseCallbackPtr& callback) {
+  PubSubDataPtr data = PubSubData::forPublishRequest(channelManager->nextTxnId(),
+                                                     topic, message, callback);
+  LOG4CXX_INFO(logger, "Publish message (topic:" << data->getTopic() << ", txn:"
+                       << data->getTxnId() << ").");
+  channelManager->submitOp(data);
+}
+
+//
+// Publish Response Handler
+//
+// Creates the handler for publish responses; the channel manager is passed on
+// to the ResponseHandler base class and its address is logged at debug level.
+PublishResponseHandler::PublishResponseHandler(const DuplexChannelManagerPtr& channelManager)
+  : ResponseHandler(channelManager) {
+  LOG4CXX_DEBUG(logger, "Created PublishResponseHandler for ChannelManager " << channelManager.get());
+}
+
+void PublishResponseHandler::handleResponse(const PubSubResponsePtr& m,
+                                            const PubSubDataPtr& txn,
+                                            const DuplexChannelPtr& channel) {
   switch (m->statuscode()) {
   case SUCCESS:
     if (m->has_responsebody()) {
@@ -128,6 +123,9 @@ void PublisherImpl::messageHandler(const
     LOG4CXX_ERROR(logger, "Server responsed with SERVICE_DOWN for " << txn->getTxnId());
     txn->getCallback()->operationFailed(ServiceDownException());
     break;
+  case NOT_RESPONSIBLE_FOR_TOPIC:
+    redirectRequest(m, txn, channel);
+    break;
   default:
     LOG4CXX_ERROR(logger, "Unexpected response " << m->statuscode() << " for " << txn->getTxnId());
     txn->getCallback()->operationFailed(UnexpectedResponseException());

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.h?rev=1399578&r1=1399577&r2=1399578&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.h Thu Oct 18 10:59:34 2012
@@ -34,20 +34,18 @@ namespace Hedwig {
     PublishResponseCallbackPtr pubCallback;
   };
 
-  class PublishWriteCallback : public OperationCallback {
+  class PublishResponseHandler : public ResponseHandler {
   public:
-    PublishWriteCallback(const ClientImplPtr& client, const PubSubDataPtr& data);
+    PublishResponseHandler(const DuplexChannelManagerPtr& channelManager);
+    virtual ~PublishResponseHandler() {};
 
-    void operationComplete();
-    void operationFailed(const std::exception& exception);
-  private:
-    ClientImplPtr client;
-    PubSubDataPtr data;
+    virtual void handleResponse(const PubSubResponsePtr& m, const PubSubDataPtr& txn,
+                                const DuplexChannelPtr& channel);
   };
 
   class PublisherImpl : public Publisher {
   public:
-    PublisherImpl(const ClientImplPtr& client);
+    PublisherImpl(const DuplexChannelManagerPtr& channelManager);
 
     PublishResponsePtr publish(const std::string& topic, const std::string& message);
     PublishResponsePtr publish(const std::string& topic, const Message& message);
@@ -56,14 +54,11 @@ namespace Hedwig {
     void asyncPublish(const std::string& topic, const Message& message, const OperationCallbackPtr& callback);
     void asyncPublishWithResponse(const std::string& topic, const Message& messsage,
                                   const PublishResponseCallbackPtr& callback);
-    
-    void messageHandler(const PubSubResponsePtr& m, const PubSubDataPtr& txn);
-
-    void doPublish(const std::string& topic, const Message& message, const ResponseCallbackPtr& callback);
-    void doPublish(const DuplexChannelPtr& channel, const PubSubDataPtr& data);
 
+    void doPublish(const std::string& topic, const Message& message,
+                   const ResponseCallbackPtr& callback);
   private:
-    ClientImplPtr client;
+    DuplexChannelManagerPtr channelManager;
   };
 
 };

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/simplesubscriberimpl.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/simplesubscriberimpl.cpp?rev=1399578&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/simplesubscriberimpl.cpp (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/simplesubscriberimpl.cpp Thu Oct 18 10:59:34 2012
@@ -0,0 +1,470 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <log4cxx/logger.h>
+
+#include "simplesubscriberimpl.h"
+#include "util.h"
+
+static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__));
+
+using namespace Hedwig;
+
+const int DEFAULT_MAX_MESSAGE_QUEUE_SIZE = 10;
+
+// Constructs the subscriber on top of ActiveSubscriber and reads the message
+// queue limit from the channel manager's configuration
+// (key MAX_MESSAGE_QUEUE_SIZE, default DEFAULT_MAX_MESSAGE_QUEUE_SIZE).
+SimpleActiveSubscriber::SimpleActiveSubscriber(const PubSubDataPtr& data,
+                                               const AbstractDuplexChannelPtr& channel,
+                                               const SubscriptionPreferencesPtr& preferences,
+                                               const DuplexChannelManagerPtr& channelManager)
+  : ActiveSubscriber(data, channel, preferences, channelManager) {
+  const int configuredQueueLen =
+    channelManager->getConfiguration().getInt(Configuration::MAX_MESSAGE_QUEUE_SIZE,
+                                              DEFAULT_MAX_MESSAGE_QUEUE_SIZE);
+  maxQueueLen = configuredQueueLen;
+}
+
+// Starts delivery through the base class and then asks the channel to start
+// receiving.
+void SimpleActiveSubscriber::doStartDelivery(const MessageHandlerCallbackPtr& handler,
+                                             const ClientMessageFilterPtr& filter) {
+  ActiveSubscriber::doStartDelivery(handler, filter);
+  // channel#startReceiving is deliberately called here, outside of
+  // subscriber#queue_lock, otherwise we would dead lock on the lock chain
+  // subscriber#startDelivery(subscriber#queue_lock) =>
+  // channel#startReceiving(channel#receiving_lock).
+  channel->startReceiving();
+}
+
+// Stops delivery by asking the underlying channel to stop receiving.
+void SimpleActiveSubscriber::doStopDelivery() {
+  channel->stopReceiving();
+}
+
+// Queues the message through the base class; once the queue holds at least
+// maxQueueLen messages the channel is told to stop receiving.
+void SimpleActiveSubscriber::queueMessage(const PubSubResponsePtr& m) {
+  ActiveSubscriber::queueMessage(m);
+
+  const bool belowLimit = queue.size() < maxQueueLen;
+  if (belowLimit) {
+    return;
+  }
+  channel->stopReceiving();
+}
+
+// Remembers the active subscriber on which finish() raises the TOPIC_MOVED
+// event once the close-subscription operation has ended.
+CloseSubscriptionCallback::CloseSubscriptionCallback(const ActiveSubscriberPtr& activeSubscriber)
+  : activeSubscriber(activeSubscriber) {
+}
+
+// Close succeeded: run the common completion logic.
+void CloseSubscriptionCallback::operationComplete() {
+  finish();
+}
+
+// Close failed: the exception is not inspected, the same completion logic as
+// in the success case is run.
+void CloseSubscriptionCallback::operationFailed(const std::exception& e) {
+  finish();
+}
+
+// Shared tail of operationComplete()/operationFailed(): hands a TOPIC_MOVED
+// event for the subscriber's own (topic, subscriber id) to the subscriber.
+void CloseSubscriptionCallback::finish() {
+  // Process the disconnect logic after cleaning up
+  activeSubscriber->processEvent(activeSubscriber->getTopic(),
+                                 activeSubscriber->getSubscriberId(),
+                                 TOPIC_MOVED);
+}
+
+// Forwards the channel manager and the response handler map to the
+// SubscriberClientChannelHandler base class; no further state is set up here.
+SimpleSubscriberClientChannelHandler::SimpleSubscriberClientChannelHandler(
+    const DuplexChannelManagerPtr& channelManager, ResponseHandlerMap& handlers)
+    : SubscriberClientChannelHandler(channelManager, handlers) {
+}
+
+// Installs a new SimpleActiveSubscriber on this handler while holding
+// subscriber_lock exclusively.
+// Returns true when the subscriber was installed, false (after logging an
+// error) when a subscriber is already set on this handler.
+bool SimpleSubscriberClientChannelHandler::setActiveSubscriber(
+  const PubSubDataPtr& op, const SubscriptionPreferencesPtr& preferences) {
+  boost::lock_guard<boost::shared_mutex> guard(subscriber_lock);
+  if (!subscriber) {
+    subscriber = ActiveSubscriberPtr(new SimpleActiveSubscriber(op, channel, preferences,
+                                                                channelManager));
+    return true;
+  }
+  LOG4CXX_ERROR(logger, *subscriber << " has been found alive on channel " << channel.get());
+  return false;
+}
+
+// Passes the message to the handler's active subscriber. `ts` is not used.
+// When no subscriber is set the message is dropped and an error is logged.
+void SimpleSubscriberClientChannelHandler::deliverMessage(const TopicSubscriber& ts,
+                                                          const PubSubResponsePtr& m) {
+  ActiveSubscriberPtr active = getActiveSubscriber();
+  if (active) {
+    active->deliverMessage(m);
+  } else {
+    LOG4CXX_ERROR(logger, "No Active Subscriber found alive on channel " << channel.get());
+  }
+}
+
+// Starts delivery on the handler's active subscriber. `ts` is not used.
+// Throws NotSubscribedException (after logging an error) when no subscriber
+// is set on this handler.
+void SimpleSubscriberClientChannelHandler::startDelivery(const TopicSubscriber& ts,
+                                                         const MessageHandlerCallbackPtr& handler,
+                                                         const ClientMessageFilterPtr& filter) {
+  ActiveSubscriberPtr active = getActiveSubscriber();
+  if (active) {
+    active->startDelivery(handler, filter);
+    return;
+  }
+  LOG4CXX_ERROR(logger, "No Active Subscriber found alive on channel " << channel.get());
+  throw NotSubscribedException();
+}
+
+// Stops delivery on the handler's active subscriber. `ts` is not used.
+// Throws NotSubscribedException (after logging an error) when no subscriber
+// is set on this handler.
+void SimpleSubscriberClientChannelHandler::stopDelivery(const TopicSubscriber& ts) {
+  ActiveSubscriberPtr active = getActiveSubscriber();
+  if (active) {
+    active->stopDelivery();
+    return;
+  }
+  LOG4CXX_ERROR(logger, "No Active Subscriber found alive on channel " << channel.get());
+  throw NotSubscribedException();
+}
+
+// Returns true only when an active subscriber is set and both its topic and
+// its subscriber id equal the given (topic, subscriber) pair.
+bool SimpleSubscriberClientChannelHandler::hasSubscription(const TopicSubscriber& ts) {
+  ActiveSubscriberPtr active = getActiveSubscriber();
+  return active
+      && ts.first == active->getTopic()
+      && ts.second == active->getSubscriberId();
+}
+
+// Closes and clears the handler's active subscriber, if there is one, and then
+// reports completion on `callback` in every case. `ts` is not used.
+void SimpleSubscriberClientChannelHandler::asyncCloseSubscription(
+  const TopicSubscriber& ts, const OperationCallbackPtr& callback) {
+  ActiveSubscriberPtr active = getActiveSubscriber();
+  if (active) {
+    // drop the subscriber: close it first, then detach it from this handler
+    active->close();
+    clearActiveSubscriber();
+  }
+  callback->operationComplete();
+}
+
+// Forwards the consumed message sequence id to the active subscriber.
+// `ts` is not used. When no subscriber is set only an error is logged.
+void SimpleSubscriberClientChannelHandler::consume(const TopicSubscriber& ts,
+                                                   const MessageSeqId& messageSeqId) {
+  ActiveSubscriberPtr active = getActiveSubscriber();
+  if (active) {
+    active->consume(messageSeqId);
+  } else {
+    LOG4CXX_ERROR(logger, "No Active Subscriber found alive on channel " << channel.get());
+  }
+}
+
+// Reacts to a disconnect of the given channel.
+//  - Without an active subscriber the channel is just removed from the
+//    channel manager.
+//  - With an active subscriber the topic's host mapping is cleared, and then
+//    either the subscription is closed (TOPIC_MOVED is raised from
+//    CloseSubscriptionCallback) or, when a resubscribe is required, the
+//    TOPIC_MOVED event is raised directly on the subscriber.
+// Note: inside this method `channel` refers to the parameter, not to the
+// handler's own `channel` that the other methods of this class use.
+void SimpleSubscriberClientChannelHandler::onChannelDisconnected(
+  const DuplexChannelPtr& channel) {
+  ActiveSubscriberPtr as = getActiveSubscriber();
+  if (!as.get()) {
+    LOG4CXX_ERROR(logger, "No Active Subscriber found when channel " << channel.get()
+                          << " disconnected.");
+    // no active subscriber found, but we still need to close the channel
+    channelManager->removeChannel(channel);
+    return;
+  }
+  // Clear the topic ownership recorded for the disconnected host
+  channelManager->clearHostForTopic(as->getTopic(), channel->getHostAddress());
+
+  // When the channel is disconnected and a resubscribe is required, the old
+  // channel is only cleaned up once the resubscribe succeeds.
+  // Otherwise the old channel is cleaned up first and a TOPIC_MOVED event is
+  // raised afterwards (from CloseSubscriptionCallback).
+  LOG4CXX_INFO(logger, "Tell " << *as << " his channel " << channel.get() << " is disconnected.");
+  if (!as->isResubscribeRequired()) {
+    OperationCallbackPtr closeCb(new CloseSubscriptionCallback(as));
+    TopicSubscriber ts(as->getTopic(), as->getSubscriberId());
+    channelManager->asyncCloseSubscription(ts, closeCb);
+  } else {
+    as->processEvent(as->getTopic(), as->getSubscriberId(), TOPIC_MOVED);
+  }
+}
+
+// Closes and clears the handler's active subscriber; does nothing when no
+// subscriber is set.
+void SimpleSubscriberClientChannelHandler::closeHandler() {
+  ActiveSubscriberPtr active = getActiveSubscriber();
+  if (!active) {
+    return;
+  }
+  // the local pointer keeps the subscriber alive for the log statement below
+  active->close();
+  clearActiveSubscriber();
+  LOG4CXX_DEBUG(logger, "Closed " << *active << ".");
+}
+
+//
+// Subscribe Response Handler
+//
+// Keeps the channel manager twice: cast to DuplexChannelManager for the
+// ResponseHandler base class, and with its concrete type in sChannelManager,
+// which handleSuccessResponse() uses to call storeSubscriptionChannelHandler().
+SimpleSubscribeResponseHandler::SimpleSubscribeResponseHandler(
+  const SimpleDuplexChannelManagerPtr& channelManager)
+  : ResponseHandler(boost::dynamic_pointer_cast<DuplexChannelManager>(channelManager)),
+    sChannelManager(channelManager) {
+}
+
+// Handles a SUCCESS response to a subscribe request:
+//  1. extracts the subscription preferences from the response, if present;
+//  2. sets the active subscriber on the channel handler;
+//  3. registers the handler for (topic, subscriber) with the channel manager;
+//  4. completes the transaction callback with the response body, or fails it
+//     with ResubscribeException / AlreadySubscribedException when the
+//     registration in step 3 was rejected (the handler is closed in that case).
+void SimpleSubscribeResponseHandler::handleSuccessResponse(
+  const PubSubResponsePtr& m, const PubSubDataPtr& txn,
+  const SimpleSubscriberClientChannelHandlerPtr& handler) {
+  // for subscribe request, check whether is any subscription preferences received
+  // (stays a null pointer when the response carries no preferences)
+  SubscriptionPreferencesPtr preferences;
+  if (m->has_responsebody()) {
+    const ResponseBody& respBody = m->responsebody();
+    if (respBody.has_subscriberesponse()) {
+      const SubscribeResponse& resp = respBody.subscriberesponse();
+      if (resp.has_preferences()) {
+        preferences = SubscriptionPreferencesPtr(new SubscriptionPreferences(resp.preferences()));
+      }
+    }
+  }
+
+  // NOTE(review): setActiveSubscriber() returns false when the handler already
+  // has a subscriber; that result is not checked here -- confirm this is intended.
+  handler->setActiveSubscriber(txn, preferences);
+  TopicSubscriber ts(txn->getTopic(), txn->getSubscriberId());
+  if (!sChannelManager->storeSubscriptionChannelHandler(ts, txn, handler)) {
+    // found existed subscription channel handler
+    handler->close();
+    if (txn->isResubscribeRequest()) {
+      txn->getCallback()->operationFailed(ResubscribeException());
+    } else {
+      txn->getCallback()->operationFailed(AlreadySubscribedException());
+    }
+    return;
+  }
+  // always complete with a response body; an empty one when the server sent none
+  if (m->has_responsebody()) {
+    txn->getCallback()->operationComplete(m->responsebody());
+  } else {
+    txn->getCallback()->operationComplete(ResponseBody());
+  }
+}
+
+// Entry point for responses to subscribe requests received on `channel`.
+// A null transaction is logged and ignored. If the channel's handler is not a
+// SimpleSubscriberClientChannelHandler the channel is closed and the
+// transaction fails with NoChannelHandlerException. Otherwise the response
+// status code is mapped to the matching callback / redirect action below.
+void SimpleSubscribeResponseHandler::handleResponse(const PubSubResponsePtr& m,
+                                                    const PubSubDataPtr& txn,
+                                                    const DuplexChannelPtr& channel) {
+  if (!txn.get()) {
+    LOG4CXX_ERROR(logger, "Invalid transaction recevied from channel " << channel.get());
+    return;
+  }
+
+  LOG4CXX_DEBUG(logger, "message received with status " << m->statuscode()
+                        << " from channel " << channel.get());
+
+  SimpleSubscriberClientChannelHandlerPtr handler =
+    boost::dynamic_pointer_cast<SimpleSubscriberClientChannelHandler>(channel->getChannelHandler());
+  if (!handler.get()) {
+    LOG4CXX_ERROR(logger, "No simple subscriber client channel handler found for channel "
+                          << channel.get() << ".");
+    // No channel handler, but we still need to close the channel
+    channel->close();
+    txn->getCallback()->operationFailed(NoChannelHandlerException());
+    return;
+  }
+
+  if (SUCCESS != m->statuscode()) {
+    // Subscribe request doesn't succeed, we close the handle and its binding channel.
+    // This happens before the status is dispatched below, so it also applies to
+    // NOT_RESPONSIBLE_FOR_TOPIC, which is then handed to redirectRequest().
+    handler->close();
+  }
+
+  switch (m->statuscode()) {
+  case SUCCESS:
+    handleSuccessResponse(m, txn, handler);
+    break;
+  case SERVICE_DOWN:
+    txn->getCallback()->operationFailed(ServiceDownException());
+    break;
+  // both statuses are reported to the caller as "already subscribed"
+  case CLIENT_ALREADY_SUBSCRIBED:
+  case TOPIC_BUSY:
+    txn->getCallback()->operationFailed(AlreadySubscribedException());
+    break;
+  case CLIENT_NOT_SUBSCRIBED:
+    txn->getCallback()->operationFailed(NotSubscribedException());
+    break;
+  case NOT_RESPONSIBLE_FOR_TOPIC:
+    redirectRequest(m, txn, channel);
+    break;
+  default:
+    LOG4CXX_ERROR(logger, "Unexpected response " << m->statuscode() << " for " << txn->getTxnId());
+    txn->getCallback()->operationFailed(UnexpectedResponseException());
+    break;
+  }
+}
+
+//
+// Simple Duplex Channel Manager
+//
+// Passes the configuration to the DuplexChannelManager base class and logs the
+// creation. The subscribe response handler is registered later, in start().
+SimpleDuplexChannelManager::SimpleDuplexChannelManager(const Configuration& conf)
+  : DuplexChannelManager(conf) {
+  LOG4CXX_DEBUG(logger, "Created SimpleDuplexChannelManager " << this);
+}
+
+// Only logs the destruction; no cleanup is performed here.
+SimpleDuplexChannelManager::~SimpleDuplexChannelManager() {
+  LOG4CXX_DEBUG(logger, "Destroyed SimpleDuplexChannelManager " << this);
+}
+
+// Registers the response handler for SUBSCRIBE operations, bound to this
+// manager, and then starts the base channel manager.
+void SimpleDuplexChannelManager::start() {
+  SimpleDuplexChannelManagerPtr self =
+    boost::dynamic_pointer_cast<SimpleDuplexChannelManager>(shared_from_this());
+  ResponseHandlerPtr subscribeHandler(new SimpleSubscribeResponseHandler(self));
+  subscriptionHandlers[SUBSCRIBE] = subscribeHandler;
+  DuplexChannelManager::start();
+}
+
+// Closes the base channel manager first and then drops the response handlers
+// that start() registered in subscriptionHandlers.
+void SimpleDuplexChannelManager::close() {
+  DuplexChannelManager::close();
+  subscriptionHandlers.clear();
+}
+
+// Returns the handler stored for (topic, subscriber), cast to the
+// SubscriberClientChannelHandler pointer type. The lookup itself is done by
+// getSimpleSubscriptionChannelHandler().
+SubscriberClientChannelHandlerPtr
+SimpleDuplexChannelManager::getSubscriptionChannelHandler(const TopicSubscriber& ts) {
+  return boost::dynamic_pointer_cast<SubscriberClientChannelHandler>(
+         getSimpleSubscriptionChannelHandler(ts));
+}
+
+// Returns the channel handler stored for the given (topic, subscriber) pair.
+// When no handler is stored, the returned pointer is empty (operator[] creates
+// an empty entry for an unknown key).
+//
+// NOTE(review): the result is a reference into the map that is handed out
+// after the lock has been released; callers should copy it right away.
+// Returning by value would be safer but needs a matching header change.
+const SimpleSubscriberClientChannelHandlerPtr&
+SimpleDuplexChannelManager::getSimpleSubscriptionChannelHandler(const TopicSubscriber& ts) {
+  // operator[] may insert an entry, i.e. it can modify the map, so the
+  // exclusive lock is required here: with a shared (reader) lock two threads
+  // could insert concurrently.
+  boost::lock_guard<boost::shared_mutex> lock(topicsubscriber2handler_lock);
+  return topicsubscriber2handler[ts];
+}
+
+// Finds or creates the channel to use for a subscription of `ts`.
+// Returns the channel of an already stored handler (never for resubscribe
+// requests), an empty pointer when no host is known for the topic, or
+// otherwise a channel to the known host, created and connected if needed.
+DuplexChannelPtr SimpleDuplexChannelManager::getSubscriptionChannel(
+  const TopicSubscriber& ts, const bool isResubscribeRequest) {
+  // a resubscribe request is forced onto a new subscription channel, so the
+  // lookup of an existing handler is skipped for it
+  SimpleSubscriberClientChannelHandlerPtr existing;
+  if (!isResubscribeRequest) {
+    existing = getSimpleSubscriptionChannelHandler(ts);
+  }
+  if (existing) {
+    // found a live subscription channel
+    return boost::dynamic_pointer_cast<DuplexChannel>(existing->getChannel());
+  }
+
+  const HostAddress& owner = getHostForTopic(ts.first);
+  if (owner.isNullHost()) {
+    return DuplexChannelPtr();
+  }
+
+  // the hub server owning the topic is known
+  DuplexChannelPtr known = getSubscriptionChannel(owner);
+  if (known) {
+    return known;
+  }
+  return storeSubscriptionChannel(createSubscriptionChannel(owner), true);
+}
+
+// Always returns an empty pointer: `addr` is ignored because existing channels
+// are never reused per host by this manager.
+DuplexChannelPtr SimpleDuplexChannelManager::getSubscriptionChannel(const HostAddress& addr) {
+  // for simple subscription channel, we established a new channel each time
+  return DuplexChannelPtr();
+}
+
+// Creates a new subscription channel to `addr` together with its
+// SimpleSubscriberClientChannelHandler and links the two to each other.
+// The channel is not connected here; storeSubscriptionChannel() does that.
+DuplexChannelPtr SimpleDuplexChannelManager::createSubscriptionChannel(const HostAddress& addr) {
+  // Create a simple subscriber channel handler
+  // (the raw pointer is kept only to call setChannel() below; ownership is
+  // taken by channelHandler on the next statement)
+  SimpleSubscriberClientChannelHandler * subscriberHandler =
+    new SimpleSubscriberClientChannelHandler(
+      boost::dynamic_pointer_cast<SimpleDuplexChannelManager>(shared_from_this()),
+      subscriptionHandlers);
+  ChannelHandlerPtr channelHandler(subscriberHandler);
+  // Create a subscription channel
+  DuplexChannelPtr channel = createChannel(dispatcher->getService(), addr, channelHandler);
+  // give the handler a pointer back to the channel it serves
+  subscriberHandler->setChannel(boost::dynamic_pointer_cast<AbstractDuplexChannel>(channel));
+  LOG4CXX_INFO(logger, "New subscription channel " << channel.get() << " is created to host "
+                       << addr << ", whose channel handler is " << subscriberHandler);
+  return channel;
+}
+
+// Connects the channel when `doConnect` is set and returns it unchanged.
+// Nothing is stored here.
+DuplexChannelPtr SimpleDuplexChannelManager::storeSubscriptionChannel(const DuplexChannelPtr& ch,
+                                                                      bool doConnect) {
+  // for simple duplex channel manager the channel itself is not stored;
+  // only its channel handler is stored, and only once the subscribe has
+  // succeeded (see storeSubscriptionChannelHandler)
+  if (doConnect) {
+    ch->connect();
+  }
+  return ch;
+}
+
+// Registers `handler` as the channel handler for (topic, subscriber) `ts`.
+// Returns true when the handler was stored, false when it was rejected:
+//
+//   existing handler | resubscribe | outcome
+//   -----------------+-------------+--------------------------------------------
+//   yes              | yes         | stored only if the existing handler's channel
+//                    |             | is the channel the resubscribe originated from
+//   yes              | no          | rejected
+//   no               | yes         | rejected
+//   no               | no          | stored
+//
+// When a resubscribe replaces an existing handler, the replaced handler is
+// closed after the map lock has been released.
+bool SimpleDuplexChannelManager::storeSubscriptionChannelHandler(
+  const TopicSubscriber& ts, const PubSubDataPtr& txn,
+  const SimpleSubscriberClientChannelHandlerPtr& handler) {
+  SimpleSubscriberClientChannelHandlerPtr other;
+  bool success = false;
+  bool isResubscribeRequest = txn->isResubscribeRequest();
+  {
+    // exclusive lock: the map is read and possibly modified in this scope
+    boost::lock_guard<boost::shared_mutex> lock(topicsubscriber2handler_lock);
+    other = topicsubscriber2handler[ts];
+    if (other.get()) {
+      if (isResubscribeRequest) {
+        DuplexChannelPtr& origChannel = txn->getOrigChannelForResubscribe();
+        const AbstractDuplexChannelPtr& otherChannel =
+          other->getChannel();
+        if (origChannel.get() != otherChannel.get()) {
+          // channel has been changed for a specific subscriber
+          // which means the client closesub and subscribe again
+          // when channel disconnect to resubscribe for it.
+          // so we should not let the resubscribe succeed
+          success = false;
+        } else {
+          topicsubscriber2handler[ts] = handler;
+          success = true;
+        }
+      } else {
+        // a plain subscribe must not replace an existing handler
+        success = false;
+      }
+    } else {
+      if (isResubscribeRequest) {
+        // if it is a resubscribe request and there is no handler found
+        // which means a closesub has been called when resubscribing
+        // so we should not let the resubscribe succeed
+        success = false;
+      } else {
+        topicsubscriber2handler[ts] = handler;
+        success = true;
+      }
+    }
+  }
+  if (isResubscribeRequest && success && other.get()) {
+    // the old handler is evicted due to resubscribe succeed
+    // so it is the time to close the old disconnected channel now
+    other->close();
+  }
+  return success;
+}
+
+// Removes the channel handler stored for `ts`, closes it if there was one, and
+// then reports completion on `callback` in every case.
+void SimpleDuplexChannelManager::asyncCloseSubscription(const TopicSubscriber& ts,
+                                                        const OperationCallbackPtr& callback) {
+  SimpleSubscriberClientChannelHandlerPtr evicted;
+  {
+    // take the handler out of the map under the exclusive lock; it is closed
+    // only after the lock has been released
+    boost::lock_guard<boost::shared_mutex> guard(topicsubscriber2handler_lock);
+    evicted = topicsubscriber2handler[ts];
+    topicsubscriber2handler.erase(ts);
+    LOG4CXX_DEBUG(logger, "CloseSubscription:: remove subscriber channel handler for (topic:"
+                          << ts.first << ", subscriber:" << ts.second << ").");
+  }
+
+  if (evicted) {
+    evicted->close();
+  }
+  callback->operationComplete();
+}
+
+// Restarts delivery for (topic, subscriber) `ts` on the currently stored
+// channel handler, using the given message handler and filter.
+// This is best effort: a missing handler and any exception thrown by
+// startDelivery() are only logged as warnings, nothing is propagated.
+void SimpleDuplexChannelManager::handoverDelivery(const TopicSubscriber& ts,
+                                                  const MessageHandlerCallbackPtr& msgHandler,
+                                                  const ClientMessageFilterPtr& filter) {
+  SimpleSubscriberClientChannelHandlerPtr handler;
+  {
+    // operator[] may insert an empty entry for an unknown key, i.e. it can
+    // modify the map, so the exclusive lock is required here; a shared
+    // (reader) lock would allow concurrent inserts.
+    boost::lock_guard<boost::shared_mutex> lock(topicsubscriber2handler_lock);
+    handler = topicsubscriber2handler[ts];
+  }
+
+  if (!handler.get()) {
+    LOG4CXX_WARN(logger, "No channel handler found for (topic:" << ts.first << ", subscriber:"
+                         << ts.second << ") to handover delivery with handler "
+                         << msgHandler.get() << ", filter " << filter.get() << ".");
+    return;
+  }
+  // startDelivery is called outside of the map lock
+  try {
+    handler->startDelivery(ts, msgHandler, filter);
+  } catch(const AlreadyStartDeliveryException& ase) {
+    LOG4CXX_WARN(logger, "Other one has started delivery for (topic:" << ts.first <<
+                         ", subscriber:" << ts.second << ") using brand new message handler. "
+                         << "It is OK that we could give up handing over old message handler.");
+  } catch(const std::exception& e) {
+    LOG4CXX_WARN(logger, "Error when handing over old message handler for (topic:" << ts.first
+                         << ", subscriber:" << ts.second << ") : " << e.what());
+  }
+}