You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/18 12:59:35 UTC
svn commit: r1399578 [1/2] - in /zookeeper/bookkeeper/trunk: ./
hedwig-client/src/main/cpp/inc/hedwig/ hedwig-client/src/main/cpp/lib/
hedwig-client/src/main/cpp/test/
Author: ivank
Date: Thu Oct 18 10:59:34 2012
New Revision: 1399578
URL: http://svn.apache.org/viewvc?rev=1399578&view=rev
Log:
BOOKKEEPER-369: re-factor hedwig cpp client to provide better interface to support both one-subscription-per-channel and multiple-subscriptions-per-channel. (sijie via ivank)
Added:
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/simplesubscriberimpl.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/simplesubscriberimpl.h
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/Makefile.am
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messageboundtest.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/subscribetest.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1399578&r1=1399577&r2=1399578&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Oct 18 10:59:34 2012
@@ -198,6 +198,8 @@ Trunk (unreleased changes)
BOOKKEEPER-413: Hedwig C++ client: Rename RUN_AS_SSL_MODE to SSL_ENABLED (ivank via sijie)
+ BOOKKEEPER-369: re-factor hedwig cpp client to provide better interface to support both one-subscription-per-channel and multiple-subscriptions-per-channel. (sijie via ivank)
+
Release 4.1.0 - 2012-06-07
Non-backward compatible changes:
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h?rev=1399578&r1=1399577&r2=1399578&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h Thu Oct 18 10:59:34 2012
@@ -32,16 +32,20 @@ namespace Hedwig {
class OomException : public ClientException {};
class UnknownRequestException : public ClientException {};
class InvalidRedirectException : public ClientException {};
+ class NoChannelHandlerException : public ClientException {};
class PublisherException : public ClientException { };
-
class SubscriberException : public ClientException { };
class AlreadySubscribedException : public SubscriberException {};
class NotSubscribedException : public SubscriberException {};
+ class ResubscribeException : public SubscriberException {};
class NullMessageHandlerException : public SubscriberException {};
class NullMessageFilterException : public SubscriberException {};
+ class AlreadyStartDeliveryException : public SubscriberException {};
+ class StartingDeliveryException : public SubscriberException {};
+
class ConfigurationException : public ClientException { };
class InvalidPortException : public ConfigurationException {};
class HostResolutionException : public ClientException {};
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h?rev=1399578&r1=1399577&r2=1399578&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h Thu Oct 18 10:59:34 2012
@@ -51,7 +51,10 @@ namespace Hedwig {
virtual void stopDelivery(const std::string& topic, const std::string& subscriberId) = 0;
+ virtual bool hasSubscription(const std::string& topic, const std::string& subscriberId) = 0;
virtual void closeSubscription(const std::string& topic, const std::string& subscriberId) = 0;
+ virtual void asyncCloseSubscription(const std::string& topic, const std::string& subscriberId,
+ const OperationCallbackPtr& callback) = 0;
//
// API to register/unregister subscription listeners for receiving
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/Makefile.am
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/Makefile.am?rev=1399578&r1=1399577&r2=1399578&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/Makefile.am (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/Makefile.am Thu Oct 18 10:59:34 2012
@@ -19,7 +19,7 @@
PROTODEF = ../../../../../hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
lib_LTLIBRARIES = libhedwig01.la
-libhedwig01_la_SOURCES = protocol.cpp channel.cpp client.cpp util.cpp clientimpl.cpp publisherimpl.cpp subscriberimpl.cpp eventdispatcher.cpp data.cpp filterablemessagehandler.cpp
+libhedwig01_la_SOURCES = protocol.cpp channel.cpp client.cpp util.cpp clientimpl.cpp publisherimpl.cpp subscriberimpl.cpp eventdispatcher.cpp data.cpp filterablemessagehandler.cpp simplesubscriberimpl.cpp
libhedwig01_la_CPPFLAGS = -I$(top_srcdir)/inc $(DEPS_CFLAGS)
libhedwig01_la_LIBADD = $(DEPS_LIBS) $(BOOST_CPPFLAGS)
libhedwig01_la_LDFLAGS = -no-undefined $(BOOST_ASIO_LIB) $(BOOST_LDFLAGS) $(BOOST_THREAD_LIB)
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp?rev=1399578&r1=1399577&r2=1399578&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp Thu Oct 18 10:59:34 2012
@@ -47,7 +47,6 @@ static log4cxx::LoggerPtr logger(log4cxx
using namespace Hedwig;
-const bool DEFAULT_SSL_ENABLED = false;
const std::string DEFAULT_SSL_PEM_FILE = "";
AbstractDuplexChannel::AbstractDuplexChannel(IOServicePtr& service,
@@ -67,6 +66,10 @@ AbstractDuplexChannel::~AbstractDuplexCh
LOG4CXX_INFO(logger, "Destroying DuplexChannel(" << this << ")");
}
+ChannelHandlerPtr AbstractDuplexChannel::getChannelHandler() {
+ return handler;
+}
+
/*static*/ void AbstractDuplexChannel::connectCallbackHandler(
AbstractDuplexChannelPtr channel,
OperationCallbackPtr callback,
@@ -601,6 +604,7 @@ void AsioDuplexChannel::closeSocket() {
if (ec) {
LOG4CXX_WARN(logger, "Channel " << this << " close error : " << ec.message().c_str());
}
+ LOG4CXX_DEBUG(logger, "Closed socket for channel " << this << ".");
}
// SSL Context Factory
@@ -795,35 +799,3 @@ void AsioSSLDuplexChannel::closeLowestLa
LOG4CXX_WARN(logger, "Channel " << this << " close error : " << ec.message().c_str());
}
}
-
-DuplexChannelManagerPtr DuplexChannelManager::create(const Configuration& conf,
- EventDispatcher& dispatcher) {
- DuplexChannelManagerPtr factory(new DuplexChannelManager(conf, dispatcher));
- LOG4CXX_DEBUG(logger, "Created DuplexChannelManager " << factory);
- return factory;
-}
-
-DuplexChannelManager::DuplexChannelManager(const Configuration& conf,
- EventDispatcher& dispatcher)
- : conf(conf), dispatcher(dispatcher) {
- sslEnabled = conf.getBool(Configuration::SSL_ENABLED, DEFAULT_SSL_ENABLED);
- if (sslEnabled) {
- sslCtxFactory = SSLContextFactoryPtr(new SSLContextFactory(conf));
- }
-}
-
-DuplexChannelManager::~DuplexChannelManager() {
-}
-
-DuplexChannelPtr DuplexChannelManager::createChannel(const HostAddress& addr,
- const ChannelHandlerPtr& handler) {
- LOG4CXX_DEBUG(logger, "Creating channel with handler " << handler.get());
- IOServicePtr& service = dispatcher.getService();
- if (sslEnabled) {
- boost_ssl_context_ptr sslCtx =
- sslCtxFactory->createSSLContext(service->getService());
- return DuplexChannelPtr(new AsioSSLDuplexChannel(service, sslCtx, addr, handler));
- } else {
- return DuplexChannelPtr(new AsioDuplexChannel(service, addr, handler));
- }
-}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h?rev=1399578&r1=1399577&r2=1399578&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h Thu Oct 18 10:59:34 2012
@@ -88,6 +88,9 @@ namespace Hedwig {
public:
virtual ~DuplexChannel() {}
+ // Return the channel handler bound with a channel
+ virtual ChannelHandlerPtr getChannelHandler() = 0;
+
// Issues a connect request to the target host
// Users may call writeRequest after issuing a connect request; those requests should
// be buffered and written once the channel is connected.
@@ -156,25 +159,6 @@ namespace Hedwig {
typedef boost::shared_ptr<SSLContextFactory> SSLContextFactoryPtr;
- class DuplexChannelManager;
- typedef boost::shared_ptr<DuplexChannelManager> DuplexChannelManagerPtr;
-
- class DuplexChannelManager : public boost::enable_shared_from_this<DuplexChannelManager> {
- public:
- static DuplexChannelManagerPtr create(const Configuration& conf,
- EventDispatcher& dispatcher);
- ~DuplexChannelManager();
-
- DuplexChannelPtr createChannel(const HostAddress& addr, const ChannelHandlerPtr& handler);
- private:
- DuplexChannelManager(const Configuration& conf, EventDispatcher& dispatcher);
-
- const Configuration& conf;
- EventDispatcher& dispatcher;
- bool sslEnabled;
- SSLContextFactoryPtr sslCtxFactory;
- };
-
class AbstractDuplexChannel;
typedef boost::shared_ptr<AbstractDuplexChannel> AbstractDuplexChannelPtr;
@@ -186,6 +170,8 @@ namespace Hedwig {
const ChannelHandlerPtr& handler);
virtual ~AbstractDuplexChannel();
+ virtual ChannelHandlerPtr getChannelHandler();
+
//
// Connect Operation
//
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.cpp?rev=1399578&r1=1399577&r2=1399578&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.cpp Thu Oct 18 10:59:34 2012
@@ -23,13 +23,16 @@
#include "channel.h"
#include "publisherimpl.h"
#include "subscriberimpl.h"
+#include "simplesubscriberimpl.h"
#include <log4cxx/logger.h>
static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__));
using namespace Hedwig;
+const int DEFAULT_MESSAGE_FORCE_CONSUME_RETRY_WAIT_TIME = 5000;
const std::string DEFAULT_SERVER_DEFAULT_VAL = "";
+const bool DEFAULT_SSL_ENABLED = false;
void SyncOperationCallback::wait() {
boost::unique_lock<boost::mutex> lock(mut);
@@ -104,8 +107,51 @@ void SyncOperationCallback::throwExcepti
}
}
-HedwigClientChannelHandler::HedwigClientChannelHandler(const ClientImplPtr& client)
- : client(client){
+ResponseHandler::ResponseHandler(const DuplexChannelManagerPtr& channelManager)
+ : channelManager(channelManager) {
+}
+
+void ResponseHandler::redirectRequest(const PubSubResponsePtr& response,
+ const PubSubDataPtr& data,
+ const DuplexChannelPtr& channel) {
+ HostAddress oldhost = channel->getHostAddress();
+ data->addTriedServer(oldhost);
+
+ HostAddress h;
+ bool redirectToDefaultHost = true;
+ if (response->has_statusmsg()) {
+ try {
+ h = HostAddress::fromString(response->statusmsg());
+ redirectToDefaultHost = false;
+ } catch (std::exception& e) {
+ h = channelManager->getDefaultHost();
+ }
+ } else {
+ h = channelManager->getDefaultHost();
+ }
+ if (data->hasTriedServer(h)) {
+ LOG4CXX_ERROR(logger, "We've been told to try request [" << data->getTxnId() << "] with ["
+ << h.getAddressString()<< "] by " << oldhost.getAddressString()
+ << " but we've already tried that. Failing operation");
+ data->getCallback()->operationFailed(InvalidRedirectException());
+ return;
+ }
+ LOG4CXX_INFO(logger, "We've been told [" << data->getTopic() << "] is on [" << h.getAddressString()
+ << "] by [" << oldhost.getAddressString() << "]. Redirecting request "
+ << data->getTxnId());
+ data->setShouldClaim(true);
+
+ // submit the request again to the target host
+ if (redirectToDefaultHost) {
+ channelManager->submitOpToDefaultServer(data);
+ } else {
+ channelManager->redirectOpToHost(data, h);
+ }
+}
+
+HedwigClientChannelHandler::HedwigClientChannelHandler(const DuplexChannelManagerPtr& channelManager,
+ ResponseHandlerMap& handlers)
+ : channelManager(channelManager), handlers(handlers), closed(false), disconnected(false) {
}
void HedwigClientChannelHandler::messageReceived(const DuplexChannelPtr& channel, const PubSubResponsePtr& m) {
@@ -120,247 +166,421 @@ void HedwigClientChannelHandler::message
/* you now have ownership of data, don't leave this function without deleting it or
palming it off to someone else */
- if (data == NULL) {
+ if (data.get() == 0) {
+ LOG4CXX_ERROR(logger, "No pub/sub request for txnid(" << m->txnid() << ").");
return;
}
- if (m->statuscode() == NOT_RESPONSIBLE_FOR_TOPIC) {
- client->redirectRequest(channel, data, m);
- return;
+ // Store the topic2Host mapping if this wasn't a server redirect.
+ // TODO: add a specific response for failure to get topic ownership,
+ // to distinguish SERVICE_DOWN from a failure to get topic ownership
+ if (m->statuscode() != NOT_RESPONSIBLE_FOR_TOPIC) {
+ const HostAddress& host = channel->getHostAddress();
+ channelManager->setHostForTopic(data->getTopic(), host);
}
- switch (data->getType()) {
- case PUBLISH:
- client->getPublisherImpl().messageHandler(m, data);
- break;
- case SUBSCRIBE:
- case UNSUBSCRIBE:
- client->getSubscriberImpl().messageHandler(m, data);
- break;
- default:
- LOG4CXX_ERROR(logger, "Unimplemented request type " << data->getType());
- break;
+ const ResponseHandlerPtr& respHandler = handlers[data->getType()];
+ if (respHandler.get()) {
+ respHandler->handleResponse(m, data, channel);
+ } else {
+ LOG4CXX_ERROR(logger, "Unimplemented request type " << data->getType() << " : "
+ << *data);
+ data->getCallback()->operationFailed(UnknownRequestException());
}
}
-
void HedwigClientChannelHandler::channelConnected(const DuplexChannelPtr& channel) {
// do nothing
}
-void HedwigClientChannelHandler::channelDisconnected(const DuplexChannelPtr& channel, const std::exception& e) {
- LOG4CXX_ERROR(logger, "Channel disconnected");
+void HedwigClientChannelHandler::channelDisconnected(const DuplexChannelPtr& channel,
+ const std::exception& e) {
+ if (channelManager->isClosed()) {
+ return;
+ }
- client->channelDied(channel);
+ // If this channel was closed explicitly by the client code,
+ // we do not need to do any of this logic. This could happen
+ // for redundant Publish channels created or redirected subscribe
+ // channels that are not used anymore or when we shutdown the
+ // client and manually close all of the open channels.
+ // Also don't do any of the disconnect logic if the client has stopped.
+ {
+ boost::lock_guard<boost::shared_mutex> lock(close_lock);
+ if (closed) {
+ return;
+ }
+ if (disconnected) {
+ return;
+ }
+ disconnected = true;
+ }
+ LOG4CXX_INFO(logger, "Channel " << channel.get() << " was disconnected.");
+ // execute logic after channel disconnected
+ onChannelDisconnected(channel);
+}
+
+void HedwigClientChannelHandler::onChannelDisconnected(const DuplexChannelPtr& channel) {
+ // Clean up the channel from channel manager
+ channelManager->nonSubscriptionChannelDied(channel);
}
void HedwigClientChannelHandler::exceptionOccurred(const DuplexChannelPtr& channel, const std::exception& e) {
LOG4CXX_ERROR(logger, "Exception occurred" << e.what());
}
-ClientTxnCounter::ClientTxnCounter() : counter(0)
-{
+void HedwigClientChannelHandler::close() {
+ {
+ boost::lock_guard<boost::shared_mutex> lock(close_lock);
+ if (closed) {
+ return;
+ }
+ closed = true;
+ }
+ // do close handle logic here
+ doClose();
}
-ClientTxnCounter::~ClientTxnCounter() {
+void HedwigClientChannelHandler::doClose() {
+ // do nothing for generic client channel handler
}
-/**
-Increment the transaction counter and return the new value.
+//
+// Pub/Sub Request Write Callback
+//
+PubSubWriteCallback::PubSubWriteCallback(const DuplexChannelPtr& channel,
+ const PubSubDataPtr& data)
+ : channel(channel), data(data) {
+}
-@returns the next transaction id
-*/
-long ClientTxnCounter::next() { // would be nice to remove lock from here, look more into it
- boost::lock_guard<boost::mutex> lock(mutex);
+void PubSubWriteCallback::operationComplete() {
+ LOG4CXX_INFO(logger, "Successfully wrote pubsub request : " << *data << " to channel "
+ << channel.get());
+}
- long next= ++counter;
+void PubSubWriteCallback::operationFailed(const std::exception& exception) {
+ LOG4CXX_ERROR(logger, "Error writing pubsub request (" << *data << ") : " << exception.what());
- return next;
+ // remove the transaction from channel if write failed
+ channel->retrieveTransaction(data->getTxnId());
+ data->getCallback()->operationFailed(exception);
}
-ClientImplPtr ClientImpl::Create(const Configuration& conf) {
- ClientImplPtr impl(new ClientImpl(conf));
- LOG4CXX_DEBUG(logger, "Creating Clientimpl " << impl);
- impl->dispatcher->start();
- return impl;
+//
+// Default Server Connect Callback
+//
+DefaultServerConnectCallback::DefaultServerConnectCallback(const DuplexChannelManagerPtr& channelManager,
+ const DuplexChannelPtr& channel,
+ const PubSubDataPtr& data)
+ : channelManager(channelManager), channel(channel), data(data) {
}
-void ClientImpl::Destroy() {
- LOG4CXX_DEBUG(logger, "destroying Clientimpl " << this);
+void DefaultServerConnectCallback::operationComplete() {
+ LOG4CXX_DEBUG(logger, "Channel " << channel.get() << " is connected to host "
+ << channel->getHostAddress() << ".");
+ // Once connected, we have the right IP for the target host,
+ // so we can submit the request right away
+ channelManager->submitOpThruChannel(data, channel);
+}
- {
- boost::lock_guard<boost::shared_mutex> lock(allchannels_lock);
-
- shuttingDownFlag = true;
- for (ChannelMap::iterator iter = allchannels.begin(); iter != allchannels.end(); ++iter ) {
- (*iter)->close();
- }
- allchannels.clear();
- }
- // SSL Channel shutdown needs to send packets to server
- // so we only stop dispatcher after all channels are closed
- dispatcher->stop();
+void DefaultServerConnectCallback::operationFailed(const std::exception& exception) {
+ LOG4CXX_ERROR(logger, "Channel " << channel.get() << " failed to connect to host "
+ << channel->getHostAddress() << " : " << exception.what());
+ data->getCallback()->operationFailed(exception);
+}
- /* destruction of the maps will clean up any items they hold */
-
- if (subscriber != NULL) {
- delete subscriber;
- subscriber = NULL;
+//
+// Subscription Event Emitter
+//
+SubscriptionEventEmitter::SubscriptionEventEmitter() {}
+
+void SubscriptionEventEmitter::addSubscriptionListener(
+ SubscriptionListenerPtr& listener) {
+ boost::lock_guard<boost::shared_mutex> lock(listeners_lock);
+ listeners.insert(listener);
+}
+
+void SubscriptionEventEmitter::removeSubscriptionListener(
+ SubscriptionListenerPtr& listener) {
+ boost::lock_guard<boost::shared_mutex> lock(listeners_lock);
+ listeners.erase(listener);
+}
+
+void SubscriptionEventEmitter::emitSubscriptionEvent(
+ const std::string& topic, const std::string& subscriberId,
+ const SubscriptionEvent event) {
+ boost::shared_lock<boost::shared_mutex> lock(listeners_lock);
+ if (0 == listeners.size()) {
+ return;
}
- if (publisher != NULL) {
- delete publisher;
- publisher = NULL;
+ for (SubscriptionListenerSet::iterator iter = listeners.begin();
+ iter != listeners.end(); ++iter) {
+ (*iter)->processEvent(topic, subscriberId, event);
}
}
-ClientImpl::ClientImpl(const Configuration& conf)
- : conf(conf), publisher(NULL), subscriber(NULL), counterobj(), shuttingDownFlag(false)
-{
+//
+// Channel Manager Used to manage all established channels
+//
+
+DuplexChannelManagerPtr DuplexChannelManager::create(const Configuration& conf) {
+ DuplexChannelManagerPtr manager(new SimpleDuplexChannelManager(conf));
+ LOG4CXX_DEBUG(logger, "Created DuplexChannelManager " << manager);
+ return manager;
+}
+
+DuplexChannelManager::DuplexChannelManager(const Configuration& conf)
+ : dispatcher(new EventDispatcher(conf)), conf(conf), closed(false), counterobj() {
+ sslEnabled = conf.getBool(Configuration::SSL_ENABLED, DEFAULT_SSL_ENABLED);
defaultHost = HostAddress::fromString(conf.get(Configuration::DEFAULT_SERVER,
DEFAULT_SERVER_DEFAULT_VAL));
- dispatcher = EventDispatcherPtr(new EventDispatcher(conf));
- channelManager = DuplexChannelManager::create(conf, *dispatcher);
+ if (sslEnabled) {
+ sslCtxFactory = SSLContextFactoryPtr(new SSLContextFactory(conf));
+ }
+ LOG4CXX_DEBUG(logger, "Created DuplexChannelManager " << this << " with default server "
+ << defaultHost);
}
-Subscriber& ClientImpl::getSubscriber() {
- return getSubscriberImpl();
+DuplexChannelManager::~DuplexChannelManager() {
+ LOG4CXX_DEBUG(logger, "Destroyed DuplexChannelManager " << this);
}
-Publisher& ClientImpl::getPublisher() {
- return getPublisherImpl();
+void DuplexChannelManager::submitTo(const PubSubDataPtr& op, const DuplexChannelPtr& channel) {
+ if (channel.get()) {
+ channel->storeTransaction(op);
+ OperationCallbackPtr writecb(new PubSubWriteCallback(channel, op));
+ LOG4CXX_DEBUG(logger, "Submit pub/sub request " << *op << " thru channel " << channel.get());
+ channel->writeRequest(op->getRequest(), writecb);
+ } else {
+ submitOpToDefaultServer(op);
+ }
}
-SubscriberImpl& ClientImpl::getSubscriberImpl() {
- if (subscriber == NULL) {
- boost::lock_guard<boost::mutex> lock(subscribercreate_lock);
- if (subscriber == NULL) {
- subscriber = new SubscriberImpl(shared_from_this());
- }
+// Submit a pub/sub request
+void DuplexChannelManager::submitOp(const PubSubDataPtr& op) {
+ DuplexChannelPtr channel;
+ switch (op->getType()) {
+ case PUBLISH:
+ case UNSUBSCRIBE:
+ channel = getNonSubscriptionChannel(op->getTopic());
+ break;
+ default:
+ TopicSubscriber ts(op->getTopic(), op->getSubscriberId());
+ channel = getSubscriptionChannel(ts, op->isResubscribeRequest());
+ break;
}
- return *subscriber;
+ // write the pub/sub request
+ submitTo(op, channel);
}
-PublisherImpl& ClientImpl::getPublisherImpl() {
- if (publisher == NULL) {
- boost::lock_guard<boost::mutex> lock(publishercreate_lock);
- if (publisher == NULL) {
- publisher = new PublisherImpl(shared_from_this());
+// Submit a pub/sub request to target host
+void DuplexChannelManager::redirectOpToHost(const PubSubDataPtr& op, const HostAddress& addr) {
+ DuplexChannelPtr channel;
+ switch (op->getType()) {
+ case PUBLISH:
+ case UNSUBSCRIBE:
+ // check whether a channel already exists for non-subscription requests
+ channel = getNonSubscriptionChannel(addr);
+ if (!channel.get()) {
+ channel = createNonSubscriptionChannel(addr);
+ channel = storeNonSubscriptionChannel(channel, true);
+ }
+ break;
+ default:
+ channel = getSubscriptionChannel(addr);
+ if (!channel.get()) {
+ channel = createSubscriptionChannel(addr);
+ channel = storeSubscriptionChannel(channel, true);
}
+ break;
}
- return *publisher;
+ // write the pub/sub request
+ submitTo(op, channel);
}
-ClientTxnCounter& ClientImpl::counter() {
- return counterobj;
+// Submit a pub/sub request through an established channel
+void DuplexChannelManager::submitOpThruChannel(const PubSubDataPtr& op,
+ const DuplexChannelPtr& ch) {
+ DuplexChannelPtr channel;
+ switch (op->getType()) {
+ case PUBLISH:
+ case UNSUBSCRIBE:
+ channel = storeNonSubscriptionChannel(ch, false);
+ break;
+ default:
+ channel = storeSubscriptionChannel(ch, false);
+ break;
+ }
+ // write the pub/sub request
+ submitTo(op, channel);
}
-void ClientImpl::redirectRequest(const DuplexChannelPtr& channel, PubSubDataPtr& data, const PubSubResponsePtr& response) {
- HostAddress oldhost = channel->getHostAddress();
- data->addTriedServer(oldhost);
-
- HostAddress h = HostAddress::fromString(response->statusmsg());
- if (data->hasTriedServer(h)) {
- LOG4CXX_ERROR(logger, "We've been told to try request [" << data->getTxnId() << "] with ["
- << h.getAddressString()<< "] by " << oldhost.getAddressString()
- << " but we've already tried that. Failing operation");
- data->getCallback()->operationFailed(InvalidRedirectException());
- return;
+// Submit a pub/sub request to default server
+void DuplexChannelManager::submitOpToDefaultServer(const PubSubDataPtr& op) {
+ DuplexChannelPtr channel;
+ switch (op->getType()) {
+ case PUBLISH:
+ case UNSUBSCRIBE:
+ channel = createNonSubscriptionChannel(defaultHost);
+ break;
+ default:
+ channel = createSubscriptionChannel(defaultHost);
+ break;
}
- LOG4CXX_DEBUG(logger, "We've been told [" << data->getTopic() << "] is on [" << h.getAddressString()
- << "] by [" << oldhost.getAddressString() << "]. Redirecting request " << data->getTxnId());
- data->setShouldClaim(true);
+ OperationCallbackPtr connectCallback(new DefaultServerConnectCallback(shared_from_this(),
+ channel, op));
+ // Connect to the default server. The default server is usually a VIP, so we only get the real
+ // IP address once connected; before connecting, we don't know the real target host.
+ // We only submit the request after the channel is connected (the IP address would be updated by then).
+ channel->connect(connectCallback);
+}
- setHostForTopic(data->getTopic(), h);
- DuplexChannelPtr newchannel;
- try {
- if (data->getType() == SUBSCRIBE) {
- // a redirect for subscription, kill old channel and remove old channel from all channels list
- // otherwise old channel will not be destroyed, caused lost of CLOSE_WAIT connections
- removeAndCloseChannel(channel);
-
- SubscriberClientChannelHandlerPtr handler(new SubscriberClientChannelHandler(shared_from_this(),
- this->getSubscriberImpl(), data));
- newchannel = createChannel(data->getTopic(), handler);
- handler->setChannel(newchannel);
- newchannel->connect();
- getSubscriberImpl().doSubscribe(newchannel, data, handler);
- } else if (data->getType() == PUBLISH) {
- newchannel = getChannel(data->getTopic());
- getPublisherImpl().doPublish(newchannel, data);
- } else {
- newchannel = getChannel(data->getTopic());
- getSubscriberImpl().doUnsubscribe(newchannel, data);
+DuplexChannelPtr DuplexChannelManager::getNonSubscriptionChannel(const std::string& topic) {
+ HostAddress addr;
+ {
+ boost::shared_lock<boost::shared_mutex> lock(topic2host_lock);
+ addr = topic2host[topic];
+ }
+ if (addr.isNullHost()) {
+ return DuplexChannelPtr();
+ } else {
+ // we already know which hub server owns the topic
+ DuplexChannelPtr ch = getNonSubscriptionChannel(addr);
+ if (ch.get()) {
+ return ch;
}
- } catch (ShuttingDownException& e) {
- return; // no point in redirecting if we're shutting down
+ ch = createNonSubscriptionChannel(addr);
+ return storeNonSubscriptionChannel(ch, true);
}
}
-ClientImpl::~ClientImpl() {
- LOG4CXX_DEBUG(logger, "deleting Clientimpl " << this);
+DuplexChannelPtr DuplexChannelManager::getNonSubscriptionChannel(const HostAddress& addr) {
+ boost::shared_lock<boost::shared_mutex> lock(host2channel_lock);
+ return host2channel[addr];
}
-DuplexChannelPtr ClientImpl::createChannel(const std::string& topic, const ChannelHandlerPtr& handler) {
- // get the host address
- // create a channel to the host
- HostAddress addr;
+DuplexChannelPtr DuplexChannelManager::createNonSubscriptionChannel(const HostAddress& addr) {
+ // Create a non-subscription channel handler
+ ChannelHandlerPtr handler(new HedwigClientChannelHandler(shared_from_this(),
+ nonSubscriptionHandlers));
+ // Create a non subscription channel
+ return createChannel(dispatcher->getService(), addr, handler);
+}
+
+DuplexChannelPtr DuplexChannelManager::storeNonSubscriptionChannel(const DuplexChannelPtr& ch,
+ bool doConnect) {
+ const HostAddress& host = ch->getHostAddress();
+
+ bool useOldCh;
+ DuplexChannelPtr oldCh;
{
- boost::lock_guard<boost::shared_mutex> lock(topic2host_lock);
- addr = topic2host[topic];
- }
- if (addr.isNullHost()) {
- addr = defaultHost;
- setHostForTopic(topic, addr);
+ boost::lock_guard<boost::shared_mutex> lock(host2channel_lock);
+
+ oldCh = host2channel[host];
+ if (!oldCh.get()) {
+ host2channel[host] = ch;
+ useOldCh = false;
+ } else {
+ // If we've reached here, that means we already have a Channel
+ // mapping for the given host. This should ideally not happen
+ // and it means we are creating another Channel to a server host
+ // to publish on when we could have used an existing one. This could
+ // happen due to a race condition if initially multiple concurrent
+ // threads are publishing on the same topic and no Channel exists
+ // currently to the server. We are not synchronizing this initial
+ // creation of Channels to a given host for performance.
+ // Another possible way to have redundant Channels created is if
+ // a new topic is being published to, we connect to the default
+ // server host which should be a VIP that redirects to a "real"
+ // server host. Since we don't know beforehand what is the full
+ // set of server hosts, we could be redirected to a server that
+ // we already have a channel connection to from a prior existing
+ // topic. Close these redundant channels as they won't be used.
+ useOldCh = true;
+ }
+ }
+ if (useOldCh) {
+ LOG4CXX_DEBUG(logger, "Channel " << oldCh.get() << " to host " << host
+ << " already exists so close channel " << ch.get() << ".");
+ ch->close();
+ return oldCh;
+ } else {
+ if (doConnect) {
+ ch->connect();
+ }
+ LOG4CXX_DEBUG(logger, "Storing channel " << ch.get() << " for host " << host << ".");
+ return ch;
}
+}
- DuplexChannelPtr channel = channelManager->createChannel(addr, handler);
+DuplexChannelPtr DuplexChannelManager::createChannel(IOServicePtr& service,
+ const HostAddress& addr,
+ const ChannelHandlerPtr& handler) {
+ DuplexChannelPtr channel;
+ if (sslEnabled) {
+ boost_ssl_context_ptr sslCtx = sslCtxFactory->createSSLContext(service->getService());
+ channel = DuplexChannelPtr(new AsioSSLDuplexChannel(service, sslCtx, addr, handler));
+ } else {
+ channel = DuplexChannelPtr(new AsioDuplexChannel(service, addr, handler));
+ }
boost::lock_guard<boost::shared_mutex> lock(allchannels_lock);
- if (shuttingDownFlag) {
+ if (closed) {
channel->close();
throw ShuttingDownException();
}
- // Don't connect here, otherwise connect callback may be triggered before setChannel
- // channel->connect();
-
allchannels.insert(channel);
- LOG4CXX_DEBUG(logger, "(create) All channels size: " << allchannels.size());
+ LOG4CXX_DEBUG(logger, "Created a channel to " << addr << ", all channels : " << allchannels.size());
return channel;
}
-DuplexChannelPtr ClientImpl::getChannel(const std::string& topic) {
- HostAddress addr;
- {
- boost::lock_guard<boost::shared_mutex> lock(topic2host_lock);
- addr = topic2host[topic];
- }
- if (addr.isNullHost()) {
- addr = defaultHost;
- setHostForTopic(topic, addr);
- }
- DuplexChannelPtr channel = host2channel[addr];
-
- if (channel.get() == 0) {
- LOG4CXX_DEBUG(logger, " No channel for topic, creating new channel.get() " << channel.get() << " addr " << addr.getAddressString());
- ChannelHandlerPtr handler(new HedwigClientChannelHandler(shared_from_this()));
- channel = createChannel(topic, handler);
- channel->connect();
+long DuplexChannelManager::nextTxnId() {
+ return counterobj.next();
+}
- boost::lock_guard<boost::shared_mutex> lock(host2channel_lock);
- host2channel[addr] = channel;
- }
+void DuplexChannelManager::setHostForTopic(const std::string& topic, const HostAddress& host) {
+ boost::lock_guard<boost::shared_mutex> h2clock(host2topics_lock);
+ boost::lock_guard<boost::shared_mutex> t2hlock(topic2host_lock);
+ topic2host[topic] = host;
+ TopicSetPtr ts = host2topics[host];
+ if (!ts.get()) {
+ ts = TopicSetPtr(new TopicSet());
+ host2topics[host] = ts;
+ }
+ ts->insert(topic);
+ LOG4CXX_DEBUG(logger, "Set ownership of topic " << topic << " to " << host << ".");
+}
- return channel;
+const HostAddress& DuplexChannelManager::getHostForTopic(const std::string& topic) {
+ boost::shared_lock<boost::shared_mutex> t2hlock(topic2host_lock);
+ return topic2host[topic];
}
-void ClientImpl::setHostForTopic(const std::string& topic, const HostAddress& host) {
- boost::lock_guard<boost::shared_mutex> lock(topic2host_lock);
- topic2host[topic] = host;
+void DuplexChannelManager::clearAllTopicsForHost(const HostAddress& addr) {
+ // remove topic mapping
+ boost::lock_guard<boost::shared_mutex> h2tlock(host2topics_lock);
+ boost::lock_guard<boost::shared_mutex> t2hlock(topic2host_lock);
+ Host2TopicsMap::iterator iter = host2topics.find(addr);
+ if (iter != host2topics.end()) {
+ for (TopicSet::iterator tsIter = iter->second->begin();
+ tsIter != iter->second->end(); ++tsIter) {
+ topic2host.erase(*tsIter);
+ }
+ host2topics.erase(iter);
+ }
}
-bool ClientImpl::shuttingDown() const {
- return shuttingDownFlag;
+void DuplexChannelManager::clearHostForTopic(const std::string& topic,
+ const HostAddress& addr) {
+ // remove topic mapping
+ boost::lock_guard<boost::shared_mutex> h2tlock(host2topics_lock);
+ boost::lock_guard<boost::shared_mutex> t2hlock(topic2host_lock);
+ Host2TopicsMap::iterator iter = host2topics.find(addr);
+ if (iter != host2topics.end()) {
+ iter->second->erase(topic);
+ }
+ topic2host.erase(topic);
}
/**
@@ -369,35 +589,120 @@ bool ClientImpl::shuttingDown() const {
This does not delete the channel. Some publishers or subscribers will still hold it and will be errored
when they try to do anything with it.
*/
-void ClientImpl::channelDied(const DuplexChannelPtr& channel) {
- if (shuttingDownFlag) {
- return;
- }
-
- boost::lock_guard<boost::shared_mutex> h2tlock(host2topics_lock);
- boost::lock_guard<boost::shared_mutex> h2clock(host2channel_lock);
- boost::lock_guard<boost::shared_mutex> t2hlock(topic2host_lock);
+void DuplexChannelManager::nonSubscriptionChannelDied(const DuplexChannelPtr& channel) {
// get host
HostAddress addr = channel->getHostAddress();
-
- for (Host2TopicsMap::iterator iter = host2topics.find(addr); iter != host2topics.end(); ++iter) {
- topic2host.erase((*iter).second);
+
+ // Clear the topic ownership when a non-subscription channel is disconnected
+ clearAllTopicsForHost(addr);
+
+ // remove channel mapping
+ {
+ boost::lock_guard<boost::shared_mutex> h2clock(host2channel_lock);
+ host2channel.erase(addr);
+ }
+ // drop the channel from the set of all channels and close it
+ removeChannel(channel);
+}
+
+// Remove `channel` from the set of all established channels, then close it.
+// close() is called only after allchannels_lock has been released.
+void DuplexChannelManager::removeChannel(const DuplexChannelPtr& channel) {
+ {
+ boost::lock_guard<boost::shared_mutex> aclock(allchannels_lock);
+ allchannels.erase(channel); // channel should be deleted here
+ }
+ channel->close();
+}
+
+// Start the channel manager: register the response handlers used for
+// non-subscription requests (PUBLISH and UNSUBSCRIBE), then start the
+// dispatcher.
+void DuplexChannelManager::start() {
+  // one shared reference to this manager, handed to every handler
+  DuplexChannelManagerPtr self = shared_from_this();
+
+  // add non-subscription response handlers
+  ResponseHandlerPtr publishHandler(new PublishResponseHandler(self));
+  nonSubscriptionHandlers[PUBLISH] = publishHandler;
+
+  ResponseHandlerPtr unsubscribeHandler(new UnsubscribeResponseHandler(self));
+  nonSubscriptionHandlers[UNSUBSCRIBE] = unsubscribeHandler;
+
+  // start the dispatcher
+  dispatcher->start();
+}
+
+// Report whether close() has been called. `closed` is read under
+// allchannels_lock, the same mutex close() holds while setting it.
+bool DuplexChannelManager::isClosed() {
+  boost::shared_lock<boost::shared_mutex> readLock(allchannels_lock);
+  const bool closedNow = closed;
+  return closedNow;
+}
+
+// Shut the channel manager down: stop the dispatcher, mark the manager as
+// closed, close and forget every established channel, and finally drop the
+// non-subscription response handlers.
+void DuplexChannelManager::close() {
+  // stop the dispatcher
+  dispatcher->stop();
+
+  {
+    boost::lock_guard<boost::shared_mutex> writeLock(allchannels_lock);
+    closed = true;
+
+    ChannelMap::iterator chan = allchannels.begin();
+    while (chan != allchannels.end()) {
+      (*chan)->close();
+      ++chan;
+    }
+    allchannels.clear();
+  }
+
+  // Unregister response handlers
+  nonSubscriptionHandlers.clear();
+  /* destruction of the maps will clean up any items they hold */
+}
+
+// Factory for ClientImpl: constructs the client, starts its channel manager
+// and returns the client to the caller.
+ClientImplPtr ClientImpl::Create(const Configuration& conf) {
+ ClientImplPtr impl(new ClientImpl(conf));
+ LOG4CXX_DEBUG(logger, "Creating Clientimpl " << impl);
+ impl->channelManager->start();
+ return impl;
+}
+
+// Tear the client down: close the channel manager first, then delete the
+// subscriber and publisher objects if they were ever created.
+void ClientImpl::Destroy() {
+ LOG4CXX_DEBUG(logger, "destroying Clientimpl " << this);
+
+ // close the channel manager
+ channelManager->close();
+
+ // pointers are reset to NULL after deletion
+ if (subscriber != NULL) {
+ delete subscriber;
+ subscriber = NULL;
+ }
+ if (publisher != NULL) {
+ delete publisher;
+ publisher = NULL;
}
- host2topics.erase(addr);
- host2channel.erase(addr);
- removeAndCloseChannel(channel);
}
-void ClientImpl::removeAndCloseChannel(const DuplexChannelPtr& channel) {
- boost::lock_guard<boost::shared_mutex> aclock(allchannels_lock);
- allchannels.erase(channel); // channel should be deleted here
- channel->close(); // close channel
+// Build a client: no publisher or subscriber exists yet, and a channel manager
+// is created from `conf`.
+// NOTE(review): inside the body `conf` names the constructor parameter, and
+// DuplexChannelManager declares a `const Configuration&` member - confirm the
+// caller's Configuration outlives the channel manager.
+ClientImpl::ClientImpl(const Configuration& conf)
+ : conf(conf), publisher(NULL), subscriber(NULL)
+{
+ channelManager = DuplexChannelManager::create(conf);
}
-const Configuration& ClientImpl::getConfiguration() {
- return conf;
+// Public accessor: exposes the object returned by getSubscriberImpl() through
+// the Subscriber interface.
+Subscriber& ClientImpl::getSubscriber() {
+ return getSubscriberImpl();
}
-boost::asio::io_service& ClientImpl::getService() {
- return dispatcher->getService()->getService();
+// Public accessor: exposes the object returned by getPublisherImpl() through
+// the Publisher interface.
+Publisher& ClientImpl::getPublisher() {
+ return getPublisherImpl();
+}
+
+// Return the client's SubscriberImpl, creating it on first use.
+//
+// The lock is taken unconditionally before `subscriber` is inspected.
+// `subscriber` is a plain pointer, so testing it outside the lock would be an
+// unsynchronized read that can race with the assignment below.
+SubscriberImpl& ClientImpl::getSubscriberImpl() {
+  boost::lock_guard<boost::mutex> lock(subscribercreate_lock);
+  if (subscriber == NULL) {
+    subscriber = new SubscriberImpl(channelManager);
+  }
+  return *subscriber;
+}
+
+// Return the client's PublisherImpl, creating it on first use.
+//
+// The lock is taken unconditionally before `publisher` is inspected.
+// `publisher` is a plain pointer, so testing it outside the lock would be an
+// unsynchronized read that can race with the assignment below.
+PublisherImpl& ClientImpl::getPublisherImpl() {
+  boost::lock_guard<boost::mutex> lock(publishercreate_lock);
+  if (publisher == NULL) {
+    publisher = new PublisherImpl(channelManager);
+  }
+  return *publisher;
+}
+
+// Only logs. The subscriber and publisher objects are deleted in Destroy(),
+// not here.
+ClientImpl::~ClientImpl() {
+ LOG4CXX_DEBUG(logger, "deleting Clientimpl " << this);
}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.h?rev=1399578&r1=1399577&r2=1399578&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.h Thu Oct 18 10:59:34 2012
@@ -164,22 +164,301 @@ namespace Hedwig {
int timeout;
};
+ class DuplexChannelManager;
+ typedef boost::shared_ptr<DuplexChannelManager> DuplexChannelManagerPtr;
+
+ //
+ // Hedwig Response Handler
+ //
+
+ // Response Handler used to process response for different types of requests
+ class ResponseHandler {
+ public:
+ ResponseHandler(const DuplexChannelManagerPtr& channelManager);
+ virtual ~ResponseHandler() {};
+
+ // Process the response `m` received on `channel` for the request `txn`.
+ // Implemented by one subclass per type of request.
+ virtual void handleResponse(const PubSubResponsePtr& m, const PubSubDataPtr& txn,
+ const DuplexChannelPtr& channel) = 0;
+ protected:
+ // common method used to redirect request
+ void redirectRequest(const PubSubResponsePtr& response, const PubSubDataPtr& data,
+ const DuplexChannelPtr& channel);
+
+ // channel manager to manage all established channels
+ const DuplexChannelManagerPtr channelManager;
+ };
+
+ typedef boost::shared_ptr<ResponseHandler> ResponseHandlerPtr;
+ typedef std::tr1::unordered_map<OperationType, ResponseHandlerPtr, OperationTypeHash> ResponseHandlerMap;
+
+ // Operation callback bound to one request (`data`) and one channel.
+ // NOTE(review): presumably used as the completion callback when the request
+ // is written to the channel - the implementation is not in this header.
+ class PubSubWriteCallback : public OperationCallback {
+ public:
+ PubSubWriteCallback(const DuplexChannelPtr& channel, const PubSubDataPtr& data);
+ virtual void operationComplete();
+ virtual void operationFailed(const std::exception& exception);
+ private:
+ DuplexChannelPtr channel;
+ PubSubDataPtr data;
+ };
+
+ // Operation callback bound to a channel manager, one channel and one request.
+ // NOTE(review): presumably invoked with the outcome of connecting `channel`
+ // to the default server - the implementation is not in this header.
+ class DefaultServerConnectCallback : public OperationCallback {
+ public:
+ DefaultServerConnectCallback(const DuplexChannelManagerPtr& channelManager,
+ const DuplexChannelPtr& channel,
+ const PubSubDataPtr& data);
+ virtual void operationComplete();
+ virtual void operationFailed(const std::exception& exception);
+ private:
+ DuplexChannelManagerPtr channelManager;
+ DuplexChannelPtr channel;
+ PubSubDataPtr data;
+ };
+
+  // Hash functor for SubscriptionListenerPtr: the hash value is the address of
+  // the listener object the pointer refers to.
+  struct SubscriptionListenerPtrHash : public std::unary_function<SubscriptionListenerPtr, size_t> {
+    size_t operator()(const Hedwig::SubscriptionListenerPtr& listener) const {
+      const void* listenerAddr = listener.get();
+      return reinterpret_cast<size_t>(listenerAddr);
+    }
+  };
+
+ // Subscription Event Emitter
+ class SubscriptionEventEmitter {
+ public:
+ SubscriptionEventEmitter();
+
+ // register / unregister a listener in the listener set
+ void addSubscriptionListener(SubscriptionListenerPtr& listener);
+ void removeSubscriptionListener(SubscriptionListenerPtr& listener);
+ // emit `event` for the subscription identified by (topic, subscriberId)
+ // NOTE(review): presumably delivered to every registered listener - confirm
+ // against the implementation.
+ void emitSubscriptionEvent(const std::string& topic,
+ const std::string& subscriberId,
+ const SubscriptionEvent event);
+
+ private:
+ // registered listeners, hashed by listener object address
+ typedef std::tr1::unordered_set<SubscriptionListenerPtr, SubscriptionListenerPtrHash> SubscriptionListenerSet;
+ SubscriptionListenerSet listeners;
+ boost::shared_mutex listeners_lock;
+ };
+
+ class SubscriberClientChannelHandler;
+
+ //
+ // Duplex Channel Manager to manage all established channels
+ //
+
+ class DuplexChannelManager : public boost::enable_shared_from_this<DuplexChannelManager> {
+ public:
+ static DuplexChannelManagerPtr create(const Configuration& conf);
+ virtual ~DuplexChannelManager();
+
+ inline const Configuration& getConfiguration() const {
+ return conf;
+ }
+
+ // Submit a pub/sub request
+ void submitOp(const PubSubDataPtr& op);
+
+ // Submit a pub/sub request to default host
+ // It is called only when client doesn't have the knowledge of topic ownership
+ void submitOpToDefaultServer(const PubSubDataPtr& op);
+
+ // Redirect pub/sub request to a target host
+ void redirectOpToHost(const PubSubDataPtr& op, const HostAddress& host);
+
+ // Submit a pub/sub request through an established channel
+ // It is called when connecting to default server to establish a channel
+ void submitOpThruChannel(const PubSubDataPtr& op, const DuplexChannelPtr& channel);
+
+ // Generate next transaction id for pub/sub requests sent through this manager
+ long nextTxnId();
+
+ // return default host
+ inline const HostAddress& getDefaultHost() { return defaultHost; }
+
+ // set the owner host of a topic
+ void setHostForTopic(const std::string& topic, const HostAddress& host);
+
+ // clear all topics that are hosted by a hub server
+ void clearAllTopicsForHost(const HostAddress& host);
+
+ // clear host for a given topic
+ void clearHostForTopic(const std::string& topic, const HostAddress& host);
+
+ // Called when a channel is disconnected
+ void nonSubscriptionChannelDied(const DuplexChannelPtr& channel);
+
+ // Remove a channel from the set of all channels and close it
+ void removeChannel(const DuplexChannelPtr& channel);
+
+ // Get the subscription channel handler for a given subscription
+ virtual boost::shared_ptr<SubscriberClientChannelHandler>
+ getSubscriptionChannelHandler(const TopicSubscriber& ts) = 0;
+
+ // Close a given subscription
+ virtual void asyncCloseSubscription(const TopicSubscriber& ts,
+ const OperationCallbackPtr& callback) = 0;
+
+ virtual void handoverDelivery(const TopicSubscriber& ts,
+ const MessageHandlerCallbackPtr& handler,
+ const ClientMessageFilterPtr& filter) = 0;
+
+ // start the channel manager
+ virtual void start();
+ // close the channel manager
+ virtual void close();
+ // whether the channel manager is closed
+ bool isClosed();
+
+ // Return an available service
+ inline boost::asio::io_service & getService() const {
+ return dispatcher->getService()->getService();
+ }
+
+ // Return the event emitter
+ inline SubscriptionEventEmitter& getEventEmitter() {
+ return eventEmitter;
+ }
+
+ protected:
+ DuplexChannelManager(const Configuration& conf);
+
+ // Get the ownership for a given topic.
+ const HostAddress& getHostForTopic(const std::string& topic);
+
+ //
+ // Channel Management
+ //
+
+ // Non subscription channel management
+
+ // Get a non subscription channel for a given topic
+ // If the topic's owner is known, retrieve a non subscription channel to
+ // target host (if no channel exists, create one);
+ // If the topic's owner is unknown, return null
+ DuplexChannelPtr getNonSubscriptionChannel(const std::string& topic);
+
+ // Get an existing non subscription channel to a given host
+ DuplexChannelPtr getNonSubscriptionChannel(const HostAddress& addr);
+
+ // Create a non subscription channel to a given host
+ DuplexChannelPtr createNonSubscriptionChannel(const HostAddress& addr);
+
+ // Store the established non subscription channel
+ DuplexChannelPtr storeNonSubscriptionChannel(const DuplexChannelPtr& ch,
+ bool doConnect);
+
+ //
+ // Subscription Channel Management
+ //
+
+ // Get a subscription channel for a given subscription.
+ // If there is subscription channel established before, return it.
+ // Otherwise, check whether the topic's owner is known. If the topic owner
+ // is known, retrieve a subscription channel to target host (if no
+ // channel exists, create one); If unknown, return null
+ virtual DuplexChannelPtr getSubscriptionChannel(const TopicSubscriber& ts,
+ const bool isResubscribeRequest) = 0;
+
+ // Get an existing subscription channel to a given host
+ virtual DuplexChannelPtr getSubscriptionChannel(const HostAddress& addr) = 0;
+
+ // Create a subscription channel to a given host
+ // If store is true, store the channel for future usage.
+ // If store is false, return a newly created channel.
+ // NOTE(review): this signature has no `store` parameter - the two lines
+ // above look stale; confirm and update.
+ virtual DuplexChannelPtr createSubscriptionChannel(const HostAddress& addr) = 0;
+
+ // Store the established subscription channel
+ virtual DuplexChannelPtr storeSubscriptionChannel(const DuplexChannelPtr& ch,
+ bool doConnect) = 0;
+
+ //
+ // Raw Channel Management
+ //
+
+ // Create a raw channel
+ DuplexChannelPtr createChannel(IOServicePtr& service,
+ const HostAddress& addr, const ChannelHandlerPtr& handler);
+
+ // event dispatcher running io threads
+ typedef boost::shared_ptr<EventDispatcher> EventDispatcherPtr;
+ EventDispatcherPtr dispatcher;
+
+ // topic2host mapping for topic ownership
+ std::tr1::unordered_map<std::string, HostAddress> topic2host;
+ boost::shared_mutex topic2host_lock;
+ // host2topics: reverse mapping, the set of topics recorded for each host
+ typedef std::tr1::unordered_set<std::string> TopicSet;
+ typedef boost::shared_ptr<TopicSet> TopicSetPtr;
+ typedef std::tr1::unordered_map<HostAddress, TopicSetPtr, HostAddressHash> Host2TopicsMap;
+ Host2TopicsMap host2topics;
+ boost::shared_mutex host2topics_lock;
+ private:
+ // write the request to target channel
+ void submitTo(const PubSubDataPtr& op, const DuplexChannelPtr& channel);
+
+ // held by reference: the Configuration object must outlive this manager
+ const Configuration& conf;
+ bool sslEnabled;
+ SSLContextFactoryPtr sslCtxFactory;
+
+ // whether the channel manager is shutting down
+ bool closed;
+
+ // counter used for generating transaction ids
+ ClientTxnCounter counterobj;
+
+ // default host
+ HostAddress defaultHost;
+
+ // non-subscription channels
+ std::tr1::unordered_map<HostAddress, DuplexChannelPtr, HostAddressHash > host2channel;
+ boost::shared_mutex host2channel_lock;
+
+ // maintain all established channels
+ typedef std::tr1::unordered_set<DuplexChannelPtr, DuplexChannelPtrHash > ChannelMap;
+ ChannelMap allchannels;
+ boost::shared_mutex allchannels_lock;
+
+ // Response Handlers for non-subscription requests
+ ResponseHandlerMap nonSubscriptionHandlers;
+
+ // Subscription Event Emitter
+ SubscriptionEventEmitter eventEmitter;
+ };
+
+ //
+ // Hedwig Client Channel Handler to handle responses received from the channel
+ //
+
class HedwigClientChannelHandler : public ChannelHandler {
public:
- HedwigClientChannelHandler(const ClientImplPtr& client);
+ HedwigClientChannelHandler(const DuplexChannelManagerPtr& channelManager,
+ ResponseHandlerMap& handlers);
+ virtual ~HedwigClientChannelHandler() {}
virtual void messageReceived(const DuplexChannelPtr& channel, const PubSubResponsePtr& m);
virtual void channelConnected(const DuplexChannelPtr& channel);
virtual void channelDisconnected(const DuplexChannelPtr& channel, const std::exception& e);
virtual void exceptionOccurred(const DuplexChannelPtr& channel, const std::exception& e);
-
+
+ // close the handler explicitly
+ void close();
protected:
- const ClientImplPtr client;
+ // real channel disconnected logic
+ virtual void onChannelDisconnected(const DuplexChannelPtr& channel);
+
+ // real close logic
+ virtual void doClose();
+
+ // channel manager to manage all established channels
+ const DuplexChannelManagerPtr channelManager;
+ // response handlers, held by reference: the map passed to the constructor
+ // must outlive this handler
+ ResponseHandlerMap& handlers;
+
+ boost::shared_mutex close_lock;
+ // Boolean indicating if we closed the handler explicitly or not.
+ // If so, we do not need to do the channel disconnected logic here.
+ bool closed;
+ // whether channel is disconnected.
+ bool disconnected;
};
class PublisherImpl;
class SubscriberImpl;
-
+
/**
Implementation of the hedwig client. This class takes care of globals such as the topic->host map and the transaction id counter.
*/
@@ -191,30 +470,9 @@ namespace Hedwig {
Subscriber& getSubscriber();
Publisher& getPublisher();
- ClientTxnCounter& counter();
-
- void redirectRequest(const DuplexChannelPtr& channel, PubSubDataPtr& data, const PubSubResponsePtr& response);
-
- const HostAddress& getHostForTopic(const std::string& topic);
-
- //DuplexChannelPtr getChannelForTopic(const std::string& topic, OperationCallback& callback);
- //DuplexChannelPtr createChannelForTopic(const std::string& topic, ChannelHandlerPtr& handler, OperationCallback& callback);
- DuplexChannelPtr createChannel(const std::string& topic, const ChannelHandlerPtr& handler);
- DuplexChannelPtr getChannel(const std::string& topic);
-
- void setHostForTopic(const std::string& topic, const HostAddress& host);
-
- void setChannelForHost(const HostAddress& address, const DuplexChannelPtr& channel);
- void channelDied(const DuplexChannelPtr& channel);
- void removeAndCloseChannel(const DuplexChannelPtr& channel);
- bool shuttingDown() const;
-
SubscriberImpl& getSubscriberImpl();
PublisherImpl& getPublisherImpl();
- const Configuration& getConfiguration();
- boost::asio::io_service& getService();
-
~ClientImpl();
private:
ClientImpl(const Configuration& conf);
@@ -227,27 +485,8 @@ namespace Hedwig {
boost::mutex subscribercreate_lock;
SubscriberImpl* subscriber;
- ClientTxnCounter counterobj;
-
- typedef boost::shared_ptr<EventDispatcher> EventDispatcherPtr;
- EventDispatcherPtr dispatcher;
+ // channel manager manage all channels for the client
DuplexChannelManagerPtr channelManager;
-
- typedef std::tr1::unordered_multimap<HostAddress, std::string, HostAddressHash > Host2TopicsMap;
- Host2TopicsMap host2topics;
- boost::shared_mutex host2topics_lock;
- HostAddress defaultHost;
-
- std::tr1::unordered_map<HostAddress, DuplexChannelPtr, HostAddressHash > host2channel;
- boost::shared_mutex host2channel_lock;
- std::tr1::unordered_map<std::string, HostAddress> topic2host;
- boost::shared_mutex topic2host_lock;
-
- typedef std::tr1::unordered_set<DuplexChannelPtr, DuplexChannelPtrHash > ChannelMap;
- ChannelMap allchannels;
- boost::shared_mutex allchannels_lock;
-
- bool shuttingDownFlag;
};
};
#endif
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp?rev=1399578&r1=1399577&r2=1399578&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.cpp Thu Oct 18 10:59:34 2012
@@ -23,11 +23,23 @@
#include "data.h"
#include <log4cxx/logger.h>
+#include <iostream>
+
+#define stringify( name ) #name
static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__));
using namespace Hedwig;
+// Printable names of the operation types. operator<< for PubSubData indexes
+// this table with the numeric OperationType value.
+// NOTE(review): the order here must match the OperationType enum values -
+// confirm against the protocol definition whenever an operation type is added.
+const char* OPERATION_TYPE_NAMES[] = {
+ stringify( PUBLISH ),
+ stringify( SUBSCRIBE ),
+ stringify( CONSUME ),
+ stringify( UNSUBSCRIBE ),
+ stringify( START_DELIVERY ),
+ stringify( STOP_DELIVERY )
+};
+
PubSubDataPtr PubSubData::forPublishRequest(long txnid, const std::string& topic, const Message& body,
const ResponseCallbackPtr& callback) {
PubSubDataPtr ptr(new PubSubData());
@@ -186,3 +198,57 @@ const std::string& PubSubData::getSubscr
const SubscriptionOptions& PubSubData::getSubscriptionOptions() const {
return options;
}
+
+// Remember the channel the subscription was originally established on; a
+// request carrying such a channel is treated as a resubscribe request.
+void PubSubData::setOrigChannelForResubscribe(
+    boost::shared_ptr<DuplexChannel>& channel) {
+  origChannel = channel;
+}
+
+// Access the stored original channel (a null pointer when none was set).
+boost::shared_ptr<DuplexChannel>& PubSubData::getOrigChannelForResubscribe() {
+  return origChannel;
+}
+
+// A request is a resubscribe request exactly when an original channel has
+// been recorded on it.
+bool PubSubData::isResubscribeRequest() {
+  const bool hasOrigChannel = (origChannel.get() != 0);
+  return hasOrigChannel;
+}
+
+// The counter starts at 0, so the first id returned by next() is 1.
+ClientTxnCounter::ClientTxnCounter() : counter(0)
+{
+}
+
+// Nothing to release explicitly.
+ClientTxnCounter::~ClientTxnCounter() {
+}
+
+/**
+   Advance the transaction counter while holding the mutex and hand back
+   the incremented value.
+
+   @returns the next transaction id
+*/
+long ClientTxnCounter::next() { // would be nice to remove lock from here, look more into it
+  boost::lock_guard<boost::mutex> guard(mutex);
+  return ++counter;
+}
+
+// Stream a one-line description of a request, e.g.
+//   [SUBSCRIBE request (txn:7) for (topic:t, subscriber:s)]
+//
+// The type name is taken from OPERATION_TYPE_NAMES; a type value outside that
+// table is printed as "UNKNOWN" instead of indexing past the end of the array.
+std::ostream& Hedwig::operator<<(std::ostream& os, const PubSubData& data) {
+  const OperationType type = data.getType();
+  const size_t numNames = sizeof(OPERATION_TYPE_NAMES) / sizeof(OPERATION_TYPE_NAMES[0]);
+  const size_t typeIdx = static_cast<size_t>(type);
+  const char* typeName = (typeIdx < numNames) ? OPERATION_TYPE_NAMES[typeIdx] : "UNKNOWN";
+
+  os << "[" << typeName << " request (txn:" << data.getTxnId()
+     << ") for (topic:" << data.getTopic();
+  switch (type) {
+  case SUBSCRIBE:
+  case UNSUBSCRIBE:
+    os << ", subscriber:" << data.getSubscriberId() << ")";
+    break;
+  case CONSUME:
+    os << ", subscriber:" << data.getSubscriberId() << ", seq:"
+       << data.getMessageSeqId().localcomponent() << ")";
+    break;
+  case PUBLISH:
+  default:
+    os << ")";
+    break;
+  }
+  // close the bracket opened at the start of the description
+  os << "]";
+  return os;
+}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h?rev=1399578&r1=1399577&r2=1399578&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/data.h Thu Oct 18 10:59:34 2012
@@ -23,6 +23,7 @@
#include <hedwig/callback.h>
#include <pthread.h>
+#include <iostream>
#ifdef USE_BOOST_TR1
#include <boost/tr1/unordered_set.hpp>
@@ -57,6 +58,8 @@ namespace Hedwig {
typedef boost::shared_ptr<PubSubRequest> PubSubRequestPtr;
typedef boost::shared_ptr<PubSubResponse> PubSubResponsePtr;
+ class DuplexChannel;
+
/**
Data structure to hold information about requests and build request messages.
Used to store requests which may need to be resent to another server.
@@ -92,6 +95,12 @@ namespace Hedwig {
void addTriedServer(HostAddress& h);
bool hasTriedServer(HostAddress& h);
void clearTriedServers();
+
+ void setOrigChannelForResubscribe(boost::shared_ptr<DuplexChannel>& channel);
+ bool isResubscribeRequest();
+ boost::shared_ptr<DuplexChannel>& getOrigChannelForResubscribe();
+
+ friend std::ostream& operator<<(std::ostream& os, const PubSubData& data);
private:
PubSubData();
@@ -110,7 +119,9 @@ namespace Hedwig {
SubscriptionOptions options;
MessageSeqId msgid;
std::tr1::unordered_set<HostAddress, HostAddressHash > triedservers;
+ // record the origChannel for a resubscribe request
+ boost::shared_ptr<DuplexChannel> origChannel;
};
-
+
};
#endif
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp?rev=1399578&r1=1399577&r2=1399578&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp Thu Oct 18 10:59:34 2012
@@ -66,11 +66,13 @@ EventDispatcher::EventDispatcher(const C
num_threads = conf.getInt(Configuration::NUM_DISPATCH_THREADS,
DEFAULT_NUM_DISPATCH_THREADS);
if (0 == num_threads) {
+ LOG4CXX_ERROR(logger, "Number of threads in dispatcher is zero");
throw std::runtime_error("number of threads in dispatcher is zero");
}
for (size_t i = 0; i < num_threads; i++) {
services.push_back(IOServicePtr(new IOService()));
}
+ LOG4CXX_DEBUG(logger, "Created EventDispatcher " << this);
}
void EventDispatcher::run_forever(IOServicePtr service, size_t idx) {
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.cpp?rev=1399578&r1=1399577&r2=1399578&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.cpp Thu Oct 18 10:59:34 2012
@@ -19,11 +19,12 @@
#include <config.h>
#endif
+#include <string>
+#include <log4cxx/logger.h>
+
#include "publisherimpl.h"
#include "channel.h"
-#include <log4cxx/logger.h>
-
static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__));
using namespace Hedwig;
@@ -48,31 +49,19 @@ void PublishResponseAdaptor::operationFa
pubCallback->operationFailed(exception);
}
-PublishWriteCallback::PublishWriteCallback(const ClientImplPtr& client, const PubSubDataPtr& data) : client(client), data(data) {}
-
-void PublishWriteCallback::operationComplete() {
- LOG4CXX_DEBUG(logger, "Successfully wrote transaction: " << data->getTxnId());
-}
-
-void PublishWriteCallback::operationFailed(const std::exception& exception) {
- LOG4CXX_ERROR(logger, "Error writing to publisher " << exception.what());
-
- data->getCallback()->operationFailed(exception);
-}
-
-PublisherImpl::PublisherImpl(const ClientImplPtr& client)
- : client(client) {
+// The publisher keeps the channel manager it submits its requests through.
+PublisherImpl::PublisherImpl(const DuplexChannelManagerPtr& channelManager)
+ : channelManager(channelManager) {
}
+// Synchronous publish: issues asyncPublishWithResponse() with a SyncCallback
+// (constructed with the configured SYNC_REQUEST_TIMEOUT value), waits on it,
+// rethrows any recorded failure and returns the result.
PublishResponsePtr PublisherImpl::publish(const std::string& topic, const Message& message) {
- SyncCallback<PublishResponsePtr>* cb =
- new SyncCallback<PublishResponsePtr>(client->getConfiguration().getInt(Configuration::SYNC_REQUEST_TIMEOUT,
- DEFAULT_SYNC_REQUEST_TIMEOUT));
+ SyncCallback<PublishResponsePtr>* cb = new SyncCallback<PublishResponsePtr>(
+ channelManager->getConfiguration().getInt(Configuration::SYNC_REQUEST_TIMEOUT,
+ DEFAULT_SYNC_REQUEST_TIMEOUT));
PublishResponseCallbackPtr callback(cb);
asyncPublishWithResponse(topic, message, callback);
cb->wait();
-
- cb->throwExceptionIfNeeded();
+
+ cb->throwExceptionIfNeeded();
return cb->getResult();
}
@@ -82,13 +71,15 @@ PublishResponsePtr PublisherImpl::publis
return publish(topic, msg);
}
-void PublisherImpl::asyncPublish(const std::string& topic, const Message& message, const OperationCallbackPtr& callback) {
+// Asynchronous publish that reports only completion or failure: wraps
+// `callback` in a ResponseCallbackAdaptor and hands the request to doPublish().
+void PublisherImpl::asyncPublish(const std::string& topic, const Message& message,
+ const OperationCallbackPtr& callback) {
// use release after callback to release the channel after the callback is called
ResponseCallbackPtr respCallback(new ResponseCallbackAdaptor(callback));
doPublish(topic, message, respCallback);
}
-void PublisherImpl::asyncPublish(const std::string& topic, const std::string& message, const OperationCallbackPtr& callback) {
+void PublisherImpl::asyncPublish(const std::string& topic, const std::string& message,
+ const OperationCallbackPtr& callback) {
Message msg;
msg.set_body(message);
asyncPublish(topic, msg, callback);
@@ -100,22 +91,26 @@ void PublisherImpl::asyncPublishWithResp
doPublish(topic, message, respCallback);
}
-void PublisherImpl::doPublish(const std::string& topic, const Message& message, const ResponseCallbackPtr& callback) {
- PubSubDataPtr data = PubSubData::forPublishRequest(client->counter().next(), topic, message, callback);
-
- DuplexChannelPtr channel = client->getChannel(topic);
-
- doPublish(channel, data);
-}
-
-void PublisherImpl::doPublish(const DuplexChannelPtr& channel, const PubSubDataPtr& data) {
- channel->storeTransaction(data);
-
- OperationCallbackPtr writecb(new PublishWriteCallback(client, data));
- channel->writeRequest(data->getRequest(), writecb);
-}
-
-void PublisherImpl::messageHandler(const PubSubResponsePtr& m, const PubSubDataPtr& txn) {
+// Build a publish request for `message` on `topic`, carrying `callback` and a
+// fresh transaction id, and submit it through the channel manager.
+void PublisherImpl::doPublish(const std::string& topic, const Message& message,
+                              const ResponseCallbackPtr& callback) {
+  const long txnId = channelManager->nextTxnId();
+  PubSubDataPtr data = PubSubData::forPublishRequest(txnId, topic, message, callback);
+  LOG4CXX_INFO(logger, "Publish message (topic:" << data->getTopic() << ", txn:"
+               << data->getTxnId() << ").");
+  channelManager->submitOp(data);
+}
+
+//
+// Publish Response Handler
+//
+// Passes the channel manager on to the ResponseHandler base and logs the
+// manager's address.
+PublishResponseHandler::PublishResponseHandler(const DuplexChannelManagerPtr& channelManager)
+ : ResponseHandler(channelManager) {
+ LOG4CXX_DEBUG(logger, "Created PublishResponseHandler for ChannelManager " << channelManager.get());
+}
+
+void PublishResponseHandler::handleResponse(const PubSubResponsePtr& m,
+ const PubSubDataPtr& txn,
+ const DuplexChannelPtr& channel) {
switch (m->statuscode()) {
case SUCCESS:
if (m->has_responsebody()) {
@@ -128,6 +123,9 @@ void PublisherImpl::messageHandler(const
LOG4CXX_ERROR(logger, "Server responsed with SERVICE_DOWN for " << txn->getTxnId());
txn->getCallback()->operationFailed(ServiceDownException());
break;
+ case NOT_RESPONSIBLE_FOR_TOPIC:
+ redirectRequest(m, txn, channel);
+ break;
default:
LOG4CXX_ERROR(logger, "Unexpected response " << m->statuscode() << " for " << txn->getTxnId());
txn->getCallback()->operationFailed(UnexpectedResponseException());
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.h?rev=1399578&r1=1399577&r2=1399578&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/publisherimpl.h Thu Oct 18 10:59:34 2012
@@ -34,20 +34,18 @@ namespace Hedwig {
PublishResponseCallbackPtr pubCallback;
};
- class PublishWriteCallback : public OperationCallback {
+ // Response handler for publish requests.
+ class PublishResponseHandler : public ResponseHandler {
public:
- PublishWriteCallback(const ClientImplPtr& client, const PubSubDataPtr& data);
+ PublishResponseHandler(const DuplexChannelManagerPtr& channelManager);
+ virtual ~PublishResponseHandler() {};
- void operationComplete();
- void operationFailed(const std::exception& exception);
- private:
- ClientImplPtr client;
- PubSubDataPtr data;
+ // handle the response `m` received on `channel` for the publish request `txn`
+ virtual void handleResponse(const PubSubResponsePtr& m, const PubSubDataPtr& txn,
+ const DuplexChannelPtr& channel);
};
class PublisherImpl : public Publisher {
public:
- PublisherImpl(const ClientImplPtr& client);
+ PublisherImpl(const DuplexChannelManagerPtr& channelManager);
PublishResponsePtr publish(const std::string& topic, const std::string& message);
PublishResponsePtr publish(const std::string& topic, const Message& message);
@@ -56,14 +54,11 @@ namespace Hedwig {
void asyncPublish(const std::string& topic, const Message& message, const OperationCallbackPtr& callback);
void asyncPublishWithResponse(const std::string& topic, const Message& messsage,
const PublishResponseCallbackPtr& callback);
-
- void messageHandler(const PubSubResponsePtr& m, const PubSubDataPtr& txn);
-
- void doPublish(const std::string& topic, const Message& message, const ResponseCallbackPtr& callback);
- void doPublish(const DuplexChannelPtr& channel, const PubSubDataPtr& data);
+ void doPublish(const std::string& topic, const Message& message,
+ const ResponseCallbackPtr& callback);
private:
- ClientImplPtr client;
+ DuplexChannelManagerPtr channelManager;
};
};
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/simplesubscriberimpl.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/simplesubscriberimpl.cpp?rev=1399578&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/simplesubscriberimpl.cpp (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/simplesubscriberimpl.cpp Thu Oct 18 10:59:34 2012
@@ -0,0 +1,470 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <log4cxx/logger.h>
+
+#include "simplesubscriberimpl.h"
+#include "util.h"
+
+// File-local logger named "hedwig.<source file>".
+// The whitespace between the string literal and __FILE__ is required: since
+// C++11 a literal immediately followed by an identifier is lexed as a
+// user-defined-literal suffix, so "hedwig."__FILE__ is no longer a concatenation.
+static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig." __FILE__));
+
+using namespace Hedwig;
+
+// Default handed to Configuration::getInt for Configuration::MAX_MESSAGE_QUEUE_SIZE.
+const int DEFAULT_MAX_MESSAGE_QUEUE_SIZE = 10;
+
+// Constructs the subscriber on top of ActiveSubscriber and reads the message
+// queue limit (maxQueueLen) from the channel manager's configuration, using
+// DEFAULT_MAX_MESSAGE_QUEUE_SIZE as the default value.
+SimpleActiveSubscriber::SimpleActiveSubscriber(const PubSubDataPtr& data,
+                                               const AbstractDuplexChannelPtr& channel,
+                                               const SubscriptionPreferencesPtr& preferences,
+                                               const DuplexChannelManagerPtr& channelManager)
+  : ActiveSubscriber(data, channel, preferences, channelManager) {
+  const Configuration& conf = channelManager->getConfiguration();
+  maxQueueLen = conf.getInt(Configuration::MAX_MESSAGE_QUEUE_SIZE,
+                            DEFAULT_MAX_MESSAGE_QUEUE_SIZE);
+}
+
+// Starts delivery through the base class, then tells the channel to start
+// receiving.
+void SimpleActiveSubscriber::doStartDelivery(const MessageHandlerCallbackPtr& handler,
+                                             const ClientMessageFilterPtr& filter) {
+  ActiveSubscriber::doStartDelivery(handler, filter);
+  // Original author's note: channel#startReceiving is deliberately called
+  // outside of subscriber#queue_lock. Calling it while holding that lock
+  // (subscriber#startDelivery => channel#startReceiving, which takes
+  // channel#receiving_lock) would lead to a dead lock.
+  this->channel->startReceiving();
+}
+
+// Stops delivery by telling the underlying channel to stop receiving.
+void SimpleActiveSubscriber::doStopDelivery() {
+  this->channel->stopReceiving();
+}
+
+// Queues the message through the base class and, once the queue has reached
+// maxQueueLen entries, tells the channel to stop receiving.
+void SimpleActiveSubscriber::queueMessage(const PubSubResponsePtr& m) {
+  ActiveSubscriber::queueMessage(m);
+
+  if (queue.size() < maxQueueLen) {
+    // still below the limit, keep receiving
+    return;
+  }
+  channel->stopReceiving();
+}
+
+// Remembers the active subscriber that finish() will notify.
+CloseSubscriptionCallback::CloseSubscriptionCallback(
+  const ActiveSubscriberPtr& activeSubscriber)
+  : activeSubscriber(activeSubscriber)
+{
+}
+
+// Success path of the close: handled exactly like the failure path.
+void CloseSubscriptionCallback::operationComplete() {
+  this->finish();
+}
+
+// Failure path of the close: the exception is not inspected, the subscriber is
+// notified the same way as on success.
+void CloseSubscriptionCallback::operationFailed(const std::exception& e) {
+  this->finish();
+}
+
+// Runs the disconnect logic once the close attempt is over: raises a
+// TOPIC_MOVED event on the active subscriber for its own topic/subscriber id.
+void CloseSubscriptionCallback::finish() {
+  ActiveSubscriber& sub = *activeSubscriber;
+  sub.processEvent(sub.getTopic(), sub.getSubscriberId(), TOPIC_MOVED);
+}
+
+// Forwards the channel manager and the response handler map to the base
+// SubscriberClientChannelHandler; no additional state is set up here.
+SimpleSubscriberClientChannelHandler::SimpleSubscriberClientChannelHandler(
+  const DuplexChannelManagerPtr& channelManager,
+  ResponseHandlerMap& handlers)
+  : SubscriberClientChannelHandler(channelManager, handlers)
+{
+}
+
+// Installs a new SimpleActiveSubscriber on this handler while holding
+// subscriber_lock.
+// Returns true when the subscriber was installed, false (after logging an
+// error) when the handler already holds an active subscriber.
+bool SimpleSubscriberClientChannelHandler::setActiveSubscriber(
+  const PubSubDataPtr& op, const SubscriptionPreferencesPtr& preferences) {
+  boost::lock_guard<boost::shared_mutex> lock(subscriber_lock);
+  if (!subscriber.get()) {
+    subscriber = ActiveSubscriberPtr(
+      new SimpleActiveSubscriber(op, channel, preferences, channelManager));
+    return true;
+  }
+  LOG4CXX_ERROR(logger, *subscriber << " has been found alive on channel " << channel.get());
+  return false;
+}
+
+// Hands the message to the handler's active subscriber. When there is none,
+// an error is logged and the message is not delivered. `ts` is not consulted.
+void SimpleSubscriberClientChannelHandler::deliverMessage(const TopicSubscriber& ts,
+                                                          const PubSubResponsePtr& m) {
+  ActiveSubscriberPtr as = getActiveSubscriber();
+  if (as.get()) {
+    as->deliverMessage(m);
+    return;
+  }
+  LOG4CXX_ERROR(logger, "No Active Subscriber found alive on channel " << channel.get());
+}
+
+// Starts delivery on the handler's active subscriber with the given message
+// handler and filter. `ts` is not consulted.
+// Throws NotSubscribedException (after logging) when no active subscriber is set.
+void SimpleSubscriberClientChannelHandler::startDelivery(
+  const TopicSubscriber& ts,
+  const MessageHandlerCallbackPtr& handler,
+  const ClientMessageFilterPtr& filter) {
+  ActiveSubscriberPtr as = getActiveSubscriber();
+  if (as.get()) {
+    as->startDelivery(handler, filter);
+    return;
+  }
+  LOG4CXX_ERROR(logger, "No Active Subscriber found alive on channel " << channel.get());
+  throw NotSubscribedException();
+}
+
+// Stops delivery on the handler's active subscriber. `ts` is not consulted.
+// Throws NotSubscribedException (after logging) when no active subscriber is set.
+void SimpleSubscriberClientChannelHandler::stopDelivery(const TopicSubscriber& ts) {
+  ActiveSubscriberPtr as = getActiveSubscriber();
+  if (as.get()) {
+    as->stopDelivery();
+    return;
+  }
+  LOG4CXX_ERROR(logger, "No Active Subscriber found alive on channel " << channel.get());
+  throw NotSubscribedException();
+}
+
+// Returns true only when an active subscriber is set and both its topic and
+// its subscriber id equal the ones in `ts`.
+bool SimpleSubscriberClientChannelHandler::hasSubscription(const TopicSubscriber& ts) {
+  ActiveSubscriberPtr as = getActiveSubscriber();
+  return as.get() != 0
+      && ts.first == as->getTopic()
+      && ts.second == as->getSubscriberId();
+}
+
+// Closes and drops the active subscriber, if any, then always reports
+// completion through `callback`. `ts` is not consulted.
+void SimpleSubscriberClientChannelHandler::asyncCloseSubscription(
+  const TopicSubscriber& ts, const OperationCallbackPtr& callback) {
+  ActiveSubscriberPtr current = getActiveSubscriber();
+  if (current.get() != 0) {
+    // closing the subscription here only means removing the active subscriber
+    current->close();
+    clearActiveSubscriber();
+  }
+  callback->operationComplete();
+}
+
+// Forwards the consumed sequence id to the active subscriber. When there is
+// none, an error is logged and nothing else happens. `ts` is not consulted.
+void SimpleSubscriberClientChannelHandler::consume(const TopicSubscriber& ts,
+                                                   const MessageSeqId& messageSeqId) {
+  ActiveSubscriberPtr as = getActiveSubscriber();
+  if (as.get()) {
+    as->consume(messageSeqId);
+    return;
+  }
+  LOG4CXX_ERROR(logger, "No Active Subscriber found alive on channel " << channel.get());
+}
+
+// Reacts to a disconnect of `channel`.
+//  - Without an active subscriber the channel is simply removed from the
+//    channel manager.
+//  - Otherwise the topic's host mapping for this channel's host is cleared and
+//    the subscriber is told about the move: directly when it wants to
+//    resubscribe, or through a CloseSubscriptionCallback after the subscription
+//    has been closed when it does not.
+void SimpleSubscriberClientChannelHandler::onChannelDisconnected(
+  const DuplexChannelPtr& channel) {
+  ActiveSubscriberPtr as = getActiveSubscriber();
+  if (!as.get()) {
+    LOG4CXX_ERROR(logger, "No Active Subscriber found when channel " << channel.get()
+                          << " disconnected.");
+    // nobody to notify, but the channel itself still has to be closed
+    channelManager->removeChannel(channel);
+    return;
+  }
+
+  // forget which host owned the topic
+  channelManager->clearHostForTopic(as->getTopic(), channel->getHostAddress());
+
+  LOG4CXX_INFO(logger, "Tell " << *as << " his channel " << channel.get() << " is disconnected.");
+  if (as->isResubscribeRequired()) {
+    // resubscribe wanted: the old channel is only cleaned up once the
+    // resubscribe has succeeded, so just raise the event now
+    as->processEvent(as->getTopic(), as->getSubscriberId(), TOPIC_MOVED);
+    return;
+  }
+
+  // no resubscribe: clean up the old channel first, the callback then
+  // notifies the subscriber with TOPIC_MOVED
+  OperationCallbackPtr closeCb(new CloseSubscriptionCallback(as));
+  TopicSubscriber ts(as->getTopic(), as->getSubscriberId());
+  channelManager->asyncCloseSubscription(ts, closeCb);
+}
+
+// Closes and drops the active subscriber, if there is one, and logs it at
+// debug level. Does nothing when no active subscriber is set.
+void SimpleSubscriberClientChannelHandler::closeHandler() {
+  ActiveSubscriberPtr as = getActiveSubscriber();
+  if (!as.get()) {
+    return;
+  }
+  as->close();
+  clearActiveSubscriber();
+  // `as` is a local copy, so it can still be logged after the handler dropped it
+  LOG4CXX_DEBUG(logger, "Closed " << *as << ".");
+}
+
+//
+// Subscribe Response Handler
+//
+// Passes the manager to the base ResponseHandler as a DuplexChannelManager and
+// additionally keeps it under its concrete type in sChannelManager.
+SimpleSubscribeResponseHandler::SimpleSubscribeResponseHandler(
+  const SimpleDuplexChannelManagerPtr& channelManager)
+  : ResponseHandler(boost::dynamic_pointer_cast<DuplexChannelManager>(channelManager)),
+    sChannelManager(channelManager)
+{
+}
+
+// Completes a successful subscribe:
+//  1. extracts the subscription preferences from the response, if present;
+//  2. installs an active subscriber on `handler`;
+//  3. registers `handler` for (topic, subscriber) in the channel manager.
+// When registration is refused the handler is closed and the transaction
+// callback fails with ResubscribeException (resubscribe requests) or
+// AlreadySubscribedException; otherwise the callback completes with the
+// response body (or an empty ResponseBody when the response carries none).
+void SimpleSubscribeResponseHandler::handleSuccessResponse(
+  const PubSubResponsePtr& m, const PubSubDataPtr& txn,
+  const SimpleSubscriberClientChannelHandlerPtr& handler) {
+  SubscriptionPreferencesPtr preferences;
+  const bool preferencesReceived =
+    m->has_responsebody()
+    && m->responsebody().has_subscriberesponse()
+    && m->responsebody().subscriberesponse().has_preferences();
+  if (preferencesReceived) {
+    preferences = SubscriptionPreferencesPtr(
+      new SubscriptionPreferences(m->responsebody().subscriberesponse().preferences()));
+  }
+
+  handler->setActiveSubscriber(txn, preferences);
+  TopicSubscriber ts(txn->getTopic(), txn->getSubscriberId());
+  if (sChannelManager->storeSubscriptionChannelHandler(ts, txn, handler)) {
+    if (m->has_responsebody()) {
+      txn->getCallback()->operationComplete(m->responsebody());
+    } else {
+      txn->getCallback()->operationComplete(ResponseBody());
+    }
+    return;
+  }
+
+  // the manager refused to store this handler: give the channel up
+  handler->close();
+  if (txn->isResubscribeRequest()) {
+    txn->getCallback()->operationFailed(ResubscribeException());
+  } else {
+    txn->getCallback()->operationFailed(AlreadySubscribedException());
+  }
+}
+
+// Dispatches the response to a subscribe request.
+// A missing transaction is logged and ignored. A channel without a
+// SimpleSubscriberClientChannelHandler is closed and the callback fails with
+// NoChannelHandlerException. For every status other than SUCCESS the handler
+// is closed first; the status is then mapped to a success path, a redirect, or
+// a failure callback.
+void SimpleSubscribeResponseHandler::handleResponse(const PubSubResponsePtr& m,
+                                                    const PubSubDataPtr& txn,
+                                                    const DuplexChannelPtr& channel) {
+  if (!txn.get()) {
+    LOG4CXX_ERROR(logger, "Invalid transaction recevied from channel " << channel.get());
+    return;
+  }
+
+  LOG4CXX_DEBUG(logger, "message received with status " << m->statuscode()
+                        << " from channel " << channel.get());
+
+  SimpleSubscriberClientChannelHandlerPtr subHandler =
+    boost::dynamic_pointer_cast<SimpleSubscriberClientChannelHandler>(
+      channel->getChannelHandler());
+  if (!subHandler.get()) {
+    LOG4CXX_ERROR(logger, "No simple subscriber client channel handler found for channel "
+                          << channel.get() << ".");
+    // even without a handler the channel must not be left open
+    channel->close();
+    txn->getCallback()->operationFailed(NoChannelHandlerException());
+    return;
+  }
+
+  const bool subscribed = (SUCCESS == m->statuscode());
+  if (!subscribed) {
+    // the subscribe did not go through: close the handler and the channel bound to it
+    subHandler->close();
+  }
+
+  switch (m->statuscode()) {
+  case SUCCESS:
+    handleSuccessResponse(m, txn, subHandler);
+    break;
+  case NOT_RESPONSIBLE_FOR_TOPIC:
+    redirectRequest(m, txn, channel);
+    break;
+  case SERVICE_DOWN:
+    txn->getCallback()->operationFailed(ServiceDownException());
+    break;
+  case CLIENT_ALREADY_SUBSCRIBED:
+  case TOPIC_BUSY:
+    txn->getCallback()->operationFailed(AlreadySubscribedException());
+    break;
+  case CLIENT_NOT_SUBSCRIBED:
+    txn->getCallback()->operationFailed(NotSubscribedException());
+    break;
+  default:
+    LOG4CXX_ERROR(logger, "Unexpected response " << m->statuscode() << " for " << txn->getTxnId());
+    txn->getCallback()->operationFailed(UnexpectedResponseException());
+    break;
+  }
+}
+
+//
+// Simple Duplex Channel Manager
+//
+// Forwards the configuration to DuplexChannelManager and logs the creation at
+// debug level.
+SimpleDuplexChannelManager::SimpleDuplexChannelManager(const Configuration& conf)
+  : DuplexChannelManager(conf)
+{
+  LOG4CXX_DEBUG(logger, "Created SimpleDuplexChannelManager " << this);
+}
+
+// Only logs the destruction at debug level.
+SimpleDuplexChannelManager::~SimpleDuplexChannelManager()
+{
+  LOG4CXX_DEBUG(logger, "Destroyed SimpleDuplexChannelManager " << this);
+}
+
+// Registers the SUBSCRIBE response handler (bound to this manager) and then
+// runs the base class start().
+void SimpleDuplexChannelManager::start() {
+  SimpleDuplexChannelManagerPtr self =
+    boost::dynamic_pointer_cast<SimpleDuplexChannelManager>(shared_from_this());
+  subscriptionHandlers[SUBSCRIBE] =
+    ResponseHandlerPtr(new SimpleSubscribeResponseHandler(self));
+  DuplexChannelManager::start();
+}
+
+// Runs the base class close() first, then drops all registered subscription
+// response handlers.
+void SimpleDuplexChannelManager::close() {
+  this->DuplexChannelManager::close();
+  this->subscriptionHandlers.clear();
+}
+
+// Looks up the simple channel handler for `ts` and returns it as a
+// SubscriberClientChannelHandlerPtr.
+SubscriberClientChannelHandlerPtr
+SimpleDuplexChannelManager::getSubscriptionChannelHandler(const TopicSubscriber& ts) {
+  const SimpleSubscriberClientChannelHandlerPtr& simpleHandler =
+    getSimpleSubscriptionChannelHandler(ts);
+  return boost::dynamic_pointer_cast<SubscriberClientChannelHandler>(simpleHandler);
+}
+
+namespace {
+// Empty handler pointer returned for unknown subscribers, so that the
+// by-reference return type never forces an entry to be created in the map.
+const SimpleSubscriberClientChannelHandlerPtr NO_SUBSCRIPTION_CHANNEL_HANDLER;
+}
+
+// Returns the channel handler registered for `ts`, or an empty pointer when
+// none is registered.
+//
+// The lookup runs under a shared (reader) lock, so it must not modify the map:
+// operator[] would insert a default entry for an unknown key and thereby race
+// with other readers. count()/find() are used instead.
+//
+// NOTE(review): the returned reference points at a map element and outlives
+// the lock taken here; callers should copy it immediately (the callers in this
+// file do). Returning by value would be safer but changes the declared
+// interface.
+const SimpleSubscriberClientChannelHandlerPtr&
+SimpleDuplexChannelManager::getSimpleSubscriptionChannelHandler(const TopicSubscriber& ts) {
+  boost::shared_lock<boost::shared_mutex> lock(topicsubscriber2handler_lock);
+  if (topicsubscriber2handler.count(ts) == 0) {
+    return NO_SUBSCRIPTION_CHANNEL_HANDLER;
+  }
+  return topicsubscriber2handler.find(ts)->second;
+}
+
+// Picks the channel on which to subscribe for `ts`.
+//  - Unless this is a resubscribe request, the channel of an already
+//    registered handler is reused.
+//  - Otherwise, when the topic's host is not known, an empty pointer is
+//    returned.
+//  - Otherwise a channel to that host is looked up or, failing that, created,
+//    connected and returned.
+DuplexChannelPtr SimpleDuplexChannelManager::getSubscriptionChannel(
+  const TopicSubscriber& ts, const bool isResubscribeRequest) {
+  SimpleSubscriberClientChannelHandlerPtr liveHandler;
+  if (!isResubscribeRequest) {
+    // a resubscribe always gets a new subscription channel, so only
+    // look for an existing one on ordinary requests
+    liveHandler = getSimpleSubscriptionChannelHandler(ts);
+  }
+  if (liveHandler.get()) {
+    return boost::dynamic_pointer_cast<DuplexChannel>(liveHandler->getChannel());
+  }
+
+  const HostAddress& owner = getHostForTopic(ts.first);
+  if (owner.isNullHost()) {
+    return DuplexChannelPtr();
+  }
+
+  // the hub server owning the topic is known
+  DuplexChannelPtr ch = getSubscriptionChannel(owner);
+  if (!ch.get()) {
+    ch = createSubscriptionChannel(owner);
+    return storeSubscriptionChannel(ch, true);
+  }
+  return ch;
+}
+
+// Always returns an empty pointer: simple subscription channels are never
+// shared per host, a new channel is established for each subscription.
+DuplexChannelPtr SimpleDuplexChannelManager::getSubscriptionChannel(const HostAddress& addr) {
+  DuplexChannelPtr none;
+  return none;
+}
+
+// Creates a subscription channel to `addr` together with its
+// SimpleSubscriberClientChannelHandler, binds the two to each other and
+// returns the channel.
+DuplexChannelPtr SimpleDuplexChannelManager::createSubscriptionChannel(const HostAddress& addr) {
+  // The raw pointer is kept only for setChannel() and logging below; ownership
+  // goes to the ChannelHandlerPtr created right after.
+  SimpleSubscriberClientChannelHandler* rawHandler =
+    new SimpleSubscriberClientChannelHandler(
+      boost::dynamic_pointer_cast<SimpleDuplexChannelManager>(shared_from_this()),
+      subscriptionHandlers);
+  ChannelHandlerPtr ownedHandler(rawHandler);
+
+  DuplexChannelPtr newChannel = createChannel(dispatcher->getService(), addr, ownedHandler);
+  rawHandler->setChannel(boost::dynamic_pointer_cast<AbstractDuplexChannel>(newChannel));
+  LOG4CXX_INFO(logger, "New subscription channel " << newChannel.get() << " is created to host "
+                       << addr << ", whose channel handler is " << rawHandler);
+  return newChannel;
+}
+
+// Nothing is stored here: this manager only records the subscription channel
+// handler, and only once the subscribe has succeeded. The function merely
+// connects the channel when asked to and hands it back.
+DuplexChannelPtr SimpleDuplexChannelManager::storeSubscriptionChannel(const DuplexChannelPtr& ch,
+                                                                      bool doConnect) {
+  if (!doConnect) {
+    return ch;
+  }
+  ch->connect();
+  return ch;
+}
+
+// Tries to register `handler` as the channel handler of `ts`; returns whether
+// it was registered.
+//
+//  request      | existing handler               | outcome
+//  -------------+--------------------------------+---------------------------
+//  subscribe    | none                           | stored, true
+//  subscribe    | present                        | false
+//  resubscribe  | none                           | false (a closesub happened
+//               |                                | while resubscribing)
+//  resubscribe  | present, on the txn's original | replaced, true; the old
+//               | channel                        | handler is closed afterwards
+//  resubscribe  | present, on another channel    | false (the client closed
+//               |                                | and subscribed again in
+//               |                                | the meantime)
+bool SimpleDuplexChannelManager::storeSubscriptionChannelHandler(
+  const TopicSubscriber& ts, const PubSubDataPtr& txn,
+  const SimpleSubscriberClientChannelHandlerPtr& handler) {
+  const bool resubscribing = txn->isResubscribeRequest();
+  SimpleSubscriberClientChannelHandlerPtr previous;
+  bool stored = false;
+  {
+    boost::lock_guard<boost::shared_mutex> lock(topicsubscriber2handler_lock);
+    previous = topicsubscriber2handler[ts];
+    if (resubscribing) {
+      // a resubscribe may only replace the handler that still sits on the
+      // channel the resubscribe was started for
+      if (previous.get()
+          && txn->getOrigChannelForResubscribe().get() == previous->getChannel().get()) {
+        topicsubscriber2handler[ts] = handler;
+        stored = true;
+      }
+    } else if (!previous.get()) {
+      // a plain subscribe only succeeds when nothing is registered yet
+      topicsubscriber2handler[ts] = handler;
+      stored = true;
+    }
+  }
+  if (resubscribing && stored && previous.get()) {
+    // the resubscribe evicted the old handler, so the old, disconnected
+    // channel can be closed now (outside of the lock)
+    previous->close();
+  }
+  return stored;
+}
+
+// Unregisters the channel handler of `ts` under the exclusive lock, closes it
+// outside of the lock when there was one, and always reports completion
+// through `callback`.
+void SimpleDuplexChannelManager::asyncCloseSubscription(const TopicSubscriber& ts,
+                                                        const OperationCallbackPtr& callback) {
+  SimpleSubscriberClientChannelHandlerPtr removed;
+  {
+    boost::lock_guard<boost::shared_mutex> lock(topicsubscriber2handler_lock);
+    removed = topicsubscriber2handler[ts];
+    topicsubscriber2handler.erase(ts);
+    LOG4CXX_DEBUG(logger, "CloseSubscription:: remove subscriber channel handler for (topic:"
+                          << ts.first << ", subscriber:" << ts.second << ").");
+  }
+
+  if (removed.get()) {
+    removed->close();
+  }
+  callback->operationComplete();
+}
+
+// Hands an existing message handler and filter over to the channel handler
+// currently registered for `ts` by starting delivery on it.
+//
+// When no handler is registered a warning is logged and nothing happens.
+// Exceptions thrown by startDelivery are logged as warnings and swallowed;
+// AlreadyStartDeliveryException gets its own message.
+void SimpleDuplexChannelManager::handoverDelivery(const TopicSubscriber& ts,
+                                                  const MessageHandlerCallbackPtr& msgHandler,
+                                                  const ClientMessageFilterPtr& filter) {
+  SimpleSubscriberClientChannelHandlerPtr handler;
+  {
+    boost::shared_lock<boost::shared_mutex> lock(topicsubscriber2handler_lock);
+    // Only a shared (reader) lock is held, so the map must not be modified:
+    // operator[] would insert a default entry for an unknown key. Look the
+    // key up without inserting instead.
+    if (topicsubscriber2handler.count(ts) != 0) {
+      handler = topicsubscriber2handler.find(ts)->second;
+    }
+  }
+
+  if (!handler.get()) {
+    LOG4CXX_WARN(logger, "No channel handler found for (topic:" << ts.first << ", subscriber:"
+                         << ts.second << ") to handover delivery with handler "
+                         << msgHandler.get() << ", filter " << filter.get() << ".");
+    return;
+  }
+  try {
+    handler->startDelivery(ts, msgHandler, filter);
+  } catch(const AlreadyStartDeliveryException&) {
+    LOG4CXX_WARN(logger, "Other one has started delivery for (topic:" << ts.first <<
+                         ", subscriber:" << ts.second << ") using brand new message handler. "
+                         << "It is OK that we could give up handing over old message handler.");
+  } catch(const std::exception& e) {
+    LOG4CXX_WARN(logger, "Error when handing over old message handler for (topic:" << ts.first
+                         << ", subscriber:" << ts.second << ") : " << e.what());
+  }
+}