You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:40 UTC

[30/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/clientimpl.cpp
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/clientimpl.cpp b/hedwig-client/src/main/cpp/lib/clientimpl.cpp
deleted file mode 100644
index 40114d6..0000000
--- a/hedwig-client/src/main/cpp/lib/clientimpl.cpp
+++ /dev/null
@@ -1,738 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifdef HAVE_CONFIG_H
-#include <config.h>
-#endif
-
-#include "clientimpl.h"
-#include "channel.h"
-#include "publisherimpl.h"
-#include "subscriberimpl.h"
-#include "simplesubscriberimpl.h"
-#include "multiplexsubscriberimpl.h"
-#include <log4cxx/logger.h>
-
-static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__));
-
-using namespace Hedwig;
-
-const int DEFAULT_MESSAGE_FORCE_CONSUME_RETRY_WAIT_TIME = 5000;
-const std::string DEFAULT_SERVER_DEFAULT_VAL = "";
-const bool DEFAULT_SSL_ENABLED = false;
-
-void SyncOperationCallback::wait() {
-  boost::unique_lock<boost::mutex> lock(mut);
-  while(response==PENDING) {
-    if (cond.timed_wait(lock, boost::posix_time::milliseconds(timeout)) == false) {
-      LOG4CXX_ERROR(logger, "Timeout waiting for operation to complete " << this);
-
-      response = TIMEOUT;
-    }
-  }
-}
-
-void SyncOperationCallback::operationComplete() {
-  if (response == TIMEOUT) {
-    LOG4CXX_ERROR(logger, "operationCompleted successfully after timeout " << this);
-    return;
-  }
-
-  {
-    boost::lock_guard<boost::mutex> lock(mut);
-    response = SUCCESS;
-  }
-  cond.notify_all();
-}
-
-void SyncOperationCallback::operationFailed(const std::exception& exception) {
-  if (response == TIMEOUT) {
-    LOG4CXX_ERROR(logger, "operationCompleted unsuccessfully after timeout " << this);
-    return;
-  }
-
-  {
-    boost::lock_guard<boost::mutex> lock(mut);
-    
-    if (typeid(exception) == typeid(ChannelConnectException)) {
-      response = NOCONNECT;
-    } else if (typeid(exception) == typeid(ServiceDownException)) {
-      response = SERVICEDOWN;
-    } else if (typeid(exception) == typeid(AlreadySubscribedException)) {
-      response = ALREADY_SUBSCRIBED;
-    } else if (typeid(exception) == typeid(NotSubscribedException)) {
-      response = NOT_SUBSCRIBED;
-    } else {
-      response = UNKNOWN;
-    }
-  }
-  cond.notify_all();
-}
-
-void SyncOperationCallback::throwExceptionIfNeeded() {
-  switch (response) {
-  case SUCCESS:
-    break;
-  case NOCONNECT:
-    throw CannotConnectException();
-    break;
-  case SERVICEDOWN:
-    throw ServiceDownException();
-    break;
-  case ALREADY_SUBSCRIBED:
-    throw AlreadySubscribedException();
-    break;
-  case NOT_SUBSCRIBED:
-    throw NotSubscribedException();
-    break;
-  case TIMEOUT:
-    throw ClientTimeoutException();
-    break;
-  default:
-    throw ClientException();
-    break;
-  }
-}
-
-ResponseHandler::ResponseHandler(const DuplexChannelManagerPtr& channelManager)
-  : channelManager(channelManager) {
-}
-
-void ResponseHandler::redirectRequest(const PubSubResponsePtr& response,
-                                      const PubSubDataPtr& data,
-                                      const DuplexChannelPtr& channel) {
-  HostAddress oldhost = channel->getHostAddress();
-  data->addTriedServer(oldhost);
-
-  HostAddress h;
-  bool redirectToDefaultHost = true;
-  try {
-    if (response->has_statusmsg()) {
-      try {
-        h = HostAddress::fromString(response->statusmsg());
-        redirectToDefaultHost = false;
-      } catch (std::exception& e) {
-        h = channelManager->getDefaultHost();
-      }
-    } else {
-      h = channelManager->getDefaultHost();
-    }
-  } catch (std::exception& e) {
-    LOG4CXX_ERROR(logger, "Failed to retrieve redirected host of request " << *data
-                          << " : " << e.what());
-    data->getCallback()->operationFailed(InvalidRedirectException());
-    return;
-  }
-  if (data->hasTriedServer(h)) {
-    LOG4CXX_ERROR(logger, "We've been told to try request [" << data->getTxnId() << "] with [" 
-		                      << h.getAddressString()<< "] by " << oldhost.getAddressString() 
-		                      << " but we've already tried that. Failing operation");
-    data->getCallback()->operationFailed(InvalidRedirectException());
-    return;
-  }
-  LOG4CXX_INFO(logger, "We've been told  [" << data->getTopic() << "] is on [" << h.getAddressString() 
-		                   << "] by [" << oldhost.getAddressString() << "]. Redirecting request "
-                       << data->getTxnId());
-  data->setShouldClaim(true);
-
-  // submit the request again to the target host
-  if (redirectToDefaultHost) {
-    channelManager->submitOpToDefaultServer(data);
-  } else {
-    channelManager->redirectOpToHost(data, h);
-  }
-}
-
-HedwigClientChannelHandler::HedwigClientChannelHandler(const DuplexChannelManagerPtr& channelManager,
-                                                       ResponseHandlerMap& handlers)
-  : channelManager(channelManager), handlers(handlers), closed(false), disconnected(false) {
-}
-
-void HedwigClientChannelHandler::messageReceived(const DuplexChannelPtr& channel, const PubSubResponsePtr& m) {
-  LOG4CXX_DEBUG(logger, "Message received txnid(" << m->txnid() << ") status(" 
-		<< m->statuscode() << ")");
-  if (m->has_message()) {
-    LOG4CXX_ERROR(logger, "Subscription response, ignore for now");
-    return;
-  }
-  
-  PubSubDataPtr data = channel->retrieveTransaction(m->txnid()); 
-  /* you now have ownership of data, don't leave this function without deleting it or
-     palming it off to someone else */
-
-  if (data.get() == 0) {
-    LOG4CXX_ERROR(logger, "No pub/sub request for txnid(" << m->txnid() << ").");
-    return;
-  }
-
-  // Store the topic2Host mapping if this wasn't a server redirect.
-  // TODO: add a specific response for failure to get topic ownership,
-  //       to distinguish SERVICE_DOWN from failure to get topic ownership
-  if (m->statuscode() != NOT_RESPONSIBLE_FOR_TOPIC) {
-    const HostAddress& host = channel->getHostAddress();
-    channelManager->setHostForTopic(data->getTopic(), host);
-  }
-
-  const ResponseHandlerPtr& respHandler = handlers[data->getType()];
-  if (respHandler.get()) {
-    respHandler->handleResponse(m, data, channel);
-  } else {
-    LOG4CXX_ERROR(logger, "Unimplemented request type " << data->getType() << " : "
-                          << *data);
-    data->getCallback()->operationFailed(UnknownRequestException());
-  }
-}
-
-void HedwigClientChannelHandler::channelConnected(const DuplexChannelPtr& channel) {
-  // do nothing 
-}
-
-void HedwigClientChannelHandler::channelDisconnected(const DuplexChannelPtr& channel,
-                                                     const std::exception& e) {
-  if (channelManager->isClosed()) {
-    return;
-  }
-
-  // If this channel was closed explicitly by the client code,
-  // we do not need to do any of this logic. This could happen
-  // for redundant Publish channels created or redirected subscribe
-  // channels that are not used anymore or when we shutdown the
-  // client and manually close all of the open channels.
-  // Also don't do any of the disconnect logic if the client has stopped.
-  {
-    boost::lock_guard<boost::shared_mutex> lock(close_lock);
-    if (closed) {
-      return;
-    }
-    if (disconnected) {
-      return;
-    }
-    disconnected = true;
-  }
-  LOG4CXX_INFO(logger, "Channel " << channel.get() << " was disconnected.");
-  // execute logic after channel disconnected
-  onChannelDisconnected(channel);
-}
-
-void HedwigClientChannelHandler::onChannelDisconnected(const DuplexChannelPtr& channel) {
-  // Clean up the channel from channel manager
-  channelManager->nonSubscriptionChannelDied(channel);
-}
-
-void HedwigClientChannelHandler::exceptionOccurred(const DuplexChannelPtr& channel, const std::exception& e) {
-  LOG4CXX_ERROR(logger, "Exception occurred" << e.what());
-}
-
-void HedwigClientChannelHandler::close() {
-  {
-    boost::lock_guard<boost::shared_mutex> lock(close_lock);
-    if (closed) {
-      return;
-    }
-    closed = true;
-  }
-  // do close handle logic here
-  doClose();
-}
-
-void HedwigClientChannelHandler::doClose() {
-  // do nothing for generic client channel handler
-}
-
-//
-// Pub/Sub Request Write Callback
-//
-PubSubWriteCallback::PubSubWriteCallback(const DuplexChannelPtr& channel,
-                                         const PubSubDataPtr& data)
-  : channel(channel), data(data) {
-}
-
-void PubSubWriteCallback::operationComplete() {
-  LOG4CXX_INFO(logger, "Successfully wrote pubsub request : " << *data << " to channel "
-                       << channel.get());
-}
-
-void PubSubWriteCallback::operationFailed(const std::exception& exception) {
-  LOG4CXX_ERROR(logger, "Error writing pubsub request (" << *data << ") : " << exception.what());
-
-  // remove the transaction from channel if write failed
-  channel->retrieveTransaction(data->getTxnId());
-  data->getCallback()->operationFailed(exception);
-}
-
-//
-// Default Server Connect Callback
-//
-DefaultServerConnectCallback::DefaultServerConnectCallback(const DuplexChannelManagerPtr& channelManager,
-                                                           const DuplexChannelPtr& channel,
-                                                           const PubSubDataPtr& data)
-  : channelManager(channelManager), channel(channel), data(data) {
-}
-
-void DefaultServerConnectCallback::operationComplete() {
-  LOG4CXX_DEBUG(logger, "Channel " << channel.get() << " is connected to host "
-                        << channel->getHostAddress() << ".");
-  // Once connected, we know the real IP of the target host,
-  // so we can submit the request right away
-  channelManager->submitOpThruChannel(data, channel);
-}
-
-void DefaultServerConnectCallback::operationFailed(const std::exception& exception) {
-  LOG4CXX_ERROR(logger, "Channel " << channel.get() << " failed to connect to host "
-                        << channel->getHostAddress() << " : " << exception.what());
-  data->getCallback()->operationFailed(exception);
-}
-
-//
-// Subscription Event Emitter
-//
-SubscriptionEventEmitter::SubscriptionEventEmitter() {}
-
-void SubscriptionEventEmitter::addSubscriptionListener(
-  SubscriptionListenerPtr& listener) {
-  boost::lock_guard<boost::shared_mutex> lock(listeners_lock);
-  listeners.insert(listener);
-}
-
-void SubscriptionEventEmitter::removeSubscriptionListener(
-  SubscriptionListenerPtr& listener) {
-  boost::lock_guard<boost::shared_mutex> lock(listeners_lock);
-  listeners.erase(listener);
-}
-
-void SubscriptionEventEmitter::emitSubscriptionEvent(
-  const std::string& topic, const std::string& subscriberId,
-  const SubscriptionEvent event) {
-  boost::shared_lock<boost::shared_mutex> lock(listeners_lock);
-  if (0 == listeners.size()) {
-    return;
-  }
-  for (SubscriptionListenerSet::iterator iter = listeners.begin();
-       iter != listeners.end(); ++iter) {
-    (*iter)->processEvent(topic, subscriberId, event);
-  }
-}
-
-//
-// Channel Manager Used to manage all established channels
-//
-
-DuplexChannelManagerPtr DuplexChannelManager::create(const Configuration& conf) {
-  DuplexChannelManager * managerPtr;
-  if (conf.getBool(Configuration::SUBSCRIPTION_CHANNEL_SHARING_ENABLED, false)) {
-    managerPtr = new MultiplexDuplexChannelManager(conf);
-  } else {
-    managerPtr = new SimpleDuplexChannelManager(conf);
-  }
-  DuplexChannelManagerPtr manager(managerPtr);
-  LOG4CXX_DEBUG(logger, "Created DuplexChannelManager " << manager.get());
-  return manager;
-}
-
-DuplexChannelManager::DuplexChannelManager(const Configuration& conf)
-  : dispatcher(new EventDispatcher(conf)), conf(conf), closed(false), counterobj(),
-    defaultHostAddress(conf.get(Configuration::DEFAULT_SERVER,
-                                DEFAULT_SERVER_DEFAULT_VAL)) {
-  sslEnabled = conf.getBool(Configuration::SSL_ENABLED, DEFAULT_SSL_ENABLED); 
-  if (sslEnabled) {
-    sslCtxFactory = SSLContextFactoryPtr(new SSLContextFactory(conf));
-  }
-  LOG4CXX_DEBUG(logger, "Created DuplexChannelManager " << this << " with default server "
-                        << defaultHostAddress);
-}
-
-DuplexChannelManager::~DuplexChannelManager() {
-  LOG4CXX_DEBUG(logger, "Destroyed DuplexChannelManager " << this);
-}
-
-void DuplexChannelManager::submitTo(const PubSubDataPtr& op, const DuplexChannelPtr& channel) {
-  if (channel.get()) {
-    channel->storeTransaction(op);
-    OperationCallbackPtr writecb(new PubSubWriteCallback(channel, op));
-    LOG4CXX_DEBUG(logger, "Submit pub/sub request " << *op << " thru channel " << channel.get());
-    channel->writeRequest(op->getRequest(), writecb);
-  } else {
-    submitOpToDefaultServer(op);
-  }
-}
-    
-// Submit a pub/sub request
-void DuplexChannelManager::submitOp(const PubSubDataPtr& op) {
-  DuplexChannelPtr channel;
-  switch (op->getType()) {
-  case PUBLISH:
-  case UNSUBSCRIBE:
-    try {
-      channel = getNonSubscriptionChannel(op->getTopic());  
-    } catch (std::exception& e) {
-      LOG4CXX_ERROR(logger, "Failed to submit request " << *op << " : " << e.what());
-      op->getCallback()->operationFailed(e);
-      return;
-    }
-    break;
-  default:
-    TopicSubscriber ts(op->getTopic(), op->getSubscriberId());
-    channel = getSubscriptionChannel(ts, op->isResubscribeRequest());
-    break;
-  }
-  // write the pub/sub request
-  submitTo(op, channel);
-}
-
-// Submit a pub/sub request to target host
-void DuplexChannelManager::redirectOpToHost(const PubSubDataPtr& op, const HostAddress& addr) {
-  DuplexChannelPtr channel;
-  switch (op->getType()) {
-  case PUBLISH:
-  case UNSUBSCRIBE:
-    // check whether a channel already exists for non-subscription requests
-    channel = getNonSubscriptionChannel(addr);
-    if (!channel.get()) {
-      channel = createNonSubscriptionChannel(addr);
-      channel = storeNonSubscriptionChannel(channel, true);
-    }
-    break;
-  default:
-    channel = getSubscriptionChannel(addr);
-    if (!channel.get()) {
-      channel = createSubscriptionChannel(addr);
-      channel = storeSubscriptionChannel(channel, true);
-    }
-    break;
-  }
-  // write the pub/sub request
-  submitTo(op, channel);
-}
-
-// Submit a pub/sub request through an established channel
-void DuplexChannelManager::submitOpThruChannel(const PubSubDataPtr& op,
-                                               const DuplexChannelPtr& ch) {
-  DuplexChannelPtr channel;
-  switch (op->getType()) {
-  case PUBLISH:
-  case UNSUBSCRIBE:
-    channel = storeNonSubscriptionChannel(ch, false);
-    break;
-  default:
-    channel = storeSubscriptionChannel(ch, false);
-    break;
-  }
-  // write the pub/sub request
-  submitTo(op, channel);
-}
-
-// Submit a pub/sub request to default server
-void DuplexChannelManager::submitOpToDefaultServer(const PubSubDataPtr& op) {
-  DuplexChannelPtr channel;
-  try {
-    switch (op->getType()) {
-    case PUBLISH:
-    case UNSUBSCRIBE:
-      channel = createNonSubscriptionChannel(getDefaultHost());
-      break;
-    default:
-      channel = createSubscriptionChannel(getDefaultHost());
-      break;
-    }
-  } catch (std::exception& e) {
-    LOG4CXX_ERROR(logger, "Failed to create channel to default host " << defaultHostAddress
-                          << " for request " << op << " : " << e.what());
-    op->getCallback()->operationFailed(e);
-    return;
-  }
-  OperationCallbackPtr connectCallback(new DefaultServerConnectCallback(shared_from_this(),
-                                                                        channel, op));
-  // connect to the default server. usually the default server is a VIP, and we only learn
-  // the real IP address once connected, so before connecting we don't know the real target host.
-  // we only submit the request after the channel is connected (ip address would be updated).
-  channel->connect(connectCallback);
-}
-
-DuplexChannelPtr DuplexChannelManager::getNonSubscriptionChannel(const std::string& topic) {
-  HostAddress addr;
-  {
-    boost::shared_lock<boost::shared_mutex> lock(topic2host_lock);
-    addr = topic2host[topic];
-  }
-  if (addr.isNullHost()) {
-    return DuplexChannelPtr();
-  } else {
-    // we already know which hub server owns the topic
-    DuplexChannelPtr ch = getNonSubscriptionChannel(addr);
-    if (ch.get()) {
-      return ch;
-    }
-    ch = createNonSubscriptionChannel(addr);
-    return storeNonSubscriptionChannel(ch, true);
-  }
-}
-
-DuplexChannelPtr DuplexChannelManager::getNonSubscriptionChannel(const HostAddress& addr) {
-  boost::shared_lock<boost::shared_mutex> lock(host2channel_lock);
-  return host2channel[addr];
-}
-
-DuplexChannelPtr DuplexChannelManager::createNonSubscriptionChannel(const HostAddress& addr) {
-  // Create a non-subscription channel handler
-  ChannelHandlerPtr handler(new HedwigClientChannelHandler(shared_from_this(),
-                                                           nonSubscriptionHandlers));
-  // Create a non subscription channel
-  return createChannel(dispatcher->getService(), addr, handler);
-}
-
-DuplexChannelPtr DuplexChannelManager::storeNonSubscriptionChannel(const DuplexChannelPtr& ch,
-                                                                   bool doConnect) {
-  const HostAddress& host = ch->getHostAddress();
-
-  bool useOldCh;
-  DuplexChannelPtr oldCh;
-  {
-    boost::lock_guard<boost::shared_mutex> lock(host2channel_lock);
-
-    oldCh = host2channel[host];
-    if (!oldCh.get()) {
-      host2channel[host] = ch;
-      useOldCh = false;
-    } else {
-      // If we've reached here, that means we already have a Channel
-      // mapping for the given host. This should ideally not happen
-      // and it means we are creating another Channel to a server host
-      // to publish on when we could have used an existing one. This could
-      // happen due to a race condition if initially multiple concurrent
-      // threads are publishing on the same topic and no Channel exists
-      // currently to the server. We are not synchronizing this initial
-      // creation of Channels to a given host for performance.
-      // Another possible way to have redundant Channels created is if
-      // a new topic is being published to, we connect to the default
-      // server host which should be a VIP that redirects to a "real"
-      // server host. Since we don't know beforehand what is the full
-      // set of server hosts, we could be redirected to a server that
-      // we already have a channel connection to from a prior existing
-      // topic. Close these redundant channels as they won't be used.
-      useOldCh = true;
-    }
-  } 
-  if (useOldCh) {
-    LOG4CXX_DEBUG(logger, "Channel " << oldCh.get() << " to host " << host
-                          << " already exists so close channel " << ch.get() << ".");
-    ch->close();
-    return oldCh;
-  } else {
-    if (doConnect) {
-      ch->connect();
-    }
-    LOG4CXX_DEBUG(logger, "Storing channel " << ch.get() << " for host " << host << ".");
-    return ch;
-  }
-}
-
-DuplexChannelPtr DuplexChannelManager::createChannel(IOServicePtr& service,
-                                                     const HostAddress& addr,
-                                                     const ChannelHandlerPtr& handler) {
-  DuplexChannelPtr channel;
-  if (sslEnabled) {
-    boost_ssl_context_ptr sslCtx = sslCtxFactory->createSSLContext(service->getService());
-    channel = DuplexChannelPtr(new AsioSSLDuplexChannel(service, sslCtx, addr, handler));
-  } else {
-    channel = DuplexChannelPtr(new AsioDuplexChannel(service, addr, handler));
-  }
-
-  boost::lock_guard<boost::shared_mutex> lock(allchannels_lock);
-  if (closed) {
-    channel->close();
-    throw ShuttingDownException();
-  }
-  allchannels.insert(channel);
-  LOG4CXX_DEBUG(logger, "Created a channel to " << addr << ", all channels : " << allchannels.size());
-
-  return channel;
-}
-
-long DuplexChannelManager::nextTxnId() {
-  return counterobj.next();
-}
-
-void DuplexChannelManager::setHostForTopic(const std::string& topic, const HostAddress& host) {
-  boost::lock_guard<boost::shared_mutex> h2clock(host2topics_lock);
-  boost::lock_guard<boost::shared_mutex> t2hlock(topic2host_lock);
-  topic2host[topic] = host;
-  TopicSetPtr ts = host2topics[host];
-  if (!ts.get()) {
-    ts = TopicSetPtr(new TopicSet());
-    host2topics[host] = ts;
-  }
-  ts->insert(topic);
-  LOG4CXX_DEBUG(logger, "Set ownership of topic " << topic << " to " << host << ".");
-}
-
-void DuplexChannelManager::clearAllTopicsForHost(const HostAddress& addr) {
-  // remove topic mapping
-  boost::lock_guard<boost::shared_mutex> h2tlock(host2topics_lock);
-  boost::lock_guard<boost::shared_mutex> t2hlock(topic2host_lock);
-  Host2TopicsMap::iterator iter = host2topics.find(addr);
-  if (iter != host2topics.end()) {
-    for (TopicSet::iterator tsIter = iter->second->begin();
-         tsIter != iter->second->end(); ++tsIter) {
-      topic2host.erase(*tsIter);
-    }
-    host2topics.erase(iter);
-  }
-}
-
-void DuplexChannelManager::clearHostForTopic(const std::string& topic,
-                                             const HostAddress& addr) {
-  // remove topic mapping
-  boost::lock_guard<boost::shared_mutex> h2tlock(host2topics_lock);
-  boost::lock_guard<boost::shared_mutex> t2hlock(topic2host_lock);
-  Host2TopicsMap::iterator iter = host2topics.find(addr);
-  if (iter != host2topics.end()) {
-    iter->second->erase(topic);
-  }
-  HostAddress existed = topic2host[topic];
-  if (existed == addr) {
-    topic2host.erase(topic);
-  }
-}
-
-const HostAddress& DuplexChannelManager::getHostForTopic(const std::string& topic) {
-  boost::shared_lock<boost::shared_mutex> t2hlock(topic2host_lock);
-  return topic2host[topic];
-}
-
-/**
-   A channel has just died. Remove it so we never give it to any other publisher or subscriber.
-   
-   This does not delete the channel. Some publishers or subscribers will still hold it and will be errored
-   when they try to do anything with it. 
-*/
-void DuplexChannelManager::nonSubscriptionChannelDied(const DuplexChannelPtr& channel) {
-  // get host
-  HostAddress addr = channel->getHostAddress();
-
-  // Clear the topic ownership when a non-subscription channel is disconnected
-  clearAllTopicsForHost(addr);
-
-  // remove channel mapping
-  {
-    boost::lock_guard<boost::shared_mutex> h2clock(host2channel_lock);
-    host2channel.erase(addr);
-  }
-  removeChannel(channel);
-}
-
-void DuplexChannelManager::removeChannel(const DuplexChannelPtr& channel) {
-  {
-    boost::lock_guard<boost::shared_mutex> aclock(allchannels_lock);
-    allchannels.erase(channel); // channel should be deleted here
-  }
-  channel->close();
-}
-
-void DuplexChannelManager::start() {
-  // add non-subscription response handlers
-  nonSubscriptionHandlers[PUBLISH] =
-    ResponseHandlerPtr(new PublishResponseHandler(shared_from_this()));
-  nonSubscriptionHandlers[UNSUBSCRIBE] =
-    ResponseHandlerPtr(new UnsubscribeResponseHandler(shared_from_this()));
-
-  // start the dispatcher
-  dispatcher->start();
-}
-
-bool DuplexChannelManager::isClosed() {
-  boost::shared_lock<boost::shared_mutex> lock(allchannels_lock);
-  return closed;
-}
-
-void DuplexChannelManager::close() {
-  // stop the dispatcher
-  dispatcher->stop();
-  {
-    boost::lock_guard<boost::shared_mutex> lock(allchannels_lock);
-    
-    closed = true;
-    for (ChannelMap::iterator iter = allchannels.begin(); iter != allchannels.end(); ++iter ) {
-      (*iter)->close();
-    }  
-    allchannels.clear();
-  }
-
-  // Unregister response handlers
-  nonSubscriptionHandlers.clear();
-  /* destruction of the maps will clean up any items they hold */
-}
-
-ClientImplPtr ClientImpl::Create(const Configuration& conf) {
-  ClientImplPtr impl(new ClientImpl(conf));
-  LOG4CXX_DEBUG(logger, "Creating Clientimpl " << impl);
-  impl->channelManager->start();
-  return impl;
-}
-
-void ClientImpl::Destroy() {
-  LOG4CXX_DEBUG(logger, "destroying Clientimpl " << this);
-
-  // close the channel manager
-  channelManager->close();
-
-  if (subscriber != NULL) {
-    delete subscriber;
-    subscriber = NULL;
-  }
-  if (publisher != NULL) {
-    delete publisher;
-    publisher = NULL;
-  }
-}
-
-ClientImpl::ClientImpl(const Configuration& conf) 
-  : conf(conf), publisher(NULL), subscriber(NULL)
-{
-  channelManager = DuplexChannelManager::create(conf);
-}
-
-Subscriber& ClientImpl::getSubscriber() {
-  return getSubscriberImpl();
-}
-
-Publisher& ClientImpl::getPublisher() {
-  return getPublisherImpl();
-}
-    
-SubscriberImpl& ClientImpl::getSubscriberImpl() {
-  if (subscriber == NULL) {
-    boost::lock_guard<boost::mutex> lock(subscribercreate_lock);
-    if (subscriber == NULL) {
-      subscriber = new SubscriberImpl(channelManager);
-    }
-  }
-  return *subscriber;
-}
-
-PublisherImpl& ClientImpl::getPublisherImpl() {
-  if (publisher == NULL) { 
-    boost::lock_guard<boost::mutex> lock(publishercreate_lock);
-    if (publisher == NULL) {
-      publisher = new PublisherImpl(channelManager);
-    }
-  }
-  return *publisher;
-}
-
-ClientImpl::~ClientImpl() {
-  LOG4CXX_DEBUG(logger, "deleting Clientimpl " << this);
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/clientimpl.h
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/clientimpl.h b/hedwig-client/src/main/cpp/lib/clientimpl.h
deleted file mode 100644
index fd7915c..0000000
--- a/hedwig-client/src/main/cpp/lib/clientimpl.h
+++ /dev/null
@@ -1,493 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef HEDWIG_CLIENT_IMPL_H
-#define HEDWIG_CLIENT_IMPL_H
-
-#include <hedwig/client.h>
-#include <hedwig/protocol.h>
-
-#include <boost/asio.hpp>
-#include <boost/shared_ptr.hpp>
-#include <boost/enable_shared_from_this.hpp>
-#include <boost/thread/mutex.hpp>
-#include <boost/thread/condition_variable.hpp>
-
-#ifdef USE_BOOST_TR1
-#include <boost/tr1/unordered_map.hpp>
-#else
-#include <tr1/unordered_map>
-#endif
-
-#include <list>
-
-#include "util.h"
-#include "channel.h"
-#include "data.h"
-#include "eventdispatcher.h"
-
-namespace Hedwig {
-  const int DEFAULT_SYNC_REQUEST_TIMEOUT = 5000;
-
-  template<class R>
-  class SyncCallback : public Callback<R> {
-  public:
-    SyncCallback(int timeout) : response(PENDING), timeout(timeout) {}
-    virtual void operationComplete(const R& r) {
-      if (response == TIMEOUT) {
-        return;
-      }
-
-      {
-        boost::lock_guard<boost::mutex> lock(mut);
-        response = SUCCESS;
-        result = r;
-      }
-      cond.notify_all();
-    }
-
-    virtual void operationFailed(const std::exception& exception) {
-      if (response == TIMEOUT) {
-        return;
-      }
-
-      {
-        boost::lock_guard<boost::mutex> lock(mut);
-
-        if (typeid(exception) == typeid(ChannelConnectException)) {
-          response = NOCONNECT;
-        } else if (typeid(exception) == typeid(ServiceDownException)) {
-          response = SERVICEDOWN;
-        } else if (typeid(exception) == typeid(AlreadySubscribedException)) {
-          response = ALREADY_SUBSCRIBED;
-        } else if (typeid(exception) == typeid(NotSubscribedException)) {
-          response = NOT_SUBSCRIBED;
-        } else {
-          response = UNKNOWN;
-        }
-      }
-      cond.notify_all();
-    }
-
-    void wait() {
-      boost::unique_lock<boost::mutex> lock(mut);
-      while(response==PENDING) {
-        if (cond.timed_wait(lock, boost::posix_time::milliseconds(timeout)) == false) {
-          response = TIMEOUT;
-        }
-      }
-    }
-
-    void throwExceptionIfNeeded() {
-      switch (response) {
-        case SUCCESS:
-          break;
-        case NOCONNECT:
-          throw CannotConnectException();
-          break;
-        case SERVICEDOWN:
-          throw ServiceDownException();
-          break;
-        case ALREADY_SUBSCRIBED:
-          throw AlreadySubscribedException();
-          break;
-        case NOT_SUBSCRIBED:
-          throw NotSubscribedException();
-          break;
-        case TIMEOUT:
-          throw ClientTimeoutException();
-          break;
-        default:
-          throw ClientException();
-          break;
-      }
-    }
-
-    R getResult() { return result; }
-    
-  private:
-    enum { 
-      PENDING, 
-      SUCCESS,
-      NOCONNECT,
-      SERVICEDOWN,
-      NOT_SUBSCRIBED,
-      ALREADY_SUBSCRIBED,
-      TIMEOUT,
-      UNKNOWN
-    } response;
-
-    boost::condition_variable cond;
-    boost::mutex mut;
-    int timeout;
-    R result;
-  };
-
-  class SyncOperationCallback : public OperationCallback {
-  public:
-    SyncOperationCallback(int timeout) : response(PENDING), timeout(timeout) {}
-    virtual void operationComplete();
-    virtual void operationFailed(const std::exception& exception);
-    
-    void wait();
-    void throwExceptionIfNeeded();
-    
-  private:
-    enum { 
-      PENDING, 
-      SUCCESS,
-      NOCONNECT,
-      SERVICEDOWN,
-      NOT_SUBSCRIBED,
-      ALREADY_SUBSCRIBED,
-      TIMEOUT,
-      UNKNOWN
-    } response;
-
-    boost::condition_variable cond;
-    boost::mutex mut;
-    int timeout;
-  };
-
-  class DuplexChannelManager;
-  typedef boost::shared_ptr<DuplexChannelManager> DuplexChannelManagerPtr;
-
-  //
-  // Hedwig Response Handler
-  //
-
-  // Response Handler used to process response for different types of requests
-  class ResponseHandler {
-  public:
-    ResponseHandler(const DuplexChannelManagerPtr& channelManager); 
-    virtual ~ResponseHandler() {};
-
-    virtual void handleResponse(const PubSubResponsePtr& m, const PubSubDataPtr& txn,
-                                const DuplexChannelPtr& channel) = 0;
-  protected:
-    // common method used to redirect request
-    void redirectRequest(const PubSubResponsePtr& response, const PubSubDataPtr& data,
-                         const DuplexChannelPtr& channel);
-
-    // channel manager to manage all established channels
-    const DuplexChannelManagerPtr channelManager;
-  };
-
-  typedef boost::shared_ptr<ResponseHandler> ResponseHandlerPtr;
-  typedef std::tr1::unordered_map<OperationType, ResponseHandlerPtr, OperationTypeHash> ResponseHandlerMap;
-
-  class PubSubWriteCallback : public OperationCallback {
-  public:
-    PubSubWriteCallback(const DuplexChannelPtr& channel, const PubSubDataPtr& data); 
-    virtual void operationComplete();
-    virtual void operationFailed(const std::exception& exception);
-  private:
-    DuplexChannelPtr channel;
-    PubSubDataPtr data;
-  };
-
-  class DefaultServerConnectCallback : public OperationCallback {
-  public:
-    DefaultServerConnectCallback(const DuplexChannelManagerPtr& channelManager,
-                                 const DuplexChannelPtr& channel,
-                                 const PubSubDataPtr& data);
-    virtual void operationComplete();
-    virtual void operationFailed(const std::exception& exception);
-  private:
-    DuplexChannelManagerPtr channelManager;
-    DuplexChannelPtr channel;
-    PubSubDataPtr data;
-  };
-
-  struct SubscriptionListenerPtrHash : public std::unary_function<SubscriptionListenerPtr, size_t> {
-    size_t operator()(const Hedwig::SubscriptionListenerPtr& listener) const {
-      return reinterpret_cast<size_t>(listener.get());
-    }
-  };
-
-  // Subscription Event Emitter
-  class SubscriptionEventEmitter {
-  public:
-    SubscriptionEventEmitter();
-
-    void addSubscriptionListener(SubscriptionListenerPtr& listener);
-    void removeSubscriptionListener(SubscriptionListenerPtr& listener);
-    void emitSubscriptionEvent(const std::string& topic,
-                               const std::string& subscriberId,
-                               const SubscriptionEvent event);
-
-  private:
-    typedef std::tr1::unordered_set<SubscriptionListenerPtr, SubscriptionListenerPtrHash> SubscriptionListenerSet;
-    SubscriptionListenerSet listeners;
-    boost::shared_mutex listeners_lock;
-  };
-
-  class SubscriberClientChannelHandler;
-
-  //
-  // Duplex Channel Manager to manage all established channels
-  //
-
-  class DuplexChannelManager : public boost::enable_shared_from_this<DuplexChannelManager> {
-  public:
-    static DuplexChannelManagerPtr create(const Configuration& conf);
-    virtual ~DuplexChannelManager();
-
-    inline const Configuration& getConfiguration() const {
-      return conf;
-    }
-
-    // Submit a pub/sub request
-    void submitOp(const PubSubDataPtr& op);
-
-    // Submit a pub/sub request to default host
-    // It is called only when client doesn't have the knowledge of topic ownership
-    void submitOpToDefaultServer(const PubSubDataPtr& op);
-
-    // Redirect pub/sub request to a target hosts
-    void redirectOpToHost(const PubSubDataPtr& op, const HostAddress& host);
-
-    // Submit a pub/sub request thru established channel
-    // It is called when connecting to default server to established a channel
-    void submitOpThruChannel(const PubSubDataPtr& op, const DuplexChannelPtr& channel);
-
-    // Generate next transaction id for pub/sub requests sending thru this manager
-    long nextTxnId();
-
-    // return default host
-    inline const HostAddress getDefaultHost() {
-      return HostAddress::fromString(defaultHostAddress);
-    }
-
-    // set the owner host of a topic
-    void setHostForTopic(const std::string& topic, const HostAddress& host);
-
-    // clear all topics that hosted by a hub server
-    void clearAllTopicsForHost(const HostAddress& host);
-
-    // clear host for a given topic
-    void clearHostForTopic(const std::string& topic, const HostAddress& host);
-
-    // Called when a channel is disconnected
-    void nonSubscriptionChannelDied(const DuplexChannelPtr& channel);
-
-    // Remove a channel from all channel map
-    void removeChannel(const DuplexChannelPtr& channel);
-
-    // Get the subscription channel handler for a given subscription
-    virtual boost::shared_ptr<SubscriberClientChannelHandler>
-            getSubscriptionChannelHandler(const TopicSubscriber& ts) = 0;
-
-    // Close subscription for a given subscription
-    virtual void asyncCloseSubscription(const TopicSubscriber& ts,
-                                        const OperationCallbackPtr& callback) = 0;
-
-    virtual void handoverDelivery(const TopicSubscriber& ts,
-                                  const MessageHandlerCallbackPtr& handler,
-                                  const ClientMessageFilterPtr& filter) = 0;
-
-    // start the channel manager
-    virtual void start();
-    // close the channel manager
-    virtual void close();
-    // whether the channel manager is closed
-    bool isClosed();
-
-    // Return an available service
-    inline boost::asio::io_service & getService() const {
-      return dispatcher->getService()->getService();
-    }
-
-    // Return the event emitter
-    inline SubscriptionEventEmitter& getEventEmitter() {
-      return eventEmitter;
-    }
-
-  protected:
-    DuplexChannelManager(const Configuration& conf);
-
-    // Get the ownership for a given topic.
-    const HostAddress& getHostForTopic(const std::string& topic);
-
-    //
-    // Channel Management
-    //
-
-    // Non subscription channel management
-
-    // Get a non subscription channel for a given topic
-    // If the topic's owner is known, retrieve a subscription channel to
-    // target host (if there is no channel existed, create one);
-    // If the topic's owner is unknown, return null
-    DuplexChannelPtr getNonSubscriptionChannel(const std::string& topic);
-
-    // Get an existed non subscription channel to a given host
-    DuplexChannelPtr getNonSubscriptionChannel(const HostAddress& addr);
-
-    // Create a non subscription channel to a given host
-    DuplexChannelPtr createNonSubscriptionChannel(const HostAddress& addr);
-
-    // Store the established non subscription channel
-    DuplexChannelPtr storeNonSubscriptionChannel(const DuplexChannelPtr& ch,
-                                                 bool doConnect); 
-
-    //
-    // Subscription Channel Management
-    //
-
-    // Get a subscription channel for a given subscription.
-    // If there is subscription channel established before, return it.
-    // Otherwise, check whether the topic's owner is known. If the topic owner
-    // is known, retrieve a subscription channel to target host (if there is no
-    // channel exsited, create one); If unknown, return null
-    virtual DuplexChannelPtr getSubscriptionChannel(const TopicSubscriber& ts,
-                                                    const bool isResubscribeRequest) = 0;
-
-    // Get an existed subscription channel to a given host
-    virtual DuplexChannelPtr getSubscriptionChannel(const HostAddress& addr) = 0;
-
-    // Create a subscription channel to a given host
-    // If store is true, store the channel for future usage.
-    // If store is false, return a newly created channel.
-    virtual DuplexChannelPtr createSubscriptionChannel(const HostAddress& addr) = 0;
-
-    // Store the established subscription channel
-    virtual DuplexChannelPtr storeSubscriptionChannel(const DuplexChannelPtr& ch,
-                                                      bool doConnect) = 0;
-
-    //
-    // Raw Channel Management
-    //
-
-    // Create a raw channel
-    DuplexChannelPtr createChannel(IOServicePtr& service,
-                                   const HostAddress& addr, const ChannelHandlerPtr& handler);
-
-    // event dispatcher running io threads
-    typedef boost::shared_ptr<EventDispatcher> EventDispatcherPtr;
-    EventDispatcherPtr dispatcher;
-
-    // topic2host mapping for topic ownership
-    std::tr1::unordered_map<std::string, HostAddress> topic2host;
-    boost::shared_mutex topic2host_lock;
-    typedef std::tr1::unordered_set<std::string> TopicSet;
-    typedef boost::shared_ptr<TopicSet> TopicSetPtr;
-    typedef std::tr1::unordered_map<HostAddress, TopicSetPtr, HostAddressHash> Host2TopicsMap;
-    Host2TopicsMap host2topics;
-    boost::shared_mutex host2topics_lock;
-  private:
-    // write the request to target channel
-    void submitTo(const PubSubDataPtr& op, const DuplexChannelPtr& channel);
-
-    const Configuration& conf;
-    bool sslEnabled;
-    SSLContextFactoryPtr sslCtxFactory;
-
-    // whether the channel manager is shutting down
-    bool closed;
-
-    // counter used for generating transaction ids
-    ClientTxnCounter counterobj;
-
-    std::string defaultHostAddress;
-
-    // non-subscription channels
-    std::tr1::unordered_map<HostAddress, DuplexChannelPtr, HostAddressHash > host2channel;
-    boost::shared_mutex host2channel_lock;
-
-    // maintain all established channels
-    typedef std::tr1::unordered_set<DuplexChannelPtr, DuplexChannelPtrHash > ChannelMap;
-    ChannelMap allchannels;
-    boost::shared_mutex allchannels_lock;
-
-    // Response Handlers for non-subscription requests
-    ResponseHandlerMap nonSubscriptionHandlers;
-
-    // Subscription Event Emitter
-    SubscriptionEventEmitter eventEmitter;
-  };
-
-  //
-  // Hedwig Client Channel Handler to handle responses received from the channel
-  //
-
-  class HedwigClientChannelHandler : public ChannelHandler {
-  public:
-    HedwigClientChannelHandler(const DuplexChannelManagerPtr& channelManager,
-                               ResponseHandlerMap& handlers);
-    virtual ~HedwigClientChannelHandler() {}
-    
-    virtual void messageReceived(const DuplexChannelPtr& channel, const PubSubResponsePtr& m);
-    virtual void channelConnected(const DuplexChannelPtr& channel);
-    virtual void channelDisconnected(const DuplexChannelPtr& channel, const std::exception& e);
-    virtual void exceptionOccurred(const DuplexChannelPtr& channel, const std::exception& e);
-
-    void close();
-  protected:
-    // real channel disconnected logic
-    virtual void onChannelDisconnected(const DuplexChannelPtr& channel);
-
-    // real close logic
-    virtual void doClose();
-
-    // channel manager to manage all established channels
-    const DuplexChannelManagerPtr channelManager;
-    ResponseHandlerMap& handlers;
-
-    boost::shared_mutex close_lock;
-    // Boolean indicating if we closed the handler explicitly or not.
-    // If so, we do not need to do the channel disconnected logic here.
-    bool closed;
-    // whether channel is disconnected.
-    bool disconnected;
-  };
-  
-  class PublisherImpl;
-  class SubscriberImpl;
-
-  /**
-     Implementation of the hedwig client. This class takes care of globals such as the topic->host map and the transaction id counter.
-  */
-  class ClientImpl : public boost::enable_shared_from_this<ClientImpl> {
-  public:
-    static ClientImplPtr Create(const Configuration& conf);
-    void Destroy();
-
-    Subscriber& getSubscriber();
-    Publisher& getPublisher();
-
-    SubscriberImpl& getSubscriberImpl();
-    PublisherImpl& getPublisherImpl();
-
-    ~ClientImpl();
-  private:
-    ClientImpl(const Configuration& conf);
-
-    const Configuration& conf;
-
-    boost::mutex publishercreate_lock;
-    PublisherImpl* publisher;
-
-    boost::mutex subscribercreate_lock;
-    SubscriberImpl* subscriber;
-
-    // channel manager manage all channels for the client
-    DuplexChannelManagerPtr channelManager;
-  };
-};
-#endif

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/data.cpp
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/data.cpp b/hedwig-client/src/main/cpp/lib/data.cpp
deleted file mode 100644
index 24d458e..0000000
--- a/hedwig-client/src/main/cpp/lib/data.cpp
+++ /dev/null
@@ -1,277 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifdef HAVE_CONFIG_H
-#include <config.h>
-#endif
-
-#include <hedwig/protocol.h>
-#include "data.h"
-
-#include <log4cxx/logger.h>
-#include <iostream>
-#include <boost/thread/locks.hpp>
-
-#define stringify( name ) #name
-
-static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__));
-
-using namespace Hedwig;
-
-const char* OPERATION_TYPE_NAMES[] = {
-  stringify( PUBLISH ),
-  stringify( SUBSCRIBE ),
-  stringify( CONSUME ),
-  stringify( UNSUBSCRIBE ),
-  stringify( START_DELIVERY ),
-  stringify( STOP_DELIVERY ),
-  stringify( CLOSESUBSCRIPTION )
-};
-
-PubSubDataPtr PubSubData::forPublishRequest(long txnid, const std::string& topic, const Message& body,
-                                            const ResponseCallbackPtr& callback) {
-  PubSubDataPtr ptr(new PubSubData());
-  ptr->type = PUBLISH;
-  ptr->txnid = txnid;
-  ptr->topic = topic;
-  ptr->body.CopyFrom(body);
-  ptr->callback = callback;
-  return ptr;
-}
-
-PubSubDataPtr PubSubData::forSubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic,
-                                              const ResponseCallbackPtr& callback, const SubscriptionOptions& options) {
-  PubSubDataPtr ptr(new PubSubData());
-  ptr->type = SUBSCRIBE;
-  ptr->txnid = txnid;
-  ptr->subscriberid = subscriberid;
-  ptr->topic = topic;
-  ptr->callback = callback;
-  ptr->options = options;
-  return ptr;  
-}
-
-PubSubDataPtr PubSubData::forUnsubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic,
-                                                const ResponseCallbackPtr& callback) {
-  PubSubDataPtr ptr(new PubSubData());
-  ptr->type = UNSUBSCRIBE;
-  ptr->txnid = txnid;
-  ptr->subscriberid = subscriberid;
-  ptr->topic = topic;
-  ptr->callback = callback;
-  return ptr;  
-}
-
-PubSubDataPtr PubSubData::forCloseSubscriptionRequest(
-  long txnid, const std::string& subscriberid, const std::string& topic,
-  const ResponseCallbackPtr& callback) {
-  PubSubDataPtr ptr(new PubSubData());
-  ptr->type = CLOSESUBSCRIPTION;
-  ptr->txnid = txnid;
-  ptr->subscriberid = subscriberid;
-  ptr->topic = topic;
-  ptr->callback = callback;
-  return ptr;  
-}
-
-PubSubDataPtr PubSubData::forConsumeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const MessageSeqId msgid) {
-  PubSubDataPtr ptr(new PubSubData());
-  ptr->type = CONSUME;
-  ptr->txnid = txnid;
-  ptr->subscriberid = subscriberid;
-  ptr->topic = topic;
-  ptr->msgid = msgid;
-  return ptr;  
-}
-
-PubSubData::PubSubData() : shouldClaim(false), messageBound(0) {  
-}
-
-PubSubData::~PubSubData() {
-}
-
-OperationType PubSubData::getType() const {
-  return type;
-}
-
-long PubSubData::getTxnId() const {
-  return txnid;
-}
-
-const std::string& PubSubData::getTopic() const {
-  return topic;
-}
-
-const Message& PubSubData::getBody() const {
-  return body;
-}
-
-const MessageSeqId PubSubData::getMessageSeqId() const {
-  return msgid;
-}
-
-void PubSubData::setPreferencesForSubRequest(SubscribeRequest * subreq,
-                                             const SubscriptionOptions &options) {
-  Hedwig::SubscriptionPreferences* preferences = subreq->mutable_preferences();
-  if (options.messagebound() > 0) {
-    preferences->set_messagebound(options.messagebound());
-  }
-  if (options.has_messagefilter()) {
-    preferences->set_messagefilter(options.messagefilter());
-  }
-  if (options.has_options()) {
-    preferences->mutable_options()->CopyFrom(options.options());
-  }
-  if (options.has_messagewindowsize()) {
-    preferences->set_messagewindowsize(options.messagewindowsize());
-  }
-}
-
-const PubSubRequestPtr PubSubData::getRequest() {
-  PubSubRequestPtr request(new Hedwig::PubSubRequest());
-  request->set_protocolversion(Hedwig::VERSION_ONE);
-  request->set_type(type);
-  request->set_txnid(txnid);
-  if (shouldClaim) {
-    request->set_shouldclaim(shouldClaim);
-  }
-  request->set_topic(topic);
-    
-  if (type == PUBLISH) {
-    LOG4CXX_DEBUG(logger, "Creating publish request");
-
-    Hedwig::PublishRequest* pubreq = request->mutable_publishrequest();
-    Hedwig::Message* msg = pubreq->mutable_msg();
-    msg->CopyFrom(body);
-  } else if (type == SUBSCRIBE) {
-    LOG4CXX_DEBUG(logger, "Creating subscribe request");
-
-    Hedwig::SubscribeRequest* subreq = request->mutable_subscriberequest();
-    subreq->set_subscriberid(subscriberid);
-    subreq->set_createorattach(options.createorattach());
-    subreq->set_forceattach(options.forceattach());
-    setPreferencesForSubRequest(subreq, options);
-  } else if (type == CONSUME) {
-    LOG4CXX_DEBUG(logger, "Creating consume request");
-
-    Hedwig::ConsumeRequest* conreq = request->mutable_consumerequest();
-    conreq->set_subscriberid(subscriberid);
-    conreq->mutable_msgid()->CopyFrom(msgid);
-  } else if (type == UNSUBSCRIBE) {
-    LOG4CXX_DEBUG(logger, "Creating unsubscribe request");
-    
-    Hedwig::UnsubscribeRequest* unsubreq = request->mutable_unsubscriberequest();
-    unsubreq->set_subscriberid(subscriberid);    
-  } else if (type == CLOSESUBSCRIPTION) {
-    LOG4CXX_DEBUG(logger, "Creating closeSubscription request");
-    
-    Hedwig::CloseSubscriptionRequest* closesubreq = request->mutable_closesubscriptionrequest();
-    closesubreq->set_subscriberid(subscriberid);    
-  } else {
-    LOG4CXX_ERROR(logger, "Tried to create a request message for the wrong type [" << type << "]");
-    throw UnknownRequestException();
-  }
-
-  return request;
-}
-
-void PubSubData::setShouldClaim(bool shouldClaim) {
-  this->shouldClaim = shouldClaim;
-}
-
-void PubSubData::addTriedServer(HostAddress& h) {
-  triedservers.insert(h);
-}
-
-bool PubSubData::hasTriedServer(HostAddress& h) {
-  return triedservers.count(h) > 0;
-}
-
-void PubSubData::clearTriedServers() {
-  triedservers.clear();
-}
-
-ResponseCallbackPtr& PubSubData::getCallback() {
-  return callback;
-}
-
-void PubSubData::setCallback(const ResponseCallbackPtr& callback) {
-  this->callback = callback;
-}
-
-const std::string& PubSubData::getSubscriberId() const {
-  return subscriberid;
-}
-
-const SubscriptionOptions& PubSubData::getSubscriptionOptions() const {
-  return options;
-}
-
-void PubSubData::setOrigChannelForResubscribe(
-  boost::shared_ptr<DuplexChannel>& channel) {
-  this->origChannel = channel;
-}
-
-boost::shared_ptr<DuplexChannel>& PubSubData::getOrigChannelForResubscribe() {
-  return this->origChannel;
-}
-
-bool PubSubData::isResubscribeRequest() {
-  return 0 != this->origChannel.get();
-}
-
-ClientTxnCounter::ClientTxnCounter() : counter(0) 
-{
-}
-
-ClientTxnCounter::~ClientTxnCounter() {
-}
-
-/**
-Increment the transaction counter and return the new value.
-
-@returns the next transaction id
-*/
-long ClientTxnCounter::next() {  // would be nice to remove lock from here, look more into it
-  boost::lock_guard<boost::mutex> lock(mutex);
-
-  long next= ++counter; 
-
-  return next;
-}
-
-std::ostream& Hedwig::operator<<(std::ostream& os, const PubSubData& data) {
-  OperationType type = data.getType();
-  os << "[" << OPERATION_TYPE_NAMES[type] << " request (txn:" << data.getTxnId()
-     << ") for (topic:" << data.getTopic();
-  switch (type) {
-  case SUBSCRIBE:
-  case UNSUBSCRIBE:
-  case CLOSESUBSCRIPTION:
-    os << ", subscriber:" << data.getSubscriberId() << ")";
-    break;
-  case CONSUME:
-    os << ", subscriber:" << data.getSubscriberId() << ", seq:"
-    << data.getMessageSeqId().localcomponent() << ")";
-    break;
-  case PUBLISH:
-  default:
-    os << ")";
-    break;
-  }
-  return os;
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/data.h
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/data.h b/hedwig-client/src/main/cpp/lib/data.h
deleted file mode 100644
index 0639f4a..0000000
--- a/hedwig-client/src/main/cpp/lib/data.h
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef DATA_H
-#define DATA_H
-
-#include <hedwig/protocol.h>
-#include <hedwig/callback.h>
-
-#include <pthread.h>
-#include <iostream>
-
-#ifdef USE_BOOST_TR1
-#include <boost/tr1/unordered_set.hpp>
-#else
-#include <tr1/unordered_set>
-#endif
-
-#include "util.h"
-#include <boost/shared_ptr.hpp>
-#include <boost/thread/mutex.hpp>
-
-namespace Hedwig {
-  /**
-     Simple counter for transaction ids from the client
-  */
-  class ClientTxnCounter {
-  public:
-    ClientTxnCounter();
-    ~ClientTxnCounter();
-    long next();
-    
-  private:
-    long counter;
-    boost::mutex mutex;
-  };
-
-  typedef Callback<ResponseBody> ResponseCallback;
-  typedef std::tr1::shared_ptr<ResponseCallback> ResponseCallbackPtr;
-
-  class PubSubData;
-  typedef boost::shared_ptr<PubSubData> PubSubDataPtr;
-  typedef boost::shared_ptr<PubSubRequest> PubSubRequestPtr;
-  typedef boost::shared_ptr<PubSubResponse> PubSubResponsePtr;
-
-  class DuplexChannel;
-
-  /**
-     Data structure to hold information about requests and build request messages.
-     Used to store requests which may need to be resent to another server. 
-   */
-  class PubSubData {
-  public:
-    // to be used for publish
-    static PubSubDataPtr forPublishRequest(long txnid, const std::string& topic, const Message& body,
-                                           const ResponseCallbackPtr& callback);
-    static PubSubDataPtr forSubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic,
-                                             const ResponseCallbackPtr& callback, const SubscriptionOptions& options);
-    static PubSubDataPtr forUnsubscribeRequest(long txnid, const std::string& subscriberid, const std::string& topic,
-                                               const ResponseCallbackPtr& callback);
-    static PubSubDataPtr forConsumeRequest(long txnid, const std::string& subscriberid, const std::string& topic, const MessageSeqId msgid);
-
-    static PubSubDataPtr forCloseSubscriptionRequest(long txnid, const std::string& subscriberid,
-                                                     const std::string& topic,
-                                                     const ResponseCallbackPtr& callback);
-
-    ~PubSubData();
-
-    OperationType getType() const;
-    long getTxnId() const;
-    const std::string& getSubscriberId() const;
-    const std::string& getTopic() const;
-    const Message& getBody() const;
-    const MessageSeqId getMessageSeqId() const;
-
-    void setShouldClaim(bool shouldClaim);
-    void setMessageBound(int messageBound);
-
-    const PubSubRequestPtr getRequest();
-    void setCallback(const ResponseCallbackPtr& callback);
-    ResponseCallbackPtr& getCallback();
-    const SubscriptionOptions& getSubscriptionOptions() const;
-
-    void addTriedServer(HostAddress& h);
-    bool hasTriedServer(HostAddress& h);
-    void clearTriedServers();
-
-    void setOrigChannelForResubscribe(boost::shared_ptr<DuplexChannel>& channel);
-    bool isResubscribeRequest();
-    boost::shared_ptr<DuplexChannel>& getOrigChannelForResubscribe();
-
-    friend std::ostream& operator<<(std::ostream& os, const PubSubData& data);
-  private:
-
-    PubSubData();
-
-    void setPreferencesForSubRequest(SubscribeRequest * subreq,
-                                     const SubscriptionOptions &options);
-    
-    OperationType type;
-    long txnid;
-    std::string subscriberid;
-    std::string topic;
-    Message body;
-    bool shouldClaim;
-    int messageBound;
-    ResponseCallbackPtr callback;
-    SubscriptionOptions options;
-    MessageSeqId msgid;
-    std::tr1::unordered_set<HostAddress, HostAddressHash > triedservers;
-    // record the origChannel for a resubscribe request
-    boost::shared_ptr<DuplexChannel> origChannel;
-  };
-
-};
-#endif

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp b/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp
deleted file mode 100644
index af3560c..0000000
--- a/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifdef HAVE_CONFIG_H
-#include <config.h>
-#endif
-
-#include "eventdispatcher.h"
-
-#include <log4cxx/logger.h>
-
-static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__));
-
-using namespace Hedwig;
-
-const int DEFAULT_NUM_DISPATCH_THREADS = 1;
-
-IOService::IOService() {
-}
-
-IOService::~IOService() {}
-
-void IOService::start() {
-  if (work.get()) {
-    return;
-  }
-  work = work_ptr(new boost::asio::io_service::work(service));
-}
-
-void IOService::stop() {
-  if (!work.get()) {
-    return;
-  }
-
-  work = work_ptr();
-  service.stop();
-}
-
-void IOService::run() {
-  while (true) {
-    try {
-      service.run();
-      break;
-    } catch (std::exception &e) {
-      LOG4CXX_ERROR(logger, "Exception in IO Service " << this << " : " << e.what());
-    }
-  }
-}
-
-EventDispatcher::EventDispatcher(const Configuration& conf)
-  : conf(conf), running(false), next_io_service(0) {
-  num_threads = conf.getInt(Configuration::NUM_DISPATCH_THREADS,
-                            DEFAULT_NUM_DISPATCH_THREADS);
-  if (0 == num_threads) {
-    LOG4CXX_ERROR(logger, "Number of threads in dispatcher is zero");
-    throw std::runtime_error("number of threads in dispatcher is zero");
-  }
-  for (size_t i = 0; i < num_threads; i++) {
-    services.push_back(IOServicePtr(new IOService()));
-  }
-  LOG4CXX_DEBUG(logger, "Created EventDispatcher " << this);
-}
-
-void EventDispatcher::run_forever(IOServicePtr service, size_t idx) {
-  LOG4CXX_INFO(logger, "Starting event dispatcher " << idx);
-
-  service->run();
-
-  LOG4CXX_INFO(logger, "Event dispatcher " << idx << " done");
-}
-
-void EventDispatcher::start() {
-  if (running) {
-    return;
-  }
-
-  for (size_t i = 0; i < num_threads; i++) {
-    IOServicePtr service = services[i];
-    service->start();
-    // new thread
-    thread_ptr t(new boost::thread(boost::bind(&EventDispatcher::run_forever,
-                                               this, service, i)));
-    threads.push_back(t);
-  }
-  running = true;
-}
-
-void EventDispatcher::stop() {
-  if (!running) {
-    return;
-  }
-
-  for (size_t i = 0; i < num_threads; i++) {
-    services[i]->stop();
-  }
-
-  for (size_t i = 0; i < num_threads; i++) {
-    threads[i]->join();
-  }
-  threads.clear();
-
-  running = false;
-}
-
-EventDispatcher::~EventDispatcher() {
-  services.clear();
-}
-
-IOServicePtr& EventDispatcher::getService() {
-  size_t next = 0;
-  {
-    boost::lock_guard<boost::mutex> lock(next_lock);
-    next = next_io_service;
-    next_io_service = (next_io_service + 1) % num_threads;
-  }
-  return services[next];
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/eventdispatcher.h
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/eventdispatcher.h b/hedwig-client/src/main/cpp/lib/eventdispatcher.h
deleted file mode 100644
index b6a7504..0000000
--- a/hedwig-client/src/main/cpp/lib/eventdispatcher.h
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef EVENTDISPATCHER_H
-#define EVENTDISPATCHER_H
-
-#include <vector>
-
-#include <hedwig/client.h>
-
-#include <boost/asio.hpp>
-#include <boost/thread.hpp>
-#include <boost/shared_ptr.hpp>
-
-namespace Hedwig {
-  typedef boost::shared_ptr<boost::asio::io_service::work> work_ptr;
-  typedef boost::shared_ptr<boost::thread> thread_ptr;
-
-  class IOService;
-  typedef boost::shared_ptr<IOService> IOServicePtr;
-
-  class IOService {
-  public:
-    IOService();
-    virtual ~IOService();
-
-    // start the io service
-    void start();
-    // stop the io service
-    void stop();
-    // run the io service
-    void run();
-
-    inline boost::asio::io_service& getService() {
-      return service;
-    }
-
-  private:
-    boost::asio::io_service service;  
-    work_ptr work;
-  };
-
-  class EventDispatcher {
-  public:  
-    EventDispatcher(const Configuration& conf);
-    ~EventDispatcher();
-    
-    void start();
-
-    void stop();
-    
-    IOServicePtr& getService();
-
-  private:
-    void run_forever(IOServicePtr service, size_t idx);
-
-    const Configuration& conf;
-
-    // number of threads
-    size_t num_threads;
-    // running flag
-    bool running;
-    // pool of io_services.
-    std::vector<IOServicePtr> services;
-    // threads
-    std::vector<thread_ptr> threads;
-    // next io_service used for a connection
-    boost::mutex next_lock;
-    std::size_t next_io_service;
-  };
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/exceptions.cpp
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/exceptions.cpp b/hedwig-client/src/main/cpp/lib/exceptions.cpp
deleted file mode 100644
index 9e062dc..0000000
--- a/hedwig-client/src/main/cpp/lib/exceptions.cpp
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifdef HAVE_CONFIG_H
-#include <config.h>
-#endif
-
-#include <hedwig/exceptions.h>
-#include <stdlib.h>
-#include <string.h>
-
-using namespace Hedwig;
-
-
-
-  

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/filterablemessagehandler.cpp
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/filterablemessagehandler.cpp b/hedwig-client/src/main/cpp/lib/filterablemessagehandler.cpp
deleted file mode 100644
index 07d884c..0000000
--- a/hedwig-client/src/main/cpp/lib/filterablemessagehandler.cpp
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifdef HAVE_CONFIG_H
-#include <config.h>
-#endif
-
-#include "filterablemessagehandler.h"
-
-using namespace Hedwig;
-
-FilterableMessageHandler::FilterableMessageHandler(const MessageHandlerCallbackPtr& msgHandler,
-                                                   const ClientMessageFilterPtr& msgFilter)
-  : msgHandler(msgHandler), msgFilter(msgFilter) {
-}
-
-FilterableMessageHandler::~FilterableMessageHandler() {
-}
-
-void FilterableMessageHandler::consume(const std::string& topic, const std::string& subscriberId,
-                                       const Message& msg, OperationCallbackPtr& callback) {
-  bool deliver = true;
-  if (0 != msgFilter.get()) {
-    deliver = msgFilter->testMessage(msg);
-  }
-  if (deliver) {
-    msgHandler->consume(topic, subscriberId, msg, callback);
-  } else {
-    callback->operationComplete();
-  }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/filterablemessagehandler.h
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/filterablemessagehandler.h b/hedwig-client/src/main/cpp/lib/filterablemessagehandler.h
deleted file mode 100644
index 2d24bd5..0000000
--- a/hedwig-client/src/main/cpp/lib/filterablemessagehandler.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef FILTERABLE_MESSAGE_HANDLER_H
-#define FILTERABLE_MESSAGE_HANDLER_H
-
-#include <hedwig/callback.h>
-#include <hedwig/protocol.h>
-
-#ifdef USE_BOOST_TR1
-#include <boost/tr1/memory.hpp>
-#else 
-#include <tr1/memory>
-#endif
-
-namespace Hedwig {
-
-  class FilterableMessageHandler : public MessageHandlerCallback {
-  public:
-    FilterableMessageHandler(const MessageHandlerCallbackPtr& msgHandler,
-                             const ClientMessageFilterPtr& msgFilter);
-
-    virtual void consume(const std::string& topic, const std::string& subscriberId,
-                         const Message& msg, OperationCallbackPtr& callback);
-
-    virtual ~FilterableMessageHandler();
-  private:
-    const MessageHandlerCallbackPtr msgHandler;
-    const ClientMessageFilterPtr msgFilter;
-  };
-
-};
-
-#endif
-