You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:40 UTC
[30/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/clientimpl.cpp
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/clientimpl.cpp b/hedwig-client/src/main/cpp/lib/clientimpl.cpp
deleted file mode 100644
index 40114d6..0000000
--- a/hedwig-client/src/main/cpp/lib/clientimpl.cpp
+++ /dev/null
@@ -1,738 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifdef HAVE_CONFIG_H
-#include <config.h>
-#endif
-
-#include "clientimpl.h"
-#include "channel.h"
-#include "publisherimpl.h"
-#include "subscriberimpl.h"
-#include "simplesubscriberimpl.h"
-#include "multiplexsubscriberimpl.h"
-#include <log4cxx/logger.h>
-
-// File-scoped log4cxx logger whose name is "hedwig." followed by this file's path.
-// The space before __FILE__ is required: from C++11 on, a string literal that is
-// immediately followed by an identifier is lexed as a user-defined-literal suffix,
-// so "hedwig."__FILE__ no longer compiles there. Adjacent literals still concatenate.
-static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig." __FILE__));
-
-using namespace Hedwig;
-
-const int DEFAULT_MESSAGE_FORCE_CONSUME_RETRY_WAIT_TIME = 5000; // not referenced in the visible part of this file; unit presumably milliseconds -- TODO confirm
-const std::string DEFAULT_SERVER_DEFAULT_VAL = ""; // fallback handed to conf.get() when Configuration::DEFAULT_SERVER is looked up
-const bool DEFAULT_SSL_ENABLED = false; // fallback handed to conf.getBool() when Configuration::SSL_ENABLED is looked up
-
-/**
- * Block the calling thread until the operation has completed or failed, or
- * until `timeout` milliseconds have elapsed.
- *
- * The deadline is computed once, before waiting, so a spurious wake-up cannot
- * restart the timer. The response is only marked TIMEOUT while it is still
- * PENDING, so a result recorded just as the deadline expires is not lost.
- */
-void SyncOperationCallback::wait() {
-    boost::unique_lock<boost::mutex> lock(mut);
-    const boost::system_time deadline =
-        boost::get_system_time() + boost::posix_time::milliseconds(timeout);
-    while (response == PENDING) {
-        // timed_wait() returns false once the absolute deadline has passed.
-        if (!cond.timed_wait(lock, deadline) && response == PENDING) {
-            LOG4CXX_ERROR(logger, "Timeout waiting for operation to complete " << this);
-
-            response = TIMEOUT;
-        }
-    }
-}
-
-/**
- * Record a successful completion and wake every thread blocked in wait().
- *
- * `response` is inspected and updated under `mut`, the same mutex wait()
- * holds when it writes TIMEOUT, so the two cannot race. A completion that
- * arrives after the waiter has timed out is logged and otherwise ignored.
- */
-void SyncOperationCallback::operationComplete() {
-    {
-        boost::lock_guard<boost::mutex> lock(mut);
-        if (response == TIMEOUT) {
-            LOG4CXX_ERROR(logger, "operationCompleted successfully after timeout " << this);
-            return;
-        }
-        response = SUCCESS;
-    }
-    // Notify outside the critical section so woken waiters can take the mutex at once.
-    cond.notify_all();
-}
-
-/**
- * Record a failed completion and wake every thread blocked in wait().
- *
- * The dynamic type of `exception` is mapped onto one of the response codes;
- * any type not listed becomes UNKNOWN. `response` is inspected and updated
- * under `mut`, the mutex wait() holds when it writes TIMEOUT, so the two
- * cannot race. A failure reported after the waiter timed out is only logged.
- *
- * @param exception the error reported by the asynchronous operation
- */
-void SyncOperationCallback::operationFailed(const std::exception& exception) {
-    {
-        boost::lock_guard<boost::mutex> lock(mut);
-        if (response == TIMEOUT) {
-            LOG4CXX_ERROR(logger, "operationCompleted unsuccessfully after timeout " << this);
-            return;
-        }
-
-        // typeid compares the exact dynamic type; subclasses are not matched.
-        if (typeid(exception) == typeid(ChannelConnectException)) {
-            response = NOCONNECT;
-        } else if (typeid(exception) == typeid(ServiceDownException)) {
-            response = SERVICEDOWN;
-        } else if (typeid(exception) == typeid(AlreadySubscribedException)) {
-            response = ALREADY_SUBSCRIBED;
-        } else if (typeid(exception) == typeid(NotSubscribedException)) {
-            response = NOT_SUBSCRIBED;
-        } else {
-            response = UNKNOWN;
-        }
-    }
-    cond.notify_all();
-}
-
-/**
- * Translate the recorded response code into the matching client exception.
- *
- * Returns normally for SUCCESS. Every other code throws; codes without a
- * dedicated exception type (including PENDING and UNKNOWN) throw ClientException.
- */
-void SyncOperationCallback::throwExceptionIfNeeded() {
-    // Read the code once, as the original switch did.
-    const int outcome = response;
-
-    if (outcome == SUCCESS) {
-        return;
-    }
-    if (outcome == NOCONNECT) {
-        throw CannotConnectException();
-    }
-    if (outcome == SERVICEDOWN) {
-        throw ServiceDownException();
-    }
-    if (outcome == ALREADY_SUBSCRIBED) {
-        throw AlreadySubscribedException();
-    }
-    if (outcome == NOT_SUBSCRIBED) {
-        throw NotSubscribedException();
-    }
-    if (outcome == TIMEOUT) {
-        throw ClientTimeoutException();
-    }
-    throw ClientException();
-}
-
-ResponseHandler::ResponseHandler(const DuplexChannelManagerPtr& channelManager) // stores the manager that redirectRequest() later uses to resubmit requests
- : channelManager(channelManager) {
-}
-
-/**
- * Resubmit a request to the host named in a redirect response.
- *
- * The host that sent the redirect is recorded on the request as "tried".
- * The target is parsed from the response's status message; if the message is
- * absent or cannot be parsed, the manager's default host is used instead.
- * The request is failed with InvalidRedirectException when no target can be
- * determined or when the target has already been tried.
- *
- * @param response the redirect response received from the server
- * @param data     the pending request being redirected
- * @param channel  the channel the redirect arrived on
- */
-void ResponseHandler::redirectRequest(const PubSubResponsePtr& response,
-                                      const PubSubDataPtr& data,
-                                      const DuplexChannelPtr& channel) {
-    // Remember who bounced us so that redirect loops can be detected below.
-    HostAddress previousHost = channel->getHostAddress();
-    data->addTriedServer(previousHost);
-
-    HostAddress target;
-    bool useDefaultHost = true;
-    try {
-        if (!response->has_statusmsg()) {
-            target = channelManager->getDefaultHost();
-        } else {
-            try {
-                target = HostAddress::fromString(response->statusmsg());
-                useDefaultHost = false;
-            } catch (std::exception&) {
-                // Status message is not a usable address: fall back to the default host.
-                target = channelManager->getDefaultHost();
-            }
-        }
-    } catch (std::exception& failure) {
-        LOG4CXX_ERROR(logger, "Failed to retrieve redirected host of request " << *data
-                      << " : " << failure.what());
-        data->getCallback()->operationFailed(InvalidRedirectException());
-        return;
-    }
-
-    if (data->hasTriedServer(target)) {
-        LOG4CXX_ERROR(logger, "We've been told to try request [" << data->getTxnId() << "] with ["
-                      << target.getAddressString()<< "] by " << previousHost.getAddressString()
-                      << " but we've already tried that. Failing operation");
-        data->getCallback()->operationFailed(InvalidRedirectException());
-        return;
-    }
-
-    LOG4CXX_INFO(logger, "We've been told [" << data->getTopic() << "] is on [" << target.getAddressString()
-                 << "] by [" << previousHost.getAddressString() << "]. Redirecting request "
-                 << data->getTxnId());
-    data->setShouldClaim(true);
-
-    // Hand the request back to the channel manager for the chosen destination.
-    if (!useDefaultHost) {
-        channelManager->redirectOpToHost(data, target);
-    } else {
-        channelManager->submitOpToDefaultServer(data);
-    }
-}
-
-HedwigClientChannelHandler::HedwigClientChannelHandler(const DuplexChannelManagerPtr& channelManager, // manager consulted on disconnects and topic-ownership updates
- ResponseHandlerMap& handlers) // kept by reference, so the map must outlive this handler
- : channelManager(channelManager), handlers(handlers), closed(false), disconnected(false) { // a new handler starts neither closed nor disconnected
-}
-
-/**
- * Route a response received on `channel` to the handler registered for the
- * type of the request it answers.
- *
- * Responses that carry a message are logged and ignored here. For all other
- * responses the matching transaction is taken out of the channel; unless the
- * server answered NOT_RESPONSIBLE_FOR_TOPIC, the channel's host is recorded
- * as the owner of the request's topic. Requests with no registered handler
- * are failed with UnknownRequestException.
- *
- * @param channel the channel the response arrived on
- * @param m       the response
- */
-void HedwigClientChannelHandler::messageReceived(const DuplexChannelPtr& channel, const PubSubResponsePtr& m) {
-    LOG4CXX_DEBUG(logger, "Message received txnid(" << m->txnid() << ") status("
-                  << m->statuscode() << ")");
-    if (m->has_message()) {
-        LOG4CXX_ERROR(logger, "Subscription response, ignore for now");
-        return;
-    }
-
-    PubSubDataPtr data = channel->retrieveTransaction(m->txnid());
-    /* you now have ownership of data, don't leave this function without deleting it or
-       palming it off to someone else */
-
-    if (data.get() == 0) {
-        LOG4CXX_ERROR(logger, "No pub/sub request for txnid(" << m->txnid() << ").");
-        return;
-    }
-
-    // Store the topic2Host mapping if this wasn't a server redirect.
-    // TODO: add specific response for failure of getting topic ownership
-    //       to distinguish SERVICE_DOWN to failure of getting topic ownership
-    if (m->statuscode() != NOT_RESPONSIBLE_FOR_TOPIC) {
-        const HostAddress& host = channel->getHostAddress();
-        channelManager->setHostForTopic(data->getTopic(), host);
-    }
-
-    // Use find() rather than operator[]: the map is shared (held by reference)
-    // and operator[] would insert an empty entry for every unknown type.
-    ResponseHandlerMap::const_iterator handlerIter = handlers.find(data->getType());
-    if (handlerIter != handlers.end() && handlerIter->second.get()) {
-        handlerIter->second->handleResponse(m, data, channel);
-    } else {
-        LOG4CXX_ERROR(logger, "Unimplemented request type " << data->getType() << " : "
-                      << *data);
-        data->getCallback()->operationFailed(UnknownRequestException());
-    }
-}
-
-void HedwigClientChannelHandler::channelConnected(const DuplexChannelPtr& channel) { // connect notification; this handler takes no action and ignores `channel`
- // do nothing
-}
-
-/**
- * React to a channel disconnect, at most once per handler.
- *
- * Nothing is done when the channel manager is already closed, when this
- * handler was closed explicitly through close(), or when a disconnect has
- * already been processed. Otherwise the handler is marked disconnected and
- * onChannelDisconnected() is invoked.
- *
- * @param channel the channel that went away
- * @param e       the error reported with the disconnect (not used here)
- */
-void HedwigClientChannelHandler::channelDisconnected(const DuplexChannelPtr& channel,
-                                                     const std::exception& e) {
-    if (channelManager->isClosed()) {
-        return;
-    }
-
-    // Channels closed on purpose by client code (redundant publish channels,
-    // abandoned redirected subscribe channels, client shutdown) need none of
-    // the disconnect handling, and neither does a repeated notification.
-    {
-        boost::lock_guard<boost::shared_mutex> guard(close_lock);
-        if (closed || disconnected) {
-            return;
-        }
-        disconnected = true;
-    }
-
-    LOG4CXX_INFO(logger, "Channel " << channel.get() << " was disconnected.");
-    // Run the handler-specific disconnect logic outside the lock.
-    onChannelDisconnected(channel);
-}
-
-void HedwigClientChannelHandler::onChannelDisconnected(const DuplexChannelPtr& channel) { // invoked by channelDisconnected() after the handler has been marked disconnected
- // Clean up the channel from channel manager
- channelManager->nonSubscriptionChannelDied(channel);
-}
-
-/**
- * Log an exception reported for a channel. No recovery is attempted here.
- *
- * @param channel the channel the exception was reported for (not used)
- * @param e       the reported exception
- */
-void HedwigClientChannelHandler::exceptionOccurred(const DuplexChannelPtr& channel, const std::exception& e) {
-    // The separator keeps the exception text from running into the prefix.
-    LOG4CXX_ERROR(logger, "Exception occurred : " << e.what());
-}
-
-/**
- * Mark this handler as closed and run doClose() exactly once.
- *
- * Repeated calls after the first are no-ops.
- */
-void HedwigClientChannelHandler::close() {
-    bool wasClosed;
-    {
-        boost::lock_guard<boost::shared_mutex> guard(close_lock);
-        wasClosed = closed;
-        closed = true;
-    }
-    if (wasClosed) {
-        return;
-    }
-    // Handler-specific close logic runs outside the lock.
-    doClose();
-}
-
-void HedwigClientChannelHandler::doClose() { // called by close() the first time the handler is closed
- // do nothing for generic client channel handler
-}
-
-//
-// Pub/Sub Request Write Callback
-//
-PubSubWriteCallback::PubSubWriteCallback(const DuplexChannelPtr& channel, // channel the request is written to
- const PubSubDataPtr& data) // request whose write outcome this callback reports
- : channel(channel), data(data) {
-}
-
-void PubSubWriteCallback::operationComplete() { // write succeeded: only logged, the request's own callback is not invoked here
- LOG4CXX_INFO(logger, "Successfully wrote pubsub request : " << *data << " to channel "
- << channel.get());
-}
-
-void PubSubWriteCallback::operationFailed(const std::exception& exception) { // write failed: drop the pending transaction and fail the request
- LOG4CXX_ERROR(logger, "Error writing pubsub request (" << *data << ") : " << exception.what());
-
- // remove the transaction from channel if write failed
- channel->retrieveTransaction(data->getTxnId()); // return value intentionally discarded; the call is made only to remove the entry
- data->getCallback()->operationFailed(exception); // forward the write error to the request's callback
-}
-
-//
-// Default Server Connect Callback
-//
-DefaultServerConnectCallback::DefaultServerConnectCallback(const DuplexChannelManagerPtr& channelManager, // manager that submits the request once connected
- const DuplexChannelPtr& channel, // channel being connected to the default server
- const PubSubDataPtr& data) // request waiting for the connection
- : channelManager(channelManager), channel(channel), data(data) {
-}
-
-void DefaultServerConnectCallback::operationComplete() { // connect succeeded: submit the waiting request through the connected channel
- LOG4CXX_DEBUG(logger, "Channel " << channel.get() << " is connected to host "
- << channel->getHostAddress() << ".");
- // After connected, we got the right ip for the target host
- // so we could submit the request right now
- channelManager->submitOpThruChannel(data, channel);
-}
-
-void DefaultServerConnectCallback::operationFailed(const std::exception& exception) { // connect failed: log it and fail the waiting request with the same error
- LOG4CXX_ERROR(logger, "Channel " << channel.get() << " failed to connect to host "
- << channel->getHostAddress() << " : " << exception.what());
- data->getCallback()->operationFailed(exception);
-}
-
-//
-// Subscription Event Emitter
-//
-SubscriptionEventEmitter::SubscriptionEventEmitter() {} // no explicit initialisation; members are default-constructed
-
-void SubscriptionEventEmitter::addSubscriptionListener( // registers `listener` to receive events from emitSubscriptionEvent()
- SubscriptionListenerPtr& listener) {
- boost::lock_guard<boost::shared_mutex> lock(listeners_lock); // exclusive lock: the listener set is modified
- listeners.insert(listener);
-}
-
-void SubscriptionEventEmitter::removeSubscriptionListener( // unregisters `listener`
- SubscriptionListenerPtr& listener) {
- boost::lock_guard<boost::shared_mutex> lock(listeners_lock); // exclusive lock: the listener set is modified
- listeners.erase(listener);
-}
-
-/**
- * Deliver a subscription event to every registered listener.
- *
- * Listeners are invoked while a shared lock on the listener set is held.
- *
- * @param topic        topic the event refers to
- * @param subscriberId subscriber the event refers to
- * @param event        the event being reported
- */
-void SubscriptionEventEmitter::emitSubscriptionEvent(
-    const std::string& topic, const std::string& subscriberId,
-    const SubscriptionEvent event) {
-    boost::shared_lock<boost::shared_mutex> readGuard(listeners_lock);
-    // With no listeners registered the loop body simply never runs.
-    SubscriptionListenerSet::iterator current = listeners.begin();
-    while (current != listeners.end()) {
-        (*current)->processEvent(topic, subscriberId, event);
-        ++current;
-    }
-}
-
-//
-// Channel Manager Used to manage all established channels
-//
-
-/**
- * Build the channel manager implementation selected by configuration.
- *
- * A MultiplexDuplexChannelManager is created when
- * Configuration::SUBSCRIPTION_CHANNEL_SHARING_ENABLED is true (default false),
- * otherwise a SimpleDuplexChannelManager.
- *
- * @param conf client configuration
- * @return shared pointer owning the new manager
- */
-DuplexChannelManagerPtr DuplexChannelManager::create(const Configuration& conf) {
-    const bool shareChannels =
-        conf.getBool(Configuration::SUBSCRIPTION_CHANNEL_SHARING_ENABLED, false);
-
-    DuplexChannelManagerPtr created;
-    if (!shareChannels) {
-        created.reset(new SimpleDuplexChannelManager(conf));
-    } else {
-        created.reset(new MultiplexDuplexChannelManager(conf));
-    }
-    LOG4CXX_DEBUG(logger, "Created DuplexChannelManager " << created.get());
-    return created;
-}
-
-DuplexChannelManager::DuplexChannelManager(const Configuration& conf) // builds the dispatcher and reads the default-server and SSL settings from `conf`
- : dispatcher(new EventDispatcher(conf)), conf(conf), closed(false), counterobj(),
- defaultHostAddress(conf.get(Configuration::DEFAULT_SERVER, // kept as a string; getDefaultHost() parses it on demand
- DEFAULT_SERVER_DEFAULT_VAL)) {
- sslEnabled = conf.getBool(Configuration::SSL_ENABLED, DEFAULT_SSL_ENABLED);
- if (sslEnabled) { // the SSL context factory is only created when SSL is switched on
- sslCtxFactory = SSLContextFactoryPtr(new SSLContextFactory(conf));
- }
- LOG4CXX_DEBUG(logger, "Created DuplexChannelManager " << this << " with default server "
- << defaultHostAddress);
-}
-
-DuplexChannelManager::~DuplexChannelManager() { // only logs; no explicit cleanup is performed here
- LOG4CXX_DEBUG(logger, "Destroyed DuplexChannelManager " << this);
-}
-
-/**
- * Write a request to `channel`, or route it to the default server when no
- * channel is supplied.
- *
- * The request is stored on the channel as a pending transaction before it is
- * written; the write outcome is reported through a PubSubWriteCallback.
- *
- * @param op      the request to send
- * @param channel target channel; may be a null pointer
- */
-void DuplexChannelManager::submitTo(const PubSubDataPtr& op, const DuplexChannelPtr& channel) {
-    if (!channel.get()) {
-        // No channel available for this request: go through the default server.
-        submitOpToDefaultServer(op);
-        return;
-    }
-
-    channel->storeTransaction(op);
-    OperationCallbackPtr onWritten(new PubSubWriteCallback(channel, op));
-    LOG4CXX_DEBUG(logger, "Submit pub/sub request " << *op << " thru channel " << channel.get());
-    channel->writeRequest(op->getRequest(), onWritten);
-}
-
-// Submit a pub/sub request
-/**
- * Submit a pub/sub request on the channel appropriate for its type.
- *
- * PUBLISH and UNSUBSCRIBE requests use the non-subscription channel for the
- * request's topic; a failure to obtain that channel is logged and reported to
- * the request's callback. All other request types use the subscription
- * channel of the (topic, subscriber) pair.
- *
- * @param op the request to submit
- */
-void DuplexChannelManager::submitOp(const PubSubDataPtr& op) {
-    DuplexChannelPtr target;
-    const OperationType requestType = op->getType();
-
-    if (requestType == PUBLISH || requestType == UNSUBSCRIBE) {
-        try {
-            target = getNonSubscriptionChannel(op->getTopic());
-        } catch (std::exception& failure) {
-            LOG4CXX_ERROR(logger, "Failed to submit request " << *op << " : " << failure.what());
-            op->getCallback()->operationFailed(failure);
-            return;
-        }
-    } else {
-        TopicSubscriber subscription(op->getTopic(), op->getSubscriberId());
-        target = getSubscriptionChannel(subscription, op->isResubscribeRequest());
-    }
-
-    // submitTo() copes with a null channel by going to the default server.
-    submitTo(op, target);
-}
-
-// Submit a pub/sub request to target host
-/**
- * Submit a request to a specific host, reusing an existing channel to that
- * host when there is one and creating, storing and connecting a new channel
- * otherwise.
- *
- * @param op   the request to submit
- * @param addr the host the request must be sent to
- */
-void DuplexChannelManager::redirectOpToHost(const PubSubDataPtr& op, const HostAddress& addr) {
-    DuplexChannelPtr target;
-    const OperationType requestType = op->getType();
-
-    if (requestType == PUBLISH || requestType == UNSUBSCRIBE) {
-        // Non-subscription requests share one channel per host.
-        target = getNonSubscriptionChannel(addr);
-        if (!target.get()) {
-            target = createNonSubscriptionChannel(addr);
-            target = storeNonSubscriptionChannel(target, true);
-        }
-    } else {
-        target = getSubscriptionChannel(addr);
-        if (!target.get()) {
-            target = createSubscriptionChannel(addr);
-            target = storeSubscriptionChannel(target, true);
-        }
-    }
-
-    submitTo(op, target);
-}
-
-// Submit a pub/sub request to established request
-/**
- * Submit a request through a channel that has already been connected.
- *
- * The channel is first stored (without connecting again) in the
- * non-subscription or subscription bookkeeping according to the request
- * type; the request is then written to whichever channel the store call
- * returns.
- *
- * @param op the request to submit
- * @param ch the already-connected channel
- */
-void DuplexChannelManager::submitOpThruChannel(const PubSubDataPtr& op,
-                                               const DuplexChannelPtr& ch) {
-    DuplexChannelPtr target;
-    const OperationType requestType = op->getType();
-
-    if (requestType == PUBLISH || requestType == UNSUBSCRIBE) {
-        target = storeNonSubscriptionChannel(ch, false);
-    } else {
-        target = storeSubscriptionChannel(ch, false);
-    }
-
-    submitTo(op, target);
-}
-
-// Submit a pub/sub request to default server
-/**
- * Submit a request through a fresh channel to the default server.
- *
- * A non-subscription or subscription channel is created according to the
- * request type. If creating it throws, the error is logged and reported to
- * the request's callback. Otherwise the channel is connected and a
- * DefaultServerConnectCallback submits the request once the connection is up.
- *
- * @param op the request to submit
- */
-void DuplexChannelManager::submitOpToDefaultServer(const PubSubDataPtr& op) {
-    DuplexChannelPtr channel;
-    try {
-        switch (op->getType()) {
-        case PUBLISH:
-        case UNSUBSCRIBE:
-            channel = createNonSubscriptionChannel(getDefaultHost());
-            break;
-        default:
-            channel = createSubscriptionChannel(getDefaultHost());
-            break;
-        }
-    } catch (std::exception& e) {
-        // Stream the request itself (*op), as the other log statements do,
-        // rather than the smart pointer, which would only print an address.
-        LOG4CXX_ERROR(logger, "Failed to create channel to default host " << defaultHostAddress
-                      << " for request " << *op << " : " << e.what());
-        op->getCallback()->operationFailed(e);
-        return;
-    }
-    OperationCallbackPtr connectCallback(new DefaultServerConnectCallback(shared_from_this(),
-                                                                          channel, op));
-    // connect to default server. usually default server is a VIP, we only got the real
-    // IP address after connected. so before connected, we don't know the real target host.
-    // we only submit the request after channel is connected (ip address would be updated).
-    channel->connect(connectCallback);
-}
-
-/**
- * Return the non-subscription channel to the host that owns `topic`.
- *
- * When the owner is unknown a null pointer is returned. When the owner is
- * known but has no channel yet, one is created, stored and connected.
- *
- * @param topic the topic whose owner's channel is wanted
- * @return the channel, or a null pointer if the owner is not known
- */
-DuplexChannelPtr DuplexChannelManager::getNonSubscriptionChannel(const std::string& topic) {
-    HostAddress addr;
-    {
-        boost::shared_lock<boost::shared_mutex> lock(topic2host_lock);
-        // Only a shared lock is held, so the map must not be modified here:
-        // look the topic up with count()/find() instead of operator[], which
-        // would insert a default entry for an unknown topic.
-        if (topic2host.count(topic) != 0) {
-            addr = topic2host.find(topic)->second;
-        }
-    }
-    if (addr.isNullHost()) {
-        return DuplexChannelPtr();
-    }
-
-    // we had known which hub server owned the topic
-    DuplexChannelPtr ch = getNonSubscriptionChannel(addr);
-    if (ch.get()) {
-        return ch;
-    }
-    ch = createNonSubscriptionChannel(addr);
-    return storeNonSubscriptionChannel(ch, true);
-}
-
-/**
- * Look up the stored non-subscription channel for a host.
- *
- * @param addr the host to look up
- * @return the stored channel, or a null pointer if none is stored
- */
-DuplexChannelPtr DuplexChannelManager::getNonSubscriptionChannel(const HostAddress& addr) {
-    boost::shared_lock<boost::shared_mutex> lock(host2channel_lock);
-    // Only a shared lock is held, so avoid operator[], which would insert an
-    // empty entry for an unknown host and so modify the map.
-    if (host2channel.count(addr) == 0) {
-        return DuplexChannelPtr();
-    }
-    return host2channel.find(addr)->second;
-}
-
-DuplexChannelPtr DuplexChannelManager::createNonSubscriptionChannel(const HostAddress& addr) { // builds a channel to `addr`; storing and connecting it is left to the caller
- // Create a non-subscription channel handler
- ChannelHandlerPtr handler(new HedwigClientChannelHandler(shared_from_this(),
- nonSubscriptionHandlers)); // the handler keeps a reference to this manager's handler map
- // Create a non subscription channel
- return createChannel(dispatcher->getService(), addr, handler);
-}
-
-/**
- * Register `ch` as the non-subscription channel for its host, unless one is
- * already registered.
- *
- * If a channel for the host already exists, `ch` is closed and the existing
- * channel is returned. Otherwise `ch` is registered, optionally connected,
- * and returned.
- *
- * @param ch        candidate channel
- * @param doConnect whether to connect `ch` when it becomes the stored channel
- * @return the channel callers should use for this host
- */
-DuplexChannelPtr DuplexChannelManager::storeNonSubscriptionChannel(const DuplexChannelPtr& ch,
-                                                                   bool doConnect) {
-    const HostAddress& host = ch->getHostAddress();
-
-    DuplexChannelPtr existing;
-    {
-        boost::lock_guard<boost::shared_mutex> guard(host2channel_lock);
-
-        existing = host2channel[host];
-        if (!existing.get()) {
-            host2channel[host] = ch;
-        }
-        // A non-null `existing` means a channel to this host was already
-        // registered. That is not the ideal case but can happen in two ways:
-        //  * several threads publishing for the first time race to create a
-        //    channel to the same host (channel creation is deliberately not
-        //    serialised, for performance);
-        //  * a request for a new topic went through the default host (a VIP)
-        //    and was answered by a real host we already hold a channel to.
-        // The redundant channel is closed below because it will not be used.
-    }
-
-    if (existing.get()) {
-        LOG4CXX_DEBUG(logger, "Channel " << existing.get() << " to host " << host
-                      << " already exists so close channel " << ch.get() << ".");
-        ch->close();
-        return existing;
-    }
-
-    if (doConnect) {
-        ch->connect();
-    }
-    LOG4CXX_DEBUG(logger, "Storing channel " << ch.get() << " for host " << host << ".");
-    return ch;
-}
-
-DuplexChannelPtr DuplexChannelManager::createChannel(IOServicePtr& service, // creates an SSL or plain channel and registers it in `allchannels`
- const HostAddress& addr,
- const ChannelHandlerPtr& handler) {
- DuplexChannelPtr channel;
- if (sslEnabled) { // SSL variant needs a context created from the factory built in the constructor
- boost_ssl_context_ptr sslCtx = sslCtxFactory->createSSLContext(service->getService());
- channel = DuplexChannelPtr(new AsioSSLDuplexChannel(service, sslCtx, addr, handler));
- } else {
- channel = DuplexChannelPtr(new AsioDuplexChannel(service, addr, handler));
- }
-
- boost::lock_guard<boost::shared_mutex> lock(allchannels_lock); // held until return: covers the closed check and the insert
- if (closed) { // manager already closed: discard the new channel and refuse
- channel->close();
- throw ShuttingDownException();
- }
- allchannels.insert(channel);
- LOG4CXX_DEBUG(logger, "Created a channel to " << addr << ", all channels : " << allchannels.size());
-
- return channel;
-}
-
-long DuplexChannelManager::nextTxnId() { // delegates to counterobj.next(); uniqueness/thread-safety depend on that counter -- not visible here
- return counterobj.next();
-}
-
-/**
- * Record `host` as the owner of `topic`, updating both the topic->host map
- * and the host->topics map.
- *
- * If the topic was previously recorded under a different host, it is removed
- * from that host's topic set first. Without this, a later
- * clearAllTopicsForHost() for the old host would also erase the topic's new
- * ownership.
- *
- * @param topic the topic
- * @param host  the host that now owns the topic
- */
-void DuplexChannelManager::setHostForTopic(const std::string& topic, const HostAddress& host) {
-    boost::lock_guard<boost::shared_mutex> h2tlock(host2topics_lock);
-    boost::lock_guard<boost::shared_mutex> t2hlock(topic2host_lock);
-
-    // Drop the topic from its previous owner's set when ownership moves.
-    if (topic2host.count(topic) != 0) {
-        const HostAddress previous = topic2host.find(topic)->second;
-        if (!(previous == host)) {
-            Host2TopicsMap::iterator previousIter = host2topics.find(previous);
-            if (previousIter != host2topics.end()) {
-                previousIter->second->erase(topic);
-            }
-        }
-    }
-
-    topic2host[topic] = host;
-    TopicSetPtr ts = host2topics[host];
-    if (!ts.get()) {
-        // First topic seen for this host: create its topic set.
-        ts = TopicSetPtr(new TopicSet());
-        host2topics[host] = ts;
-    }
-    ts->insert(topic);
-    LOG4CXX_DEBUG(logger, "Set ownership of topic " << topic << " to " << host << ".");
-}
-
-void DuplexChannelManager::clearAllTopicsForHost(const HostAddress& addr) { // forgets every topic recorded for `addr` in both maps
- // remove topic mapping
- boost::lock_guard<boost::shared_mutex> h2tlock(host2topics_lock); // same lock order as setHostForTopic(): host2topics first, then topic2host
- boost::lock_guard<boost::shared_mutex> t2hlock(topic2host_lock);
- Host2TopicsMap::iterator iter = host2topics.find(addr);
- if (iter != host2topics.end()) {
- for (TopicSet::iterator tsIter = iter->second->begin();
- tsIter != iter->second->end(); ++tsIter) {
- topic2host.erase(*tsIter); // erased without checking that the topic still maps to `addr`
- }
- host2topics.erase(iter);
- }
-}
-
-/**
- * Forget that `addr` owns `topic`.
- *
- * The topic is removed from the host's topic set, and the topic->host entry
- * is erased only if it currently points at `addr`. Unknown topics are left
- * untouched (no placeholder entry is inserted for them).
- *
- * @param topic the topic
- * @param addr  the host whose ownership is being cleared
- */
-void DuplexChannelManager::clearHostForTopic(const std::string& topic,
-                                             const HostAddress& addr) {
-    // remove topic mapping
-    boost::lock_guard<boost::shared_mutex> h2tlock(host2topics_lock);
-    boost::lock_guard<boost::shared_mutex> t2hlock(topic2host_lock);
-    Host2TopicsMap::iterator iter = host2topics.find(addr);
-    if (iter != host2topics.end()) {
-        iter->second->erase(topic);
-    }
-    // count()/find() instead of operator[] so a lookup never adds an entry.
-    if (topic2host.count(topic) != 0 && topic2host.find(topic)->second == addr) {
-        topic2host.erase(topic);
-    }
-}
-
-/**
- * Return the host recorded as owner of `topic`.
- *
- * For an unknown topic, operator[] inserts and returns a default-constructed
- * HostAddress. Because that lookup may modify the map, the exclusive lock is
- * taken rather than a shared one.
- *
- * NOTE(review): the returned reference points into the map and is used after
- * the lock has been released; it is only valid until the entry is erased.
- * Returning by value would be safer but needs a header change.
- *
- * @param topic the topic to look up
- * @return reference to the map entry for `topic`
- */
-const HostAddress& DuplexChannelManager::getHostForTopic(const std::string& topic) {
-    boost::lock_guard<boost::shared_mutex> t2hlock(topic2host_lock);
-    return topic2host[topic];
-}
-
-/**
- A channel has just died. Remove it so we never give it to any other publisher or subscriber.
-
- This does not delete the channel. Some publishers or subscribers will still hold it and will be errored
- when they try to do anything with it.
-*/
-void DuplexChannelManager::nonSubscriptionChannelDied(const DuplexChannelPtr& channel) { // drops all bookkeeping for the channel's host, then closes the channel
- // get host
- HostAddress addr = channel->getHostAddress();
-
- // Clear the topic ownership when a nonsubscription channel disconnected
- clearAllTopicsForHost(addr);
-
- // remove channel mapping
- {
- boost::lock_guard<boost::shared_mutex> h2clock(host2channel_lock);
- host2channel.erase(addr); // erases by host, without checking that the stored channel is `channel`
- }
- removeChannel(channel);
-}
-
-void DuplexChannelManager::removeChannel(const DuplexChannelPtr& channel) { // unregisters the channel from `allchannels` and closes it
- {
- boost::lock_guard<boost::shared_mutex> aclock(allchannels_lock);
- allchannels.erase(channel); // channel should be deleted here
- }
- channel->close(); // closed after the lock is released
-}
-
-void DuplexChannelManager::start() { // registers the PUBLISH/UNSUBSCRIBE response handlers and starts the dispatcher
- // add non-subscription response handlers
- nonSubscriptionHandlers[PUBLISH] =
- ResponseHandlerPtr(new PublishResponseHandler(shared_from_this())); // shared_from_this() requires this object to be owned by a shared_ptr already
- nonSubscriptionHandlers[UNSUBSCRIBE] =
- ResponseHandlerPtr(new UnsubscribeResponseHandler(shared_from_this()));
-
- // start the dispatcher
- dispatcher->start();
-}
-
-bool DuplexChannelManager::isClosed() { // reports whether close() has been called
- boost::shared_lock<boost::shared_mutex> lock(allchannels_lock); // `closed` is guarded by allchannels_lock (see close() and createChannel())
- return closed;
-}
-
-void DuplexChannelManager::close() { // stops the dispatcher, closes every registered channel and drops the response handlers
- // stop the dispatcher
- dispatcher->stop();
- {
- boost::lock_guard<boost::shared_mutex> lock(allchannels_lock);
-
- closed = true; // from here on createChannel() throws ShuttingDownException
- for (ChannelMap::iterator iter = allchannels.begin(); iter != allchannels.end(); ++iter ) {
- (*iter)->close();
- }
- allchannels.clear();
- }
-
- // Unregistered response handlers
- nonSubscriptionHandlers.clear(); // cleared without a lock -- NOTE(review): channel handlers hold a reference to this map; confirm no handler can still be running
- /* destruction of the maps will clean up any items they hold */
-}
-
-ClientImplPtr ClientImpl::Create(const Configuration& conf) { // factory: constructs the client and starts its channel manager
- ClientImplPtr impl(new ClientImpl(conf));
- LOG4CXX_DEBUG(logger, "Creating Clientimpl " << impl);
- impl->channelManager->start(); // started here rather than in the constructor
- return impl;
-}
-
-/**
- * Shut the client down: close the channel manager, then release the
- * subscriber and the publisher if they were ever created.
- *
- * Both pointers are reset to NULL afterwards.
- */
-void ClientImpl::Destroy() {
-    LOG4CXX_DEBUG(logger, "destroying Clientimpl " << this);
-
-    // The channel manager is closed before the objects that use it go away.
-    channelManager->close();
-
-    // Deleting a null pointer is a no-op, so no explicit checks are needed.
-    delete subscriber;
-    subscriber = NULL;
-
-    delete publisher;
-    publisher = NULL;
-}
-
-ClientImpl::ClientImpl(const Configuration& conf) // publisher and subscriber start as NULL and are created on first use
- : conf(conf), publisher(NULL), subscriber(NULL)
-{
- channelManager = DuplexChannelManager::create(conf); // created here; Create() starts it afterwards
-}
-
-Subscriber& ClientImpl::getSubscriber() { // returns the subscriber implementation through the Subscriber interface
- return getSubscriberImpl();
-}
-
-Publisher& ClientImpl::getPublisher() { // returns the publisher implementation through the Publisher interface
- return getPublisherImpl();
-}
-
-/**
- * Return the subscriber implementation, creating it on first use.
- *
- * The pointer is always inspected under `subscribercreate_lock`. The earlier
- * check-before-lock shortcut read a plain pointer with no synchronisation,
- * which lets a second thread observe the pointer before the object it
- * points to is fully constructed.
- *
- * @return reference to the lazily created SubscriberImpl
- */
-SubscriberImpl& ClientImpl::getSubscriberImpl() {
-    boost::lock_guard<boost::mutex> lock(subscribercreate_lock);
-    if (subscriber == NULL) {
-        subscriber = new SubscriberImpl(channelManager);
-    }
-    return *subscriber;
-}
-
-/**
- * Return the publisher implementation, creating it on first use.
- *
- * The pointer is always inspected under `publishercreate_lock`. The earlier
- * check-before-lock shortcut read a plain pointer with no synchronisation,
- * which lets a second thread observe the pointer before the object it
- * points to is fully constructed.
- *
- * @return reference to the lazily created PublisherImpl
- */
-PublisherImpl& ClientImpl::getPublisherImpl() {
-    boost::lock_guard<boost::mutex> lock(publishercreate_lock);
-    if (publisher == NULL) {
-        publisher = new PublisherImpl(channelManager);
-    }
-    return *publisher;
-}
-
-ClientImpl::~ClientImpl() { // only logs; publisher/subscriber are released in Destroy(), not here
- LOG4CXX_DEBUG(logger, "deleting Clientimpl " << this);
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/clientimpl.h
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/clientimpl.h b/hedwig-client/src/main/cpp/lib/clientimpl.h
deleted file mode 100644
index fd7915c..0000000
--- a/hedwig-client/src/main/cpp/lib/clientimpl.h
+++ /dev/null
@@ -1,493 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef HEDWIG_CLIENT_IMPL_H
-#define HEDWIG_CLIENT_IMPL_H
-
-#include <hedwig/client.h>
-#include <hedwig/protocol.h>
-
-#include <boost/asio.hpp>
-#include <boost/shared_ptr.hpp>
-#include <boost/enable_shared_from_this.hpp>
-#include <boost/thread/mutex.hpp>
-#include <boost/thread/condition_variable.hpp>
-#include <boost/thread/thread_time.hpp>
-
-#ifdef USE_BOOST_TR1
-#include <boost/tr1/unordered_map.hpp>
-#else
-#include <tr1/unordered_map>
-#endif
-
-#include <list>
-
-#include "util.h"
-#include "channel.h"
-#include "data.h"
-#include "eventdispatcher.h"
-
-namespace Hedwig {
- const int DEFAULT_SYNC_REQUEST_TIMEOUT = 5000; // presumably milliseconds, matching how the sync callbacks interpret their timeout -- TODO confirm at the use sites
-
- /**
-  * Callback that lets a caller block until an asynchronous operation
-  * producing a value of type R has finished.
-  *
-  * Usage: pass the callback to the asynchronous call, invoke wait(), then
-  * throwExceptionIfNeeded() and finally getResult().
-  *
-  * `response` and `result` are only read or written by the callbacks and by
-  * wait() while `mut` is held, so a completion racing with a timeout is
-  * resolved consistently: whichever is recorded first wins.
-  */
- template<class R>
- class SyncCallback : public Callback<R> {
- public:
-     /** @param timeout maximum time wait() blocks, in milliseconds */
-     SyncCallback(int timeout) : response(PENDING), timeout(timeout) {}
-
-     /** Record success and the result, then wake waiters; ignored after a timeout. */
-     virtual void operationComplete(const R& r) {
-         {
-             boost::lock_guard<boost::mutex> lock(mut);
-             if (response == TIMEOUT) {
-                 return;
-             }
-             response = SUCCESS;
-             result = r;
-         }
-         cond.notify_all();
-     }
-
-     /** Record a failure code derived from the exception's exact type, then wake waiters; ignored after a timeout. */
-     virtual void operationFailed(const std::exception& exception) {
-         {
-             boost::lock_guard<boost::mutex> lock(mut);
-             if (response == TIMEOUT) {
-                 return;
-             }
-
-             if (typeid(exception) == typeid(ChannelConnectException)) {
-                 response = NOCONNECT;
-             } else if (typeid(exception) == typeid(ServiceDownException)) {
-                 response = SERVICEDOWN;
-             } else if (typeid(exception) == typeid(AlreadySubscribedException)) {
-                 response = ALREADY_SUBSCRIBED;
-             } else if (typeid(exception) == typeid(NotSubscribedException)) {
-                 response = NOT_SUBSCRIBED;
-             } else {
-                 response = UNKNOWN;
-             }
-         }
-         cond.notify_all();
-     }
-
-     /**
-      * Block until a callback has fired or `timeout` milliseconds have passed.
-      * The deadline is fixed once so spurious wake-ups cannot extend the wait,
-      * and TIMEOUT is only recorded while the response is still PENDING.
-      */
-     void wait() {
-         boost::unique_lock<boost::mutex> lock(mut);
-         const boost::system_time deadline =
-             boost::get_system_time() + boost::posix_time::milliseconds(timeout);
-         while (response == PENDING) {
-             if (!cond.timed_wait(lock, deadline) && response == PENDING) {
-                 response = TIMEOUT;
-             }
-         }
-     }
-
-     /** Throw the client exception matching the recorded response; return normally on SUCCESS. */
-     void throwExceptionIfNeeded() {
-         switch (response) {
-         case SUCCESS:
-             break;
-         case NOCONNECT:
-             throw CannotConnectException();
-         case SERVICEDOWN:
-             throw ServiceDownException();
-         case ALREADY_SUBSCRIBED:
-             throw AlreadySubscribedException();
-         case NOT_SUBSCRIBED:
-             throw NotSubscribedException();
-         case TIMEOUT:
-             throw ClientTimeoutException();
-         default:
-             throw ClientException();
-         }
-     }
-
-     /** @return a copy of the stored result; default-constructed unless operationComplete() ran */
-     R getResult() { return result; }
-
- private:
-     enum {
-         PENDING,
-         SUCCESS,
-         NOCONNECT,
-         SERVICEDOWN,
-         NOT_SUBSCRIBED,
-         ALREADY_SUBSCRIBED,
-         TIMEOUT,
-         UNKNOWN
-     } response;
-
-     boost::condition_variable cond;
-     boost::mutex mut;
-     int timeout;
-     R result;
- };
-
- class SyncOperationCallback : public OperationCallback { // lets a caller block until a value-less asynchronous operation finishes
- public:
- SyncOperationCallback(int timeout) : response(PENDING), timeout(timeout) {} // timeout is interpreted as milliseconds by wait()
- virtual void operationComplete(); // records SUCCESS and wakes waiters
- virtual void operationFailed(const std::exception& exception); // records a failure code chosen from the exception type and wakes waiters
-
- void wait(); // blocks until a callback fired or the timeout elapsed
- void throwExceptionIfNeeded(); // throws the exception matching the recorded response; no-op on SUCCESS
-
- private:
- enum {
- PENDING, // no callback received yet
- SUCCESS,
- NOCONNECT,
- SERVICEDOWN,
- NOT_SUBSCRIBED,
- ALREADY_SUBSCRIBED,
- TIMEOUT, // wait() gave up before a callback arrived
- UNKNOWN // failure with an exception type that has no dedicated code
- } response;
-
- boost::condition_variable cond; // signalled by the callbacks, waited on by wait()
- boost::mutex mut; // used together with cond
- int timeout;
- };
-
- class DuplexChannelManager;
- typedef boost::shared_ptr<DuplexChannelManager> DuplexChannelManagerPtr;
-
- //
- // Hedwig Response Handler
- //
-
- // Response Handler used to process response for different types of requests
- class ResponseHandler { // abstract base for per-request-type response processing
- public:
- ResponseHandler(const DuplexChannelManagerPtr& channelManager);
- virtual ~ResponseHandler() {};
-
- virtual void handleResponse(const PubSubResponsePtr& m, const PubSubDataPtr& txn, // m: the response, txn: the request it answers
- const DuplexChannelPtr& channel) = 0; // channel: where the response arrived
- protected:
- // common method used to redirect request
- void redirectRequest(const PubSubResponsePtr& response, const PubSubDataPtr& data,
- const DuplexChannelPtr& channel);
-
- // channel manager to manage all established channels
- const DuplexChannelManagerPtr channelManager;
- };
-
- typedef boost::shared_ptr<ResponseHandler> ResponseHandlerPtr;
- typedef std::tr1::unordered_map<OperationType, ResponseHandlerPtr, OperationTypeHash> ResponseHandlerMap;
-
- class PubSubWriteCallback : public OperationCallback { // reports the outcome of writing one request to one channel
- public:
- PubSubWriteCallback(const DuplexChannelPtr& channel, const PubSubDataPtr& data);
- virtual void operationComplete(); // logs the successful write
- virtual void operationFailed(const std::exception& exception); // removes the pending transaction and fails the request
- private:
- DuplexChannelPtr channel; // channel the request was written to
- PubSubDataPtr data; // the request
- };
-
- class DefaultServerConnectCallback : public OperationCallback { // submits a request once the channel to the default server is connected
- public:
- DefaultServerConnectCallback(const DuplexChannelManagerPtr& channelManager,
- const DuplexChannelPtr& channel,
- const PubSubDataPtr& data);
- virtual void operationComplete(); // connected: submit the request through the channel
- virtual void operationFailed(const std::exception& exception); // connect failed: fail the request
- private:
- DuplexChannelManagerPtr channelManager;
- DuplexChannelPtr channel;
- PubSubDataPtr data;
- };
-
- struct SubscriptionListenerPtrHash : public std::unary_function<SubscriptionListenerPtr, size_t> { // hash functor for the listener set
- size_t operator()(const Hedwig::SubscriptionListenerPtr& listener) const {
- return reinterpret_cast<size_t>(listener.get()); // hashes by the address of the pointed-to listener
- }
- };
-
- // Subscription Event Emitter
- class SubscriptionEventEmitter { // keeps a set of listeners and forwards subscription events to them
- public:
- SubscriptionEventEmitter();
-
- void addSubscriptionListener(SubscriptionListenerPtr& listener);
- void removeSubscriptionListener(SubscriptionListenerPtr& listener);
- void emitSubscriptionEvent(const std::string& topic, // calls processEvent() on every registered listener
- const std::string& subscriberId,
- const SubscriptionEvent event);
-
- private:
- typedef std::tr1::unordered_set<SubscriptionListenerPtr, SubscriptionListenerPtrHash> SubscriptionListenerSet;
- SubscriptionListenerSet listeners;
- boost::shared_mutex listeners_lock; // shared for emitting, exclusive for add/remove
- };
-
- class SubscriberClientChannelHandler;
-
- //
- // Duplex Channel Manager to manage all established channels
- //
-
- class DuplexChannelManager : public boost::enable_shared_from_this<DuplexChannelManager> {
- public:
- static DuplexChannelManagerPtr create(const Configuration& conf);
- virtual ~DuplexChannelManager();
-
- inline const Configuration& getConfiguration() const {
- return conf;
- }
-
- // Submit a pub/sub request
- void submitOp(const PubSubDataPtr& op);
-
- // Submit a pub/sub request to default host
- // It is called only when client doesn't have the knowledge of topic ownership
- void submitOpToDefaultServer(const PubSubDataPtr& op);
-
- // Redirect pub/sub request to a target hosts
- void redirectOpToHost(const PubSubDataPtr& op, const HostAddress& host);
-
- // Submit a pub/sub request thru established channel
- // It is called when connecting to default server to established a channel
- void submitOpThruChannel(const PubSubDataPtr& op, const DuplexChannelPtr& channel);
-
- // Generate next transaction id for pub/sub requests sending thru this manager
- long nextTxnId();
-
- // return default host
- inline const HostAddress getDefaultHost() {
- return HostAddress::fromString(defaultHostAddress);
- }
-
- // set the owner host of a topic
- void setHostForTopic(const std::string& topic, const HostAddress& host);
-
- // clear all topics that hosted by a hub server
- void clearAllTopicsForHost(const HostAddress& host);
-
- // clear host for a given topic
- void clearHostForTopic(const std::string& topic, const HostAddress& host);
-
- // Called when a channel is disconnected
- void nonSubscriptionChannelDied(const DuplexChannelPtr& channel);
-
- // Remove a channel from all channel map
- void removeChannel(const DuplexChannelPtr& channel);
-
- // Get the subscription channel handler for a given subscription
- virtual boost::shared_ptr<SubscriberClientChannelHandler>
- getSubscriptionChannelHandler(const TopicSubscriber& ts) = 0;
-
- // Close subscription for a given subscription
- virtual void asyncCloseSubscription(const TopicSubscriber& ts,
- const OperationCallbackPtr& callback) = 0;
-
- virtual void handoverDelivery(const TopicSubscriber& ts,
- const MessageHandlerCallbackPtr& handler,
- const ClientMessageFilterPtr& filter) = 0;
-
- // start the channel manager
- virtual void start();
- // close the channel manager
- virtual void close();
- // whether the channel manager is closed
- bool isClosed();
-
- // Return an available service
- inline boost::asio::io_service & getService() const {
- return dispatcher->getService()->getService();
- }
-
- // Return the event emitter
- inline SubscriptionEventEmitter& getEventEmitter() {
- return eventEmitter;
- }
-
- protected:
- DuplexChannelManager(const Configuration& conf);
-
- // Get the ownership for a given topic.
- const HostAddress& getHostForTopic(const std::string& topic);
-
- //
- // Channel Management
- //
-
- // Non subscription channel management
-
- // Get a non subscription channel for a given topic
- // If the topic's owner is known, retrieve a subscription channel to
- // target host (if there is no channel existed, create one);
- // If the topic's owner is unknown, return null
- DuplexChannelPtr getNonSubscriptionChannel(const std::string& topic);
-
- // Get an existed non subscription channel to a given host
- DuplexChannelPtr getNonSubscriptionChannel(const HostAddress& addr);
-
- // Create a non subscription channel to a given host
- DuplexChannelPtr createNonSubscriptionChannel(const HostAddress& addr);
-
- // Store the established non subscription channel
- DuplexChannelPtr storeNonSubscriptionChannel(const DuplexChannelPtr& ch,
- bool doConnect);
-
- //
- // Subscription Channel Management
- //
-
- // Get a subscription channel for a given subscription.
- // If there is subscription channel established before, return it.
- // Otherwise, check whether the topic's owner is known. If the topic owner
- // is known, retrieve a subscription channel to target host (if there is no
- // channel exsited, create one); If unknown, return null
- virtual DuplexChannelPtr getSubscriptionChannel(const TopicSubscriber& ts,
- const bool isResubscribeRequest) = 0;
-
- // Get an existed subscription channel to a given host
- virtual DuplexChannelPtr getSubscriptionChannel(const HostAddress& addr) = 0;
-
- // Create a subscription channel to a given host
- // If store is true, store the channel for future usage.
- // If store is false, return a newly created channel.
- virtual DuplexChannelPtr createSubscriptionChannel(const HostAddress& addr) = 0;
-
- // Store the established subscription channel
- virtual DuplexChannelPtr storeSubscriptionChannel(const DuplexChannelPtr& ch,
- bool doConnect) = 0;
-
- //
- // Raw Channel Management
- //
-
- // Create a raw channel
- DuplexChannelPtr createChannel(IOServicePtr& service,
- const HostAddress& addr, const ChannelHandlerPtr& handler);
-
- // event dispatcher running io threads
- typedef boost::shared_ptr<EventDispatcher> EventDispatcherPtr;
- EventDispatcherPtr dispatcher;
-
- // topic2host mapping for topic ownership
- std::tr1::unordered_map<std::string, HostAddress> topic2host;
- boost::shared_mutex topic2host_lock;
- typedef std::tr1::unordered_set<std::string> TopicSet;
- typedef boost::shared_ptr<TopicSet> TopicSetPtr;
- typedef std::tr1::unordered_map<HostAddress, TopicSetPtr, HostAddressHash> Host2TopicsMap;
- Host2TopicsMap host2topics;
- boost::shared_mutex host2topics_lock;
- private:
- // write the request to target channel
- void submitTo(const PubSubDataPtr& op, const DuplexChannelPtr& channel);
-
- const Configuration& conf;
- bool sslEnabled;
- SSLContextFactoryPtr sslCtxFactory;
-
- // whether the channel manager is shutting down
- bool closed;
-
- // counter used for generating transaction ids
- ClientTxnCounter counterobj;
-
- std::string defaultHostAddress;
-
- // non-subscription channels
- std::tr1::unordered_map<HostAddress, DuplexChannelPtr, HostAddressHash > host2channel;
- boost::shared_mutex host2channel_lock;
-
- // maintain all established channels
- typedef std::tr1::unordered_set<DuplexChannelPtr, DuplexChannelPtrHash > ChannelMap;
- ChannelMap allchannels;
- boost::shared_mutex allchannels_lock;
-
- // Response Handlers for non-subscription requests
- ResponseHandlerMap nonSubscriptionHandlers;
-
- // Subscription Event Emitter
- SubscriptionEventEmitter eventEmitter;
- };
-
- //
- // Hedwig Client Channel Handler to handle responses received from the channel
- //
-
- class HedwigClientChannelHandler : public ChannelHandler {
- public:
- HedwigClientChannelHandler(const DuplexChannelManagerPtr& channelManager,
- ResponseHandlerMap& handlers);
- virtual ~HedwigClientChannelHandler() {}
-
- virtual void messageReceived(const DuplexChannelPtr& channel, const PubSubResponsePtr& m);
- virtual void channelConnected(const DuplexChannelPtr& channel);
- virtual void channelDisconnected(const DuplexChannelPtr& channel, const std::exception& e);
- virtual void exceptionOccurred(const DuplexChannelPtr& channel, const std::exception& e);
-
- void close();
- protected:
- // real channel disconnected logic
- virtual void onChannelDisconnected(const DuplexChannelPtr& channel);
-
- // real close logic
- virtual void doClose();
-
- // channel manager to manage all established channels
- const DuplexChannelManagerPtr channelManager;
- ResponseHandlerMap& handlers;
-
- boost::shared_mutex close_lock;
- // Boolean indicating if we closed the handler explicitly or not.
- // If so, we do not need to do the channel disconnected logic here.
- bool closed;
- // whether channel is disconnected.
- bool disconnected;
- };
-
- class PublisherImpl;
- class SubscriberImpl;
-
- /**
- Implementation of the hedwig client. This class takes care of globals such as the topic->host map and the transaction id counter.
- */
- class ClientImpl : public boost::enable_shared_from_this<ClientImpl> {
- public:
- static ClientImplPtr Create(const Configuration& conf);
- void Destroy();
-
- Subscriber& getSubscriber();
- Publisher& getPublisher();
-
- SubscriberImpl& getSubscriberImpl();
- PublisherImpl& getPublisherImpl();
-
- ~ClientImpl();
- private:
- ClientImpl(const Configuration& conf);
-
- const Configuration& conf;
-
- boost::mutex publishercreate_lock;
- PublisherImpl* publisher;
-
- boost::mutex subscribercreate_lock;
- SubscriberImpl* subscriber;
-
- // channel manager manage all channels for the client
- DuplexChannelManagerPtr channelManager;
- };
-};
-#endif
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/data.cpp
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/data.cpp b/hedwig-client/src/main/cpp/lib/data.cpp
deleted file mode 100644
index 24d458e..0000000
--- a/hedwig-client/src/main/cpp/lib/data.cpp
+++ /dev/null
@@ -1,277 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifdef HAVE_CONFIG_H
-#include <config.h>
-#endif
-
-#include <hedwig/protocol.h>
-#include "data.h"
-
-#include <log4cxx/logger.h>
-#include <iostream>
-#include <boost/thread/locks.hpp>
-
-#define stringify( name ) #name
-
-static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__));
-
-using namespace Hedwig;
-
-const char* OPERATION_TYPE_NAMES[] = {
- stringify( PUBLISH ),
- stringify( SUBSCRIBE ),
- stringify( CONSUME ),
- stringify( UNSUBSCRIBE ),
- stringify( START_DELIVERY ),
- stringify( STOP_DELIVERY ),
- stringify( CLOSESUBSCRIPTION )
-};
-
-PubSubDataPtr PubSubData::forPublishRequest(long txnid, const std::string& topic, const Message& body,
- const ResponseCallbackPtr& callback) {
- PubSubDataPtr ptr(new PubSubData());
- ptr->type = PUBLISH;
- ptr->txnid = txnid;
- ptr->topic = topic;
- ptr->body.CopyFrom(body);
- ptr->callback = callback;
- return ptr;
-}
-
-PubSubDataPtr PubSubData::forSubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic,
- const ResponseCallbackPtr& callback, const SubscriptionOptions& options) {
- PubSubDataPtr ptr(new PubSubData());
- ptr->type = SUBSCRIBE;
- ptr->txnid = txnid;
- ptr->subscriberid = subscriberid;
- ptr->topic = topic;
- ptr->callback = callback;
- ptr->options = options;
- return ptr;
-}
-
-PubSubDataPtr PubSubData::forUnsubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic,
- const ResponseCallbackPtr& callback) {
- PubSubDataPtr ptr(new PubSubData());
- ptr->type = UNSUBSCRIBE;
- ptr->txnid = txnid;
- ptr->subscriberid = subscriberid;
- ptr->topic = topic;
- ptr->callback = callback;
- return ptr;
-}
-
-PubSubDataPtr PubSubData::forCloseSubscriptionRequest(
- long txnid, const std::string& subscriberid, const std::string& topic,
- const ResponseCallbackPtr& callback) {
- PubSubDataPtr ptr(new PubSubData());
- ptr->type = CLOSESUBSCRIPTION;
- ptr->txnid = txnid;
- ptr->subscriberid = subscriberid;
- ptr->topic = topic;
- ptr->callback = callback;
- return ptr;
-}
-
-PubSubDataPtr PubSubData::forConsumeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const MessageSeqId msgid) {
- PubSubDataPtr ptr(new PubSubData());
- ptr->type = CONSUME;
- ptr->txnid = txnid;
- ptr->subscriberid = subscriberid;
- ptr->topic = topic;
- ptr->msgid = msgid;
- return ptr;
-}
-
-PubSubData::PubSubData() : shouldClaim(false), messageBound(0) {
-}
-
-PubSubData::~PubSubData() {
-}
-
-OperationType PubSubData::getType() const {
- return type;
-}
-
-long PubSubData::getTxnId() const {
- return txnid;
-}
-
-const std::string& PubSubData::getTopic() const {
- return topic;
-}
-
-const Message& PubSubData::getBody() const {
- return body;
-}
-
-const MessageSeqId PubSubData::getMessageSeqId() const {
- return msgid;
-}
-
-void PubSubData::setPreferencesForSubRequest(SubscribeRequest * subreq,
- const SubscriptionOptions &options) {
- Hedwig::SubscriptionPreferences* preferences = subreq->mutable_preferences();
- if (options.messagebound() > 0) {
- preferences->set_messagebound(options.messagebound());
- }
- if (options.has_messagefilter()) {
- preferences->set_messagefilter(options.messagefilter());
- }
- if (options.has_options()) {
- preferences->mutable_options()->CopyFrom(options.options());
- }
- if (options.has_messagewindowsize()) {
- preferences->set_messagewindowsize(options.messagewindowsize());
- }
-}
-
-const PubSubRequestPtr PubSubData::getRequest() {
- PubSubRequestPtr request(new Hedwig::PubSubRequest());
- request->set_protocolversion(Hedwig::VERSION_ONE);
- request->set_type(type);
- request->set_txnid(txnid);
- if (shouldClaim) {
- request->set_shouldclaim(shouldClaim);
- }
- request->set_topic(topic);
-
- if (type == PUBLISH) {
- LOG4CXX_DEBUG(logger, "Creating publish request");
-
- Hedwig::PublishRequest* pubreq = request->mutable_publishrequest();
- Hedwig::Message* msg = pubreq->mutable_msg();
- msg->CopyFrom(body);
- } else if (type == SUBSCRIBE) {
- LOG4CXX_DEBUG(logger, "Creating subscribe request");
-
- Hedwig::SubscribeRequest* subreq = request->mutable_subscriberequest();
- subreq->set_subscriberid(subscriberid);
- subreq->set_createorattach(options.createorattach());
- subreq->set_forceattach(options.forceattach());
- setPreferencesForSubRequest(subreq, options);
- } else if (type == CONSUME) {
- LOG4CXX_DEBUG(logger, "Creating consume request");
-
- Hedwig::ConsumeRequest* conreq = request->mutable_consumerequest();
- conreq->set_subscriberid(subscriberid);
- conreq->mutable_msgid()->CopyFrom(msgid);
- } else if (type == UNSUBSCRIBE) {
- LOG4CXX_DEBUG(logger, "Creating unsubscribe request");
-
- Hedwig::UnsubscribeRequest* unsubreq = request->mutable_unsubscriberequest();
- unsubreq->set_subscriberid(subscriberid);
- } else if (type == CLOSESUBSCRIPTION) {
- LOG4CXX_DEBUG(logger, "Creating closeSubscription request");
-
- Hedwig::CloseSubscriptionRequest* closesubreq = request->mutable_closesubscriptionrequest();
- closesubreq->set_subscriberid(subscriberid);
- } else {
- LOG4CXX_ERROR(logger, "Tried to create a request message for the wrong type [" << type << "]");
- throw UnknownRequestException();
- }
-
- return request;
-}
-
-void PubSubData::setShouldClaim(bool shouldClaim) {
- this->shouldClaim = shouldClaim;
-}
-
-void PubSubData::addTriedServer(HostAddress& h) {
- triedservers.insert(h);
-}
-
-bool PubSubData::hasTriedServer(HostAddress& h) {
- return triedservers.count(h) > 0;
-}
-
-void PubSubData::clearTriedServers() {
- triedservers.clear();
-}
-
-ResponseCallbackPtr& PubSubData::getCallback() {
- return callback;
-}
-
-void PubSubData::setCallback(const ResponseCallbackPtr& callback) {
- this->callback = callback;
-}
-
-const std::string& PubSubData::getSubscriberId() const {
- return subscriberid;
-}
-
-const SubscriptionOptions& PubSubData::getSubscriptionOptions() const {
- return options;
-}
-
-void PubSubData::setOrigChannelForResubscribe(
- boost::shared_ptr<DuplexChannel>& channel) {
- this->origChannel = channel;
-}
-
-boost::shared_ptr<DuplexChannel>& PubSubData::getOrigChannelForResubscribe() {
- return this->origChannel;
-}
-
-bool PubSubData::isResubscribeRequest() {
- return 0 != this->origChannel.get();
-}
-
-ClientTxnCounter::ClientTxnCounter() : counter(0)
-{
-}
-
-ClientTxnCounter::~ClientTxnCounter() {
-}
-
-/**
-Increment the transaction counter and return the new value.
-
-@returns the next transaction id
-*/
-long ClientTxnCounter::next() { // would be nice to remove lock from here, look more into it
- boost::lock_guard<boost::mutex> lock(mutex);
-
- long next= ++counter;
-
- return next;
-}
-
-std::ostream& Hedwig::operator<<(std::ostream& os, const PubSubData& data) {
- OperationType type = data.getType();
- os << "[" << OPERATION_TYPE_NAMES[type] << " request (txn:" << data.getTxnId()
- << ") for (topic:" << data.getTopic();
- switch (type) {
- case SUBSCRIBE:
- case UNSUBSCRIBE:
- case CLOSESUBSCRIPTION:
- os << ", subscriber:" << data.getSubscriberId() << ")";
- break;
- case CONSUME:
- os << ", subscriber:" << data.getSubscriberId() << ", seq:"
- << data.getMessageSeqId().localcomponent() << ")";
- break;
- case PUBLISH:
- default:
- os << ")";
- break;
- }
- return os;
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/data.h
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/data.h b/hedwig-client/src/main/cpp/lib/data.h
deleted file mode 100644
index 0639f4a..0000000
--- a/hedwig-client/src/main/cpp/lib/data.h
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef DATA_H
-#define DATA_H
-
-#include <hedwig/protocol.h>
-#include <hedwig/callback.h>
-
-#include <pthread.h>
-#include <iostream>
-
-#ifdef USE_BOOST_TR1
-#include <boost/tr1/unordered_set.hpp>
-#else
-#include <tr1/unordered_set>
-#endif
-
-#include "util.h"
-#include <boost/shared_ptr.hpp>
-#include <boost/thread/mutex.hpp>
-
-namespace Hedwig {
- /**
- Simple counter for transaction ids from the client
- */
- class ClientTxnCounter {
- public:
- ClientTxnCounter();
- ~ClientTxnCounter();
- long next();
-
- private:
- long counter;
- boost::mutex mutex;
- };
-
- typedef Callback<ResponseBody> ResponseCallback;
- typedef std::tr1::shared_ptr<ResponseCallback> ResponseCallbackPtr;
-
- class PubSubData;
- typedef boost::shared_ptr<PubSubData> PubSubDataPtr;
- typedef boost::shared_ptr<PubSubRequest> PubSubRequestPtr;
- typedef boost::shared_ptr<PubSubResponse> PubSubResponsePtr;
-
- class DuplexChannel;
-
- /**
- Data structure to hold information about requests and build request messages.
- Used to store requests which may need to be resent to another server.
- */
- class PubSubData {
- public:
- // to be used for publish
- static PubSubDataPtr forPublishRequest(long txnid, const std::string& topic, const Message& body,
- const ResponseCallbackPtr& callback);
- static PubSubDataPtr forSubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic,
- const ResponseCallbackPtr& callback, const SubscriptionOptions& options);
- static PubSubDataPtr forUnsubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic,
- const ResponseCallbackPtr& callback);
- static PubSubDataPtr forConsumeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const MessageSeqId msgid);
-
- static PubSubDataPtr forCloseSubscriptionRequest(long txnid, const std::string& subscriberid,
- const std::string& topic,
- const ResponseCallbackPtr& callback);
-
- ~PubSubData();
-
- OperationType getType() const;
- long getTxnId() const;
- const std::string& getSubscriberId() const;
- const std::string& getTopic() const;
- const Message& getBody() const;
- const MessageSeqId getMessageSeqId() const;
-
- void setShouldClaim(bool shouldClaim);
- void setMessageBound(int messageBound);
-
- const PubSubRequestPtr getRequest();
- void setCallback(const ResponseCallbackPtr& callback);
- ResponseCallbackPtr& getCallback();
- const SubscriptionOptions& getSubscriptionOptions() const;
-
- void addTriedServer(HostAddress& h);
- bool hasTriedServer(HostAddress& h);
- void clearTriedServers();
-
- void setOrigChannelForResubscribe(boost::shared_ptr<DuplexChannel>& channel);
- bool isResubscribeRequest();
- boost::shared_ptr<DuplexChannel>& getOrigChannelForResubscribe();
-
- friend std::ostream& operator<<(std::ostream& os, const PubSubData& data);
- private:
-
- PubSubData();
-
- void setPreferencesForSubRequest(SubscribeRequest * subreq,
- const SubscriptionOptions &options);
-
- OperationType type;
- long txnid;
- std::string subscriberid;
- std::string topic;
- Message body;
- bool shouldClaim;
- int messageBound;
- ResponseCallbackPtr callback;
- SubscriptionOptions options;
- MessageSeqId msgid;
- std::tr1::unordered_set<HostAddress, HostAddressHash > triedservers;
- // record the origChannel for a resubscribe request
- boost::shared_ptr<DuplexChannel> origChannel;
- };
-
-};
-#endif
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp b/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp
deleted file mode 100644
index af3560c..0000000
--- a/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifdef HAVE_CONFIG_H
-#include <config.h>
-#endif
-
-#include "eventdispatcher.h"
-
-#include <log4cxx/logger.h>
-
-static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__));
-
-using namespace Hedwig;
-
-const int DEFAULT_NUM_DISPATCH_THREADS = 1;
-
-IOService::IOService() {
-}
-
-IOService::~IOService() {}
-
-void IOService::start() {
- if (work.get()) {
- return;
- }
- work = work_ptr(new boost::asio::io_service::work(service));
-}
-
-void IOService::stop() {
- if (!work.get()) {
- return;
- }
-
- work = work_ptr();
- service.stop();
-}
-
-void IOService::run() {
- while (true) {
- try {
- service.run();
- break;
- } catch (std::exception &e) {
- LOG4CXX_ERROR(logger, "Exception in IO Service " << this << " : " << e.what());
- }
- }
-}
-
-EventDispatcher::EventDispatcher(const Configuration& conf)
- : conf(conf), running(false), next_io_service(0) {
- num_threads = conf.getInt(Configuration::NUM_DISPATCH_THREADS,
- DEFAULT_NUM_DISPATCH_THREADS);
- if (0 == num_threads) {
- LOG4CXX_ERROR(logger, "Number of threads in dispatcher is zero");
- throw std::runtime_error("number of threads in dispatcher is zero");
- }
- for (size_t i = 0; i < num_threads; i++) {
- services.push_back(IOServicePtr(new IOService()));
- }
- LOG4CXX_DEBUG(logger, "Created EventDispatcher " << this);
-}
-
-void EventDispatcher::run_forever(IOServicePtr service, size_t idx) {
- LOG4CXX_INFO(logger, "Starting event dispatcher " << idx);
-
- service->run();
-
- LOG4CXX_INFO(logger, "Event dispatcher " << idx << " done");
-}
-
-void EventDispatcher::start() {
- if (running) {
- return;
- }
-
- for (size_t i = 0; i < num_threads; i++) {
- IOServicePtr service = services[i];
- service->start();
- // new thread
- thread_ptr t(new boost::thread(boost::bind(&EventDispatcher::run_forever,
- this, service, i)));
- threads.push_back(t);
- }
- running = true;
-}
-
-void EventDispatcher::stop() {
- if (!running) {
- return;
- }
-
- for (size_t i = 0; i < num_threads; i++) {
- services[i]->stop();
- }
-
- for (size_t i = 0; i < num_threads; i++) {
- threads[i]->join();
- }
- threads.clear();
-
- running = false;
-}
-
-EventDispatcher::~EventDispatcher() {
- services.clear();
-}
-
-IOServicePtr& EventDispatcher::getService() {
- size_t next = 0;
- {
- boost::lock_guard<boost::mutex> lock(next_lock);
- next = next_io_service;
- next_io_service = (next_io_service + 1) % num_threads;
- }
- return services[next];
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/eventdispatcher.h
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/eventdispatcher.h b/hedwig-client/src/main/cpp/lib/eventdispatcher.h
deleted file mode 100644
index b6a7504..0000000
--- a/hedwig-client/src/main/cpp/lib/eventdispatcher.h
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef EVENTDISPATCHER_H
-#define EVENTDISPATCHER_H
-
-#include <vector>
-
-#include <hedwig/client.h>
-
-#include <boost/asio.hpp>
-#include <boost/thread.hpp>
-#include <boost/shared_ptr.hpp>
-
-namespace Hedwig {
- typedef boost::shared_ptr<boost::asio::io_service::work> work_ptr;
- typedef boost::shared_ptr<boost::thread> thread_ptr;
-
- class IOService;
- typedef boost::shared_ptr<IOService> IOServicePtr;
-
- class IOService {
- public:
- IOService();
- virtual ~IOService();
-
- // start the io service
- void start();
- // stop the io service
- void stop();
- // run the io service
- void run();
-
- inline boost::asio::io_service& getService() {
- return service;
- }
-
- private:
- boost::asio::io_service service;
- work_ptr work;
- };
-
- class EventDispatcher {
- public:
- EventDispatcher(const Configuration& conf);
- ~EventDispatcher();
-
- void start();
-
- void stop();
-
- IOServicePtr& getService();
-
- private:
- void run_forever(IOServicePtr service, size_t idx);
-
- const Configuration& conf;
-
- // number of threads
- size_t num_threads;
- // running flag
- bool running;
- // pool of io_services.
- std::vector<IOServicePtr> services;
- // threads
- std::vector<thread_ptr> threads;
- // next io_service used for a connection
- boost::mutex next_lock;
- std::size_t next_io_service;
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/exceptions.cpp
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/exceptions.cpp b/hedwig-client/src/main/cpp/lib/exceptions.cpp
deleted file mode 100644
index 9e062dc..0000000
--- a/hedwig-client/src/main/cpp/lib/exceptions.cpp
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifdef HAVE_CONFIG_H
-#include <config.h>
-#endif
-
-#include <hedwig/exceptions.h>
-#include <stdlib.h>
-#include <string.h>
-
-using namespace Hedwig;
-
-
-
-
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/filterablemessagehandler.cpp
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/filterablemessagehandler.cpp b/hedwig-client/src/main/cpp/lib/filterablemessagehandler.cpp
deleted file mode 100644
index 07d884c..0000000
--- a/hedwig-client/src/main/cpp/lib/filterablemessagehandler.cpp
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifdef HAVE_CONFIG_H
-#include <config.h>
-#endif
-
-#include "filterablemessagehandler.h"
-
-using namespace Hedwig;
-
-FilterableMessageHandler::FilterableMessageHandler(const MessageHandlerCallbackPtr& msgHandler,
- const ClientMessageFilterPtr& msgFilter)
- : msgHandler(msgHandler), msgFilter(msgFilter) {
-}
-
-FilterableMessageHandler::~FilterableMessageHandler() {
-}
-
-void FilterableMessageHandler::consume(const std::string& topic, const std::string& subscriberId,
- const Message& msg, OperationCallbackPtr& callback) {
- bool deliver = true;
- if (0 != msgFilter.get()) {
- deliver = msgFilter->testMessage(msg);
- }
- if (deliver) {
- msgHandler->consume(topic, subscriberId, msg, callback);
- } else {
- callback->operationComplete();
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/filterablemessagehandler.h
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/filterablemessagehandler.h b/hedwig-client/src/main/cpp/lib/filterablemessagehandler.h
deleted file mode 100644
index 2d24bd5..0000000
--- a/hedwig-client/src/main/cpp/lib/filterablemessagehandler.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef FILTERABLE_MESSAGE_HANDLER_H
-#define FILTERABLE_MESSAGE_HANDLER_H
-
-#include <hedwig/callback.h>
-#include <hedwig/protocol.h>
-
-#ifdef USE_BOOST_TR1
-#include <boost/tr1/memory.hpp>
-#else
-#include <tr1/memory>
-#endif
-
-namespace Hedwig {
-
- class FilterableMessageHandler : public MessageHandlerCallback {
- public:
- FilterableMessageHandler(const MessageHandlerCallbackPtr& msgHandler,
- const ClientMessageFilterPtr& msgFilter);
-
- virtual void consume(const std::string& topic, const std::string& subscriberId,
- const Message& msg, OperationCallbackPtr& callback);
-
- virtual ~FilterableMessageHandler();
- private:
- const MessageHandlerCallbackPtr msgHandler;
- const ClientMessageFilterPtr msgFilter;
- };
-
-};
-
-#endif
-