You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:41 UTC

[31/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/inc/hedwig/callback.h
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/inc/hedwig/callback.h b/hedwig-client/src/main/cpp/inc/hedwig/callback.h
deleted file mode 100644
index 80e961b..0000000
--- a/hedwig-client/src/main/cpp/inc/hedwig/callback.h
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef HEDWIG_CALLBACK_H
-#define HEDWIG_CALLBACK_H
-
-#include <string>
-#include <hedwig/exceptions.h>
-#include <hedwig/protocol.h>
-
-#ifdef USE_BOOST_TR1
-#include <boost/tr1/memory.hpp>
-#else 
-#include <tr1/memory>
-#endif
-
-namespace Hedwig {
-
-  //
-  // A Listener registered for a Subscriber instance to emit events
-  // for those disable resubscribe subscriptions.
-  //
-  class SubscriptionListener {
-  public:
-    virtual void processEvent(const std::string &topic, const std::string &subscriberId,
-                              const Hedwig::SubscriptionEvent event) = 0;
-    virtual ~SubscriptionListener() {};
-  };
-  typedef std::tr1::shared_ptr<SubscriptionListener> SubscriptionListenerPtr;
-
-  template<class R>
-  class Callback {
-  public:
-    virtual void operationComplete(const R& result) = 0;
-    virtual void operationFailed(const std::exception& exception) = 0;
-
-    virtual ~Callback() {};
-  };
-
-  class OperationCallback {
-  public:
-    virtual void operationComplete() = 0;
-    virtual void operationFailed(const std::exception& exception) = 0;
-    
-    virtual ~OperationCallback() {};
-  };
-  typedef std::tr1::shared_ptr<OperationCallback> OperationCallbackPtr;
-
-  class MessageHandlerCallback {
-  public:
-    virtual void consume(const std::string& topic, const std::string& subscriberId, const Message& msg, OperationCallbackPtr& callback) = 0;
-    
-    virtual ~MessageHandlerCallback() {};
-  };
-  typedef std::tr1::shared_ptr<MessageHandlerCallback> MessageHandlerCallbackPtr;
-
-  typedef std::tr1::shared_ptr<SubscriptionPreferences> SubscriptionPreferencesPtr;
-
-  class ClientMessageFilter {
-  public:
-    virtual void setSubscriptionPreferences(const std::string& topic, const std::string& subscriberId,
-                                            const SubscriptionPreferencesPtr& preferences) = 0;
-    virtual bool testMessage(const Message& message) = 0;
-
-    virtual ~ClientMessageFilter() {};
-  };
-  typedef std::tr1::shared_ptr<ClientMessageFilter> ClientMessageFilterPtr;
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/inc/hedwig/client.h
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/inc/hedwig/client.h b/hedwig-client/src/main/cpp/inc/hedwig/client.h
deleted file mode 100644
index 7b914bc..0000000
--- a/hedwig-client/src/main/cpp/inc/hedwig/client.h
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef HEDWIG_CLIENT_H
-#define HEDWIG_CLIENT_H
-
-#include <string>
-
-#ifdef USE_BOOST_TR1
-#include <boost/tr1/memory.hpp>
-#else 
-#include <tr1/memory>
-#endif
-
-#include <hedwig/subscribe.h>
-#include <hedwig/publish.h>
-#include <hedwig/exceptions.h>
-#include <boost/noncopyable.hpp>
-#include <boost/shared_ptr.hpp>
-
-namespace Hedwig {
-
-  class ClientImpl;
-  typedef boost::shared_ptr<ClientImpl> ClientImplPtr;
-
-  class Configuration {
-  public:
-    static const std::string DEFAULT_SERVER;
-    static const std::string MESSAGE_CONSUME_RETRY_WAIT_TIME;
-    static const std::string SUBSCRIBER_CONSUME_RETRY_WAIT_TIME;
-    static const std::string MAX_MESSAGE_QUEUE_SIZE;
-    static const std::string RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME;
-    static const std::string SYNC_REQUEST_TIMEOUT;
-    static const std::string SUBSCRIBER_AUTOCONSUME;
-    static const std::string NUM_DISPATCH_THREADS;
-    static const std::string SSL_ENABLED;
-    static const std::string SSL_PEM_FILE;
-    static const std::string SUBSCRIPTION_CHANNEL_SHARING_ENABLED;
-    /**
-     * The maximum number of messages the hub will queue for subscriptions
-     * created using this configuration. The hub will always queue the most
-     * recent messages. If there are enough publishes to the topic to hit
-     * the bound, then the oldest messages are dropped from the queue.
-     *
-     * A bound of 0 disabled the bound completely.
-     */
-    static const std::string SUBSCRIPTION_MESSAGE_BOUND;
-
-  public:
-    Configuration() {};
-    virtual int getInt(const std::string& key, int defaultVal) const = 0;
-    virtual const std::string get(const std::string& key, const std::string& defaultVal) const = 0;
-    virtual bool getBool(const std::string& key, bool defaultVal) const = 0;
-
-    virtual ~Configuration() {}
-  };
-
-  /** 
-      Main Hedwig client class. This class is used to acquire an instance of the Subscriber of Publisher.
-  */
-  class Client : private boost::noncopyable {
-  public: 
-    Client(const Configuration& conf);
-
-    /**
-       Retrieve the subscriber object
-    */
-    Subscriber& getSubscriber();
-
-    /**
-       Retrieve the publisher object
-    */
-    Publisher& getPublisher();
-
-    ~Client();
-
-  private:
-    ClientImplPtr clientimpl;
-  };
-
- 
-};
-
-#endif

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h b/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h
deleted file mode 100644
index b44fed9..0000000
--- a/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef HEDWIG_EXCEPTION_H
-#define HEDWIG_EXCEPTION_H
-
-#include <exception>
-
-namespace Hedwig {
-
-  class ClientException : public std::exception { };
-
-  class ClientTimeoutException : public ClientException {};
-
-  class ServiceDownException : public ClientException {};
-  class CannotConnectException : public ClientException {};
-  class UnexpectedResponseException : public ClientException {};
-  class OomException : public ClientException {};
-  class UnknownRequestException : public ClientException {};
-  class InvalidRedirectException : public ClientException {};
-  class NoChannelHandlerException : public ClientException {};
-
-  class PublisherException : public ClientException { };
-  
-  class SubscriberException : public ClientException { };
-  class AlreadySubscribedException : public SubscriberException {};
-  class NotSubscribedException : public SubscriberException {};
-  class ResubscribeException : public SubscriberException {};
-  class NullMessageHandlerException : public SubscriberException {};
-  class NullMessageFilterException : public SubscriberException {};
-
-  class AlreadyStartDeliveryException : public SubscriberException {};
-  class StartingDeliveryException : public SubscriberException {};
-
-  class ConfigurationException : public ClientException { };
-  class InvalidPortException : public ConfigurationException {};
-  class HostResolutionException : public ClientException {};
-  
-  class InvalidStateException : public ClientException {};
-  class ShuttingDownException : public InvalidStateException {};
-};
-
-#endif

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/inc/hedwig/publish.h
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/inc/hedwig/publish.h b/hedwig-client/src/main/cpp/inc/hedwig/publish.h
deleted file mode 100644
index ea08838..0000000
--- a/hedwig-client/src/main/cpp/inc/hedwig/publish.h
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef HEDWIG_PUBLISH_H
-#define HEDWIG_PUBLISH_H
-
-#include <string>
-
-#include <hedwig/exceptions.h>
-#include <hedwig/callback.h>
-#include <hedwig/protocol.h>
-#include <boost/noncopyable.hpp>
-
-namespace Hedwig {
-
-  typedef std::tr1::shared_ptr<PublishResponse> PublishResponsePtr;
-  typedef Callback<PublishResponsePtr> PublishResponseCallback;
-  typedef std::tr1::shared_ptr<PublishResponseCallback> PublishResponseCallbackPtr;
-
-  /**
-     Interface for publishing to a hedwig instance.
-  */
-  class Publisher : private boost::noncopyable {
-  public:
-    /**
-       Publish message for topic, and block until we receive a ACK response from the hedwig server.
-       
-       @param topic Topic to publish to.
-       @param message Data to publish for topic.
-    */
-    virtual PublishResponsePtr publish(const std::string& topic, const std::string& message) = 0;
-    
-    virtual PublishResponsePtr publish(const std::string& topic, const Message& message) = 0;
-
-    /** 
-	Asynchronously publish message for topic. 
-	
-	@code
-	OperationCallbackPtr callback(new MyCallback());
-	pub.asyncPublish(callback);
-	@endcode
-
-	@param topic Topic to publish to.
-	@param message Data to publish to topic
-	@param callback Callback which will be used to report success or failure. Success is only reported once the server replies with an ACK response to the publication.
-    */
-    virtual void asyncPublish(const std::string& topic, const std::string& message, const OperationCallbackPtr& callback) = 0;
-    
-    virtual void asyncPublish(const std::string& topic, const Message& message, const OperationCallbackPtr& callback) = 0;
-
-    virtual void asyncPublishWithResponse(const std::string& topic, const Message& messsage,
-                                          const PublishResponseCallbackPtr& callback) = 0;
-
-    virtual ~Publisher() {}
-  };
-};
-
-#endif

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h b/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h
deleted file mode 100644
index 4bc718c..0000000
--- a/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef HEDWIG_SUBSCRIBE_H
-#define HEDWIG_SUBSCRIBE_H
-
-#include <string>
-
-#include <hedwig/exceptions.h>
-#include <hedwig/callback.h>
-#include <hedwig/protocol.h>
-#include <boost/noncopyable.hpp>
-
-namespace Hedwig {
-
-  /**
-     Interface for subscribing to a hedwig instance. 
-  */
-  class Subscriber : private boost::noncopyable {
-  public:
-    virtual void subscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode) = 0;
-    virtual void asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode, const OperationCallbackPtr& callback) = 0;
-    virtual void subscribe(const std::string& topic, const std::string& subscriberId, const SubscriptionOptions& options) = 0;
-    virtual void asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscriptionOptions& options, const OperationCallbackPtr& callback) = 0;
-
-    virtual void unsubscribe(const std::string& topic, const std::string& subscriberId) = 0;
-    virtual void asyncUnsubscribe(const std::string& topic, const std::string& subscriberId, const OperationCallbackPtr& callback) = 0;  
-
-    virtual void consume(const std::string& topic, const std::string& subscriberId, const MessageSeqId& messageSeqId) = 0;
-
-    virtual void startDelivery(const std::string& topic, const std::string& subscriberId,
-                               const MessageHandlerCallbackPtr& callback) = 0;
-    virtual void startDeliveryWithFilter(const std::string& topic,
-                                         const std::string& subscriberId,
-                                         const MessageHandlerCallbackPtr& callback,
-                                         const ClientMessageFilterPtr& filter) = 0;
-
-    virtual void stopDelivery(const std::string& topic, const std::string& subscriberId) = 0;
-
-    virtual bool hasSubscription(const std::string& topic, const std::string& subscriberId) = 0;
-    virtual void closeSubscription(const std::string& topic, const std::string& subscriberId) = 0;
-    virtual void asyncCloseSubscription(const std::string& topic, const std::string& subscriberId,
-                                        const OperationCallbackPtr& callback) = 0;
-
-    //
-    // API to register/unregister subscription listeners for receiving
-    // events indicating subscription changes for those disable resubscribe
-    // subscriptions
-    //
-    virtual void addSubscriptionListener(SubscriptionListenerPtr& listener) = 0;
-    virtual void removeSubscriptionListener(SubscriptionListenerPtr& listener) = 0;
-
-    virtual ~Subscriber() {}
-  };
-};
-
-#endif

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/Makefile.am
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/Makefile.am b/hedwig-client/src/main/cpp/lib/Makefile.am
deleted file mode 100644
index f19a3da..0000000
--- a/hedwig-client/src/main/cpp/lib/Makefile.am
+++ /dev/null
@@ -1,32 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-PROTODEF = ../../../../../hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
-
-lib_LTLIBRARIES = libhedwig01.la
-libhedwig01_la_SOURCES = protocol.cpp channel.cpp client.cpp util.cpp clientimpl.cpp publisherimpl.cpp subscriberimpl.cpp eventdispatcher.cpp data.cpp filterablemessagehandler.cpp simplesubscriberimpl.cpp multiplexsubscriberimpl.cpp
-libhedwig01_la_CPPFLAGS = -I$(top_srcdir)/inc $(DEPS_CFLAGS)
-libhedwig01_la_LIBADD = $(DEPS_LIBS) $(BOOST_CPPFLAGS) 
-libhedwig01_la_LDFLAGS = -no-undefined $(BOOST_ASIO_LIB) $(BOOST_LDFLAGS) $(BOOST_THREAD_LIB)
-
-protocol.cpp: $(PROTODEF)
-	protoc --cpp_out=. -I`dirname $(PROTODEF)` $(PROTODEF)
-	sed "s/PubSubProtocol.pb.h/hedwig\/protocol.h/" PubSubProtocol.pb.cc > protocol.cpp
-	rm PubSubProtocol.pb.cc
-	mv PubSubProtocol.pb.h $(top_srcdir)/inc/hedwig/protocol.h
-

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/channel.cpp
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/channel.cpp b/hedwig-client/src/main/cpp/lib/channel.cpp
deleted file mode 100644
index b980e53..0000000
--- a/hedwig-client/src/main/cpp/lib/channel.cpp
+++ /dev/null
@@ -1,801 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifdef HAVE_CONFIG_H
-#include <config.h>
-#endif
-
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#include <netinet/tcp.h>
-#include <poll.h>
-#include <iostream>
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
-#include <errno.h>
-#include <vector>
-#include <utility>
-#include <deque>
-#include "channel.h"
-#include "util.h"
-#include "clientimpl.h"
-
-#include <log4cxx/logger.h>
-#include <google/protobuf/io/zero_copy_stream_impl.h>
-
-static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__));
-
-using namespace Hedwig;
-
-const std::string DEFAULT_SSL_PEM_FILE = "";
-
-AbstractDuplexChannel::AbstractDuplexChannel(IOServicePtr& service,
-                                             const HostAddress& addr, 
-                                             const ChannelHandlerPtr& handler)
-  : address(addr), handler(handler), service(service->getService()),
-    instream(&in_buf), copy_buf(NULL), copy_buf_length(0),
-    state(UNINITIALISED), receiving(false), reading(false), sending(false),
-    closed(false)
-{}
-
-AbstractDuplexChannel::~AbstractDuplexChannel() {
-  free(copy_buf);
-  copy_buf = NULL;
-  copy_buf_length = 0;
-
-  LOG4CXX_INFO(logger, "Destroying DuplexChannel(" << this << ")");
-}
-
-ChannelHandlerPtr AbstractDuplexChannel::getChannelHandler() {
-  return handler;
-}
-
-/*static*/ void AbstractDuplexChannel::connectCallbackHandler(
-                  AbstractDuplexChannelPtr channel,
-                  OperationCallbackPtr callback,
-                  const boost::system::error_code& error) {
-  channel->doAfterConnect(callback, error);
-}
-
-void AbstractDuplexChannel::connect() {
-  connect(OperationCallbackPtr());
-}
-
-void AbstractDuplexChannel::connect(const OperationCallbackPtr& callback) {  
-  setState(CONNECTING);
-  doConnect(callback);
-}
-
-void AbstractDuplexChannel::doAfterConnect(const OperationCallbackPtr& callback,
-                                           const boost::system::error_code& error) {
-  if (error) {
-    LOG4CXX_ERROR(logger, "Channel " << this << " connect error : " << error.message().c_str());
-    channelConnectFailed(ChannelConnectException(), callback);
-    return;
-  }
-
-  // set no delay option
-  boost::system::error_code ec;
-  setSocketOption(ec);
-  if (ec) {
-    LOG4CXX_WARN(logger, "Channel " << this << " set up socket error : " << ec.message().c_str());
-    channelConnectFailed(ChannelSetupException(), callback);
-    return;
-  } 
-
-  boost::asio::ip::tcp::endpoint localEp;
-  boost::asio::ip::tcp::endpoint remoteEp;
-  localEp = getLocalAddress(ec);
-  remoteEp = getRemoteAddress(ec);
-
-  if (!ec) {
-    LOG4CXX_INFO(logger, "Channel " << this << " connected :"
-                         << localEp.address().to_string() << ":" << localEp.port() << "=>"
-                         << remoteEp.address().to_string() << ":" << remoteEp.port());
-    // update ip address since if might connect to VIP
-    address.updateIP(remoteEp.address().to_v4().to_ulong());
-  }
-  // the channel is connected
-  channelConnected(callback);
-}
-
-void AbstractDuplexChannel::channelConnectFailed(const std::exception& e,
-                                                 const OperationCallbackPtr& callback) {
-  channelDisconnected(e);
-  setState(DEAD);
-  if (callback.get()) {
-    callback->operationFailed(e);
-  }
-}
-
-void AbstractDuplexChannel::channelConnected(const OperationCallbackPtr& callback) {
-  // for normal channel, we have done here
-  setState(CONNECTED);
-  if (callback.get()) {
-    callback->operationComplete();
-  }
-
-  // enable sending & receiving
-  startSending();
-  startReceiving();
-}
-
-/*static*/ void AbstractDuplexChannel::messageReadCallbackHandler(
-                AbstractDuplexChannelPtr channel, 
-                std::size_t message_size,
-                const boost::system::error_code& error, 
-                std::size_t bytes_transferred) {
-  LOG4CXX_DEBUG(logger, "DuplexChannel::messageReadCallbackHandler " << error << ", " 
-                        << bytes_transferred << " channel(" << channel.get() << ")");
-
-  if (error) {
-    if (!channel->isClosed()) {
-      LOG4CXX_INFO(logger, "Invalid read error (" << error << ") bytes_transferred (" 
-                           << bytes_transferred << ") channel(" << channel.get() << ")");
-    }
-    channel->channelDisconnected(ChannelReadException());
-    return;
-  }
-
-  if (channel->copy_buf_length < message_size) {
-    channel->copy_buf_length = message_size;
-    channel->copy_buf = (char*)realloc(channel->copy_buf, channel->copy_buf_length);
-    if (channel->copy_buf == NULL) {
-      LOG4CXX_ERROR(logger, "Error allocating buffer. channel(" << channel.get() << ")");
-      // if failed to realloc memory, we should disconnect the channel.
-      // then it would enter disconnect logic, which would close channel and release
-      // its resources includes the copy_buf memory.
-      channel->channelDisconnected(ChannelOutOfMemoryException());
-      return;
-    }
-  }
-  
-  channel->instream.read(channel->copy_buf, message_size);
-  PubSubResponsePtr response(new PubSubResponse());
-  bool err = response->ParseFromArray(channel->copy_buf, message_size);
-
-  if (!err) {
-    LOG4CXX_ERROR(logger, "Error parsing message. channel(" << channel.get() << ")");
-    channel->channelDisconnected(ChannelReadException());
-    return;
-  } else {
-    LOG4CXX_DEBUG(logger,  "channel(" << channel.get() << ") : " << channel->in_buf.size() 
-                           << " bytes left in buffer");
-  }
-
-  ChannelHandlerPtr h;
-  {
-    boost::shared_lock<boost::shared_mutex> lock(channel->destruction_lock);
-    if (channel->handler.get()) {
-      h = channel->handler;
-    }
-  }
-
-  // channel did stopReceiving, we should not call #messageReceived
-  // store this response in outstanding_response variable and did stop receiving
-  // when we startReceiving again, we can process this last response.
-  {
-    boost::lock_guard<boost::mutex> lock(channel->receiving_lock);
-    if (!channel->isReceiving()) {
-      // queue the response
-      channel->outstanding_response = response;
-      channel->reading = false;
-      return;
-    }
-  }
-
-  // channel is still in receiving status
-  if (h.get()) {
-    h->messageReceived(channel, response);
-  }
-
-  AbstractDuplexChannel::readSize(channel);
-}
-
-/*static*/ void AbstractDuplexChannel::sizeReadCallbackHandler(
-                   AbstractDuplexChannelPtr channel, 
-                   const boost::system::error_code& error, 
-                   std::size_t bytes_transferred) {
-  LOG4CXX_DEBUG(logger, "DuplexChannel::sizeReadCallbackHandler " << error << ", " 
-                        << bytes_transferred << " channel(" << channel.get() << ")");
-
-  if (error) {
-    if (!channel->isClosed()) {
-      LOG4CXX_INFO(logger, "Invalid read error (" << error << ") bytes_transferred (" 
-                           << bytes_transferred << ") channel(" << channel.get() << ")");
-    }
-    channel->channelDisconnected(ChannelReadException());
-    return;
-  }
-  
-  if (channel->in_buf.size() < sizeof(uint32_t)) {
-    LOG4CXX_ERROR(logger, "Not enough data in stream. Must have been an error reading. " 
-                          << " Closing channel(" << channel.get() << ")");
-    channel->channelDisconnected(ChannelReadException());
-    return;
-  }
-
-  uint32_t size;
-  std::istream is(&channel->in_buf);
-  is.read((char*)&size, sizeof(uint32_t));
-  size = ntohl(size);
-
-  int toread = size - channel->in_buf.size();
-  LOG4CXX_DEBUG(logger, " size of incoming message " << size << ", currently in buffer " 
-                        << channel->in_buf.size() << " channel(" << channel.get() << ")");
-  if (toread <= 0) {
-    AbstractDuplexChannel::messageReadCallbackHandler(channel, size, error, 0);
-  } else {
-    channel->readMsgBody(channel->in_buf, toread, size);
-  }
-}
-
-/*static*/ void AbstractDuplexChannel::readSize(AbstractDuplexChannelPtr channel) {
-  int toread = sizeof(uint32_t) - channel->in_buf.size();
-  LOG4CXX_DEBUG(logger, " size of incoming message " << sizeof(uint32_t) 
-                        << ", currently in buffer " << channel->in_buf.size() 
-                        << " channel(" << channel.get() << ")");
-
-  if (toread < 0) {
-    AbstractDuplexChannel::sizeReadCallbackHandler(channel, boost::system::error_code(), 0);
-  } else {
-    channel->readMsgSize(channel->in_buf);
-  }
-}
-
-void AbstractDuplexChannel::startReceiving() {
-  LOG4CXX_DEBUG(logger, "DuplexChannel::startReceiving channel(" << this
-                        << ") currently receiving = " << receiving);
-
-  PubSubResponsePtr response;
-  bool inReadingState;
-  {
-    boost::lock_guard<boost::mutex> lock(receiving_lock);
-    // receiving before just return
-    if (receiving) {
-      return;
-    } 
-    receiving = true;
-
-    // if we have last response collected in previous startReceiving
-    // we need to process it, but we should process it under receiving_lock
-    // otherwise we enter dead lock
-    // subscriber#startDelivery(subscriber#queue_lock) =>
-    // channel#startReceiving(channel#receiving_lock) =>
-    // sbuscriber#messageReceived(subscriber#queue_lock)
-    if (outstanding_response.get()) {
-      response = outstanding_response;
-      outstanding_response = PubSubResponsePtr();
-    }
-
-    // if channel is in reading status wait data from remote server
-    // we don't need to insert another readSize op
-    inReadingState = reading;
-    if (!reading) {
-      reading = true;
-    }
-  }
-
-  // consume message buffered in receiving queue
-  // there is at most one message buffered when we
-  // stopReceiving between #readSize and #readMsgBody
-  if (response.get()) {
-    ChannelHandlerPtr h;
-    {
-      boost::shared_lock<boost::shared_mutex> lock(this->destruction_lock);
-      if (this->handler.get()) {
-        h = this->handler;
-      }
-    }
-    if (h.get()) {
-      h->messageReceived(shared_from_this(), response);
-    }
-  }
-
-  // if channel is not in reading state, #readSize
-  if (!inReadingState) {
-    AbstractDuplexChannel::readSize(shared_from_this());
-  }
-}
-
-bool AbstractDuplexChannel::isReceiving() {
-  return receiving;
-}
-
-bool AbstractDuplexChannel::isClosed() {
-  return closed;
-}
-
-void AbstractDuplexChannel::stopReceiving() {
-  LOG4CXX_DEBUG(logger, "DuplexChannel::stopReceiving channel(" << this << ")");
-  
-  boost::lock_guard<boost::mutex> lock(receiving_lock);
-  receiving = false;
-}
-
-void AbstractDuplexChannel::startSending() {
-  {
-    boost::shared_lock<boost::shared_mutex> lock(state_lock);
-    if (state != CONNECTED) {
-      return;
-    }
-  }
-
-  boost::lock_guard<boost::mutex> lock(sending_lock);
-  if (sending) {
-    return;
-  }
-  LOG4CXX_DEBUG(logger, "AbstractDuplexChannel::startSending channel(" << this << ")");
-  
-  WriteRequest w;
-  { 
-    boost::lock_guard<boost::mutex> lock(write_lock);
-    if (write_queue.empty()) {
-      return;
-    }
-    w = write_queue.front();
-    write_queue.pop_front();
-  }
-
-  sending = true;
-
-  std::ostream os(&out_buf);
-  uint32_t size = htonl(w.first->ByteSize());
-  os.write((char*)&size, sizeof(uint32_t));
-  
-  bool err = w.first->SerializeToOstream(&os);
-  if (!err) {
-    w.second->operationFailed(ChannelWriteException());
-    channelDisconnected(ChannelWriteException());
-    return;
-  }
-
-  writeBuffer(out_buf, w.second);
-}
-
-const HostAddress& AbstractDuplexChannel::getHostAddress() const {
-  return address;
-}
-
-void AbstractDuplexChannel::channelDisconnected(const std::exception& e) {
-  setState(DEAD);
-  
-  {
-    boost::lock_guard<boost::mutex> lock(write_lock);
-    while (!write_queue.empty()) {
-      WriteRequest w = write_queue.front();
-      write_queue.pop_front();
-      w.second->operationFailed(e);
-    }
-  }
-
-  ChannelHandlerPtr h;
-  {
-    boost::shared_lock<boost::shared_mutex> lock(destruction_lock);
-    if (handler.get()) {
-      h = handler;
-    }
-  }
-  if (h.get()) {
-    h->channelDisconnected(shared_from_this(), e);
-  }
-}
-
-void AbstractDuplexChannel::close() {
-  {
-    boost::shared_lock<boost::shared_mutex> statelock(state_lock);
-    state = DEAD;
-  }
-
-  {
-    boost::lock_guard<boost::shared_mutex> lock(destruction_lock);
-    if (closed) {
-      // some one has closed the socket.
-      return;
-    }
-    closed = true;
-    handler = ChannelHandlerPtr(); // clear the handler in case it ever referenced the channel*/
-  }
-
-  LOG4CXX_INFO(logger, "Killing duplex channel (" << this << ")");
-
-  // If we are going away, fail all transactions that haven't been completed
-  failAllTransactions();
-  closeSocket();  
-}
-
-/*static*/ void AbstractDuplexChannel::writeCallbackHandler(
-                  AbstractDuplexChannelPtr channel,
-                  OperationCallbackPtr callback,
-                  const boost::system::error_code& error, 
-                  std::size_t bytes_transferred) {
-  if (error) {
-    if (!channel->isClosed()) {
-      LOG4CXX_DEBUG(logger, "AbstractDuplexChannel::writeCallbackHandler " << error << ", " 
-                            << bytes_transferred << " channel(" << channel.get() << ")");
-    }
-    callback->operationFailed(ChannelWriteException());
-    channel->channelDisconnected(ChannelWriteException());
-    return;
-  }
-
-  callback->operationComplete();
-
-  channel->out_buf.consume(bytes_transferred);
-
-  {
-    boost::lock_guard<boost::mutex> lock(channel->sending_lock);
-    channel->sending = false;
-  }
-
-  channel->startSending();
-}
-
-void AbstractDuplexChannel::writeRequest(const PubSubRequestPtr& m,
-                                         const OperationCallbackPtr& callback) {
-  {
-    boost::shared_lock<boost::shared_mutex> lock(state_lock);
-    if (state != CONNECTED && state != CONNECTING) {
-      LOG4CXX_ERROR(logger,"Tried to write transaction [" << m->txnid() << "] to a channel [" 
-                           << this << "] which is " << (state == DEAD ? "DEAD" : "UNINITIALISED"));
-      callback->operationFailed(UninitialisedChannelException());
-      return;
-    }
-  }
-
-  { 
-    boost::lock_guard<boost::mutex> lock(write_lock);
-    WriteRequest w(m, callback);
-    write_queue.push_back(w);
-  }
-
-  startSending();
-}
-
-//
-// Transaction operations
-//
-
-/**
-   Store the transaction data for a request.
-*/
-void AbstractDuplexChannel::storeTransaction(const PubSubDataPtr& data) {
-  LOG4CXX_DEBUG(logger, "Storing txnid(" << data->getTxnId() << ") for channel(" << this << ")");
-
-  boost::lock_guard<boost::mutex> lock(txnid2data_lock);
-  txnid2data[data->getTxnId()] = data;
-}
-
-/**
-   Give the transaction back to the caller. 
-*/
-PubSubDataPtr AbstractDuplexChannel::retrieveTransaction(long txnid) {
-  boost::lock_guard<boost::mutex> lock(txnid2data_lock);
-
-  PubSubDataPtr data = txnid2data[txnid];
-  txnid2data.erase(txnid);
-  if (data == NULL) {
-    LOG4CXX_ERROR(logger, "Transaction txnid(" << txnid 
-                          << ") doesn't exist in channel (" << this << ")");
-  }
-
-  return data;
-}
-
-void AbstractDuplexChannel::failAllTransactions() {
-  boost::lock_guard<boost::mutex> lock(txnid2data_lock);
-  for (TransactionMap::iterator iter = txnid2data.begin(); iter != txnid2data.end(); ++iter) {
-    PubSubDataPtr& data = (*iter).second;
-    data->getCallback()->operationFailed(ChannelDiedException());
-  }
-  txnid2data.clear();
-}
-
-// Set state for the channel
-void AbstractDuplexChannel::setState(State s) {
-  boost::lock_guard<boost::shared_mutex> lock(state_lock);
-  state = s;
-}
-
-//
-// Basic Asio Channel Implementation
-//
-
-AsioDuplexChannel::AsioDuplexChannel(IOServicePtr& service,
-                                     const HostAddress& addr, 
-                                     const ChannelHandlerPtr& handler)
-  : AbstractDuplexChannel(service, addr, handler) {
-  this->socket = boost_socket_ptr(new boost_socket(getService()));
-  LOG4CXX_DEBUG(logger, "Creating DuplexChannel(" << this << ")");
-}
-
-AsioDuplexChannel::~AsioDuplexChannel() {
-}
-
-void AsioDuplexChannel::doConnect(const OperationCallbackPtr& callback) {
-  boost::system::error_code error = boost::asio::error::host_not_found;
-  uint32_t ip2conn = address.ip();
-  uint16_t port2conn = address.port();
-  boost::asio::ip::tcp::endpoint endp(boost::asio::ip::address_v4(ip2conn), port2conn);
-
-  socket->async_connect(endp, boost::bind(&AbstractDuplexChannel::connectCallbackHandler, 
-                                          shared_from_this(), callback,
-                                          boost::asio::placeholders::error));
-  LOG4CXX_INFO(logger, "Channel (" << this << ") fire connect operation to ip (" 
-                                   << ip2conn << ") port (" << port2conn << ")");
-}
-
-void AsioDuplexChannel::setSocketOption(boost::system::error_code& ec) {
-  boost::asio::ip::tcp::no_delay option(true);
-  socket->set_option(option, ec);
-}
-
-boost::asio::ip::tcp::endpoint AsioDuplexChannel::getLocalAddress(
-    boost::system::error_code& ec) {
-  return socket->local_endpoint(ec);
-}
-
-boost::asio::ip::tcp::endpoint AsioDuplexChannel::getRemoteAddress(
-    boost::system::error_code& ec) {
-  return socket->remote_endpoint(ec);
-}
-
-void AsioDuplexChannel::writeBuffer(boost::asio::streambuf& buffer,
-                                    const OperationCallbackPtr& callback) {
-  boost::asio::async_write(*socket, buffer,
-    boost::bind(&AbstractDuplexChannel::writeCallbackHandler, 
-    shared_from_this(), callback,
-    boost::asio::placeholders::error, 
-    boost::asio::placeholders::bytes_transferred)
-  );
-}
-
-void AsioDuplexChannel::readMsgSize(boost::asio::streambuf& buffer) {
-  boost::asio::async_read(*socket, buffer, boost::asio::transfer_at_least(sizeof(uint32_t)),
-                          boost::bind(&AbstractDuplexChannel::sizeReadCallbackHandler,
-                                      shared_from_this(),
-                                      boost::asio::placeholders::error,
-                                      boost::asio::placeholders::bytes_transferred));
-}
-
-void AsioDuplexChannel::readMsgBody(boost::asio::streambuf& buffer,
-                                    int toread, uint32_t msgSize) {
-  boost::asio::async_read(*socket, buffer, boost::asio::transfer_at_least(toread),
-                          boost::bind(&AbstractDuplexChannel::messageReadCallbackHandler,
-                                      shared_from_this(), msgSize,
-                                      boost::asio::placeholders::error,
-                                      boost::asio::placeholders::bytes_transferred));
-}
-
-void AsioDuplexChannel::closeSocket() {
-  boost::system::error_code ec;
-
-  socket->cancel(ec);
-  if (ec) {
-    LOG4CXX_WARN(logger, "Channel " << this << " canceling io error : " << ec.message().c_str());
-  }
-
-  socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
-  if (ec) {
-    LOG4CXX_WARN(logger, "Channel " << this << " shutdown error : " << ec.message().c_str());
-  }
-
-  socket->close(ec);
-  if (ec) {
-    LOG4CXX_WARN(logger, "Channel " << this << " close error : " << ec.message().c_str());
-  }
-  LOG4CXX_DEBUG(logger, "Closed socket for channel " << this << ".");
-}
-
-// SSL Context Factory
-
-SSLContextFactory::SSLContextFactory(const Configuration& conf)
-  : conf(conf),
-    sslPemFile(conf.get(Configuration::SSL_PEM_FILE,
-                        DEFAULT_SSL_PEM_FILE)) {
-}
-
-SSLContextFactory::~SSLContextFactory() {}
-
-boost_ssl_context_ptr SSLContextFactory::createSSLContext(boost::asio::io_service& service) {
-  boost_ssl_context_ptr sslCtx(new boost_ssl_context(service,
-                               boost::asio::ssl::context::sslv23_client));
-  sslCtx->set_verify_mode(boost::asio::ssl::context::verify_none);
-  if (!sslPemFile.empty()) {
-    boost::system::error_code err;
-    sslCtx->load_verify_file(sslPemFile, err);
-
-    if (err) {
-      LOG4CXX_ERROR(logger, "Failed to load verify ssl pem file : "
-                            << sslPemFile);
-      throw InvalidSSLPermFileException();
-    }
-  }
-  return sslCtx;
-}
-
-//
-// SSL Channl Implementation
-//
-
-#ifndef __APPLE__
-AsioSSLDuplexChannel::AsioSSLDuplexChannel(IOServicePtr& service,
-                                           const boost_ssl_context_ptr& sslCtx,
-                                           const HostAddress& addr,
-                                           const ChannelHandlerPtr& handler)
-  : AbstractDuplexChannel(service, addr, handler), ssl_ctx(sslCtx),
-    sslclosed(false) {
-#else
-AsioSSLDuplexChannel::AsioSSLDuplexChannel(IOServicePtr& service,
-                                           const boost_ssl_context_ptr& sslCtx,
-                                           const HostAddress& addr,
-                                           const ChannelHandlerPtr& handler)
-  : AbstractDuplexChannel(service, addr, handler), ssl_ctx(sslCtx) {
-#endif
-  ssl_socket = boost_ssl_socket_ptr(new boost_ssl_socket(getService(), *ssl_ctx));
-  LOG4CXX_DEBUG(logger, "Created SSL DuplexChannel(" << this << ")");
-}
-
-AsioSSLDuplexChannel::~AsioSSLDuplexChannel() {
-}
-
-void AsioSSLDuplexChannel::doConnect(const OperationCallbackPtr& callback) {
-  boost::system::error_code error = boost::asio::error::host_not_found;
-  uint32_t ip2conn = address.ip();
-  uint16_t port2conn = address.sslPort();
-  boost::asio::ip::tcp::endpoint endp(boost::asio::ip::address_v4(ip2conn), port2conn);
-
-  ssl_socket->lowest_layer().async_connect(endp,
-      boost::bind(&AbstractDuplexChannel::connectCallbackHandler, 
-                  shared_from_this(), callback,
-                  boost::asio::placeholders::error));
-  LOG4CXX_INFO(logger, "SSL Channel (" << this << ") fire connect operation to ip (" 
-                                       << ip2conn << ") port (" << port2conn << ")");
-}
-
-void AsioSSLDuplexChannel::setSocketOption(boost::system::error_code& ec) {
-  boost::asio::ip::tcp::no_delay option(true);
-  ssl_socket->lowest_layer().set_option(option, ec);
-}
-
-boost::asio::ip::tcp::endpoint AsioSSLDuplexChannel::getLocalAddress(
-    boost::system::error_code& ec) {
-  return ssl_socket->lowest_layer().local_endpoint(ec);
-}
-
-boost::asio::ip::tcp::endpoint AsioSSLDuplexChannel::getRemoteAddress(
-    boost::system::error_code& ec) {
-  return ssl_socket->lowest_layer().remote_endpoint(ec);
-}
-
-void AsioSSLDuplexChannel::channelConnected(const OperationCallbackPtr& callback) {
-  // for SSL channel, we had to do SSL hand shake
-  startHandShake(callback);
-  LOG4CXX_INFO(logger, "SSL Channel " << this << " fire hand shake operation");
-}
-
-void AsioSSLDuplexChannel::sslChannelConnected(const OperationCallbackPtr& callback) {
-  LOG4CXX_INFO(logger, "SSL Channel " << this << " hand shake finish!!");
-  AbstractDuplexChannel::channelConnected(callback);
-}
-
-void AsioSSLDuplexChannel::startHandShake(const OperationCallbackPtr& callback) {
-  ssl_socket->async_handshake(boost::asio::ssl::stream_base::client,
-                              boost::bind(&AsioSSLDuplexChannel::handleHandshake,
-                                          boost::dynamic_pointer_cast<AsioSSLDuplexChannel>(shared_from_this()),
-                                          callback, boost::asio::placeholders::error));
-}
-
-void AsioSSLDuplexChannel::handleHandshake(AsioSSLDuplexChannelPtr channel,
-                                           OperationCallbackPtr callback,
-                                           const boost::system::error_code& error) {
-  if (error) {
-    LOG4CXX_ERROR(logger, "SSL Channel " << channel.get() << " hand shake error : "
-                          << error.message().c_str());
-    channel->channelConnectFailed(ChannelConnectException(), callback);
-    return;
-  }
-  channel->sslChannelConnected(callback);
-}
-
-void AsioSSLDuplexChannel::writeBuffer(boost::asio::streambuf& buffer,
-                                       const OperationCallbackPtr& callback) {
-  boost::asio::async_write(*ssl_socket, buffer,
-    boost::bind(&AbstractDuplexChannel::writeCallbackHandler, 
-    shared_from_this(), callback,
-    boost::asio::placeholders::error, 
-    boost::asio::placeholders::bytes_transferred)
-  );
-}
-
-void AsioSSLDuplexChannel::readMsgSize(boost::asio::streambuf& buffer) {
-  boost::asio::async_read(*ssl_socket, buffer, boost::asio::transfer_at_least(sizeof(uint32_t)),
-                          boost::bind(&AbstractDuplexChannel::sizeReadCallbackHandler, 
-                                      shared_from_this(),
-                                      boost::asio::placeholders::error, 
-                                      boost::asio::placeholders::bytes_transferred));
-}
-
-void AsioSSLDuplexChannel::readMsgBody(boost::asio::streambuf& buffer,
-                                       int toread, uint32_t msgSize) {
-  boost::asio::async_read(*ssl_socket, buffer, boost::asio::transfer_at_least(toread),
-                          boost::bind(&AbstractDuplexChannel::messageReadCallbackHandler, 
-                                      shared_from_this(), msgSize,
-                                      boost::asio::placeholders::error, 
-                                      boost::asio::placeholders::bytes_transferred));
-}
-
-#ifndef __APPLE__
-// boost asio doesn't provide time out mechanism to shutdown ssl
-void AsioSSLDuplexChannel::sslShutdown() {
-  ssl_socket->async_shutdown(boost::bind(&AsioSSLDuplexChannel::handleSSLShutdown,
-                                         boost::shared_dynamic_cast<AsioSSLDuplexChannel>(shared_from_this()),
-                                         boost::asio::placeholders::error));
-}
-
-void AsioSSLDuplexChannel::handleSSLShutdown(const boost::system::error_code& error) {
-  if (error) {
-    LOG4CXX_ERROR(logger, "SSL Channel " << this << " shutdown error : "
-                          << error.message().c_str());
-  }
-  {
-    boost::lock_guard<boost::mutex> lock(sslclosed_lock);
-    sslclosed = true;
-  }
-  sslclosed_cond.notify_all();
-}
-#endif
-
-void AsioSSLDuplexChannel::closeSocket() {
-#ifndef __APPLE__
-  // Shutdown ssl
-  sslShutdown();
-  // time wait 
-  {
-    boost::mutex::scoped_lock lock(sslclosed_lock);
-    if (!sslclosed) {
-      sslclosed_cond.timed_wait(lock, boost::posix_time::milliseconds(1000)); 
-    }
-  }
-#endif
-  closeLowestLayer();
-}
-
-void AsioSSLDuplexChannel::closeLowestLayer() {
-  boost::system::error_code ec;
-
-  ssl_socket->lowest_layer().cancel(ec);
-  if (ec) {
-    LOG4CXX_WARN(logger, "Channel " << this << " canceling io error : " << ec.message().c_str());
-  }
-
-  ssl_socket->lowest_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
-  if (ec) {
-    LOG4CXX_WARN(logger, "Channel " << this << " shutdown error : " << ec.message().c_str());
-  }
-
-  ssl_socket->lowest_layer().close(ec);
-  if (ec) {
-    LOG4CXX_WARN(logger, "Channel " << this << " close error : " << ec.message().c_str());
-  }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/channel.h
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/channel.h b/hedwig-client/src/main/cpp/lib/channel.h
deleted file mode 100644
index c9ef289..0000000
--- a/hedwig-client/src/main/cpp/lib/channel.h
+++ /dev/null
@@ -1,438 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef HEDWIG_CHANNEL_H
-#define HEDWIG_CHANNEL_H
-
-#include <hedwig/protocol.h>
-#include <hedwig/callback.h>
-#include <hedwig/client.h>
-#include "util.h"
-#include "data.h"
-#include "eventdispatcher.h"
-
-#ifdef USE_BOOST_TR1
-#include <boost/tr1/memory.hpp>
-#include <boost/tr1/unordered_map.hpp>
-#else
-#include <tr1/memory>
-#include <tr1/unordered_map>
-#endif
-
-#include <google/protobuf/io/zero_copy_stream_impl.h>
-
-#include <boost/shared_ptr.hpp>
-#include <boost/enable_shared_from_this.hpp>
-
-#include <boost/asio.hpp>
-#include <boost/asio/ip/tcp.hpp>
-#include <boost/asio/ssl.hpp>
-#include <boost/function.hpp>
-#include <boost/thread/mutex.hpp>
-#include <boost/thread/shared_mutex.hpp>
-
-namespace Hedwig {
-  class ChannelException : public std::exception { };
-  class UninitialisedChannelException : public ChannelException {};
-
-  class ChannelConnectException : public ChannelException {};
-  class CannotCreateSocketException : public ChannelConnectException {};
-  class ChannelSetupException : public ChannelConnectException {};
-  class ChannelNotConnectedException : public ChannelConnectException {};
-
-  class ChannelDiedException : public ChannelException {};
-
-  class ChannelWriteException : public ChannelException {};
-  class ChannelReadException : public ChannelException {};
-  class ChannelThreadException : public ChannelException {};
-  class ChannelOutOfMemoryException : public ChannelException {};
-
-  class InvalidSSLPermFileException : public std::exception {};
-
-  class DuplexChannel;
-  typedef boost::shared_ptr<DuplexChannel> DuplexChannelPtr;
-  typedef boost::asio::ip::tcp::socket boost_socket;
-  typedef boost::shared_ptr<boost_socket> boost_socket_ptr;
-  typedef boost::asio::ssl::stream<boost_socket> boost_ssl_socket;
-  typedef boost::shared_ptr<boost_ssl_socket> boost_ssl_socket_ptr;
-
-  class ChannelHandler {
-  public:
-    virtual void messageReceived(const DuplexChannelPtr& channel, const PubSubResponsePtr& m) = 0;
-    virtual void channelConnected(const DuplexChannelPtr& channel) = 0;
-
-    virtual void channelDisconnected(const DuplexChannelPtr& channel, const std::exception& e) = 0;
-    virtual void exceptionOccurred(const DuplexChannelPtr& channel, const std::exception& e) = 0;
-
-    virtual ~ChannelHandler() {}
-  };
-
-  typedef boost::shared_ptr<ChannelHandler> ChannelHandlerPtr;
-
-  // A channel interface to send requests
-  class DuplexChannel {
-  public:
-    virtual ~DuplexChannel() {}
-
-    // Return the channel handler bound with a channel
-    virtual ChannelHandlerPtr getChannelHandler() = 0;
-
-    // Issues a connect request to the target host
-    // User could writeRequest after issued connect request, those requests should
-    // be buffered and written until the channel is connected.
-    virtual void connect() = 0;
-
-    // Issues a connect request to the target host
-    // User could writeRequest after issued connect request, those requests should
-    // be buffered and written until the channel is connected.
-    // The provided callback would be triggered after connected.
-    virtual void connect(const OperationCallbackPtr& callback) = 0;
-
-    // Write the request to underlying channel
-    // If the channel is not established, all write requests would be buffered
-    // until channel is connected.
-    virtual void writeRequest(const PubSubRequestPtr& m,
-                              const OperationCallbackPtr& callback) = 0; 
-
-    // Returns the remote address where this channel is connected to.
-    virtual const HostAddress& getHostAddress() const = 0;
-
-    // Resumes the read operation of this channel asynchronously
-    virtual void startReceiving() = 0;
-
-    // Suspends the read operation of this channel asynchronously
-    virtual void stopReceiving() = 0;
-
-    // Returns if and only if the channel will read a message
-    virtual bool isReceiving() = 0;
-
-    //
-    // Transaction operations
-    //
-
-    // Store a pub/sub request
-    virtual void storeTransaction(const PubSubDataPtr& data) = 0;
-
-    // Remove a pub/sub request
-    virtual PubSubDataPtr retrieveTransaction(long txnid) = 0;
-
-    // Fail all transactions
-    virtual void failAllTransactions() = 0;
-
-    // Handle the case that the channel is disconnected due issues found
-    // when reading or writing
-    virtual void channelDisconnected(const std::exception& e) = 0;
-
-    // Close the channel to release the resources
-    // Once a channel is closed, it can not be open again. Calling this
-    // method on a closed channel has no efffect.
-    virtual void close() = 0;
-  };
-
-  typedef boost::asio::ssl::context boost_ssl_context;
-  typedef boost::shared_ptr<boost_ssl_context> boost_ssl_context_ptr;
-
-  class SSLContextFactory {
-  public:
-    SSLContextFactory(const Configuration& conf);
-    ~SSLContextFactory();
-
-    boost_ssl_context_ptr createSSLContext(boost::asio::io_service& service);
-  private:
-    const Configuration& conf;
-    std::string sslPemFile;
-  };
-
-  typedef boost::shared_ptr<SSLContextFactory> SSLContextFactoryPtr;
-
-  class AbstractDuplexChannel;
-  typedef boost::shared_ptr<AbstractDuplexChannel> AbstractDuplexChannelPtr;
-
-  class AbstractDuplexChannel : public DuplexChannel,
-                                public boost::enable_shared_from_this<AbstractDuplexChannel> {
-  public:
-    AbstractDuplexChannel(IOServicePtr& service,
-                          const HostAddress& addr, 
-                          const ChannelHandlerPtr& handler);
-    virtual ~AbstractDuplexChannel();
-
-    virtual ChannelHandlerPtr getChannelHandler();
-
-    //
-    // Connect Operation
-    //
-
-    // Asio Connect Callback Handler
-    static void connectCallbackHandler(AbstractDuplexChannelPtr channel, 
-                                       OperationCallbackPtr callback,
-                                       const boost::system::error_code& error);
-    virtual void connect();
-    virtual void connect(const OperationCallbackPtr& callback);
-
-    //
-    // Write Operation
-    //
-
-    // Asio Write Callback Handler
-    static void writeCallbackHandler(AbstractDuplexChannelPtr channel,
-                                     OperationCallbackPtr callback, 
-                                     const boost::system::error_code& error, 
-                                     std::size_t bytes_transferred);
-    // Write request
-    virtual void writeRequest(const PubSubRequestPtr& m,
-                              const OperationCallbackPtr& callback);
-
-    // get the target host
-    virtual const HostAddress& getHostAddress() const;
-
-    static void sizeReadCallbackHandler(AbstractDuplexChannelPtr channel, 
-                                        const boost::system::error_code& error, 
-                                        std::size_t bytes_transferred);
-    static void messageReadCallbackHandler(AbstractDuplexChannelPtr channel,
-                                           std::size_t messagesize, 
-                                           const boost::system::error_code& error, 
-                                           std::size_t bytes_transferred);
-    static void readSize(AbstractDuplexChannelPtr channel);
-
-    // start receiving responses from underlying channel
-    virtual void startReceiving();
-    // is the underlying channel in receiving state
-    virtual bool isReceiving();
-    // stop receiving responses from underlying channel
-    virtual void stopReceiving();
-
-    // Store a pub/sub request
-    virtual void storeTransaction(const PubSubDataPtr& data);
-
-    // Remove a pub/sub request
-    virtual PubSubDataPtr retrieveTransaction(long txnid);
-
-    // Fail all transactions
-    virtual void failAllTransactions();
-
-    // channel is disconnected for a specified exception
-    virtual void channelDisconnected(const std::exception& e);
-
-    // close the channel
-    virtual void close();
-
-    inline boost::asio::io_service & getService() const {
-      return service;
-    }
-
-  protected:
-    // execute the connect operation
-    virtual void doConnect(const OperationCallbackPtr& callback) = 0;
-
-    virtual void doAfterConnect(const OperationCallbackPtr& callback,
-                                const boost::system::error_code& error);
-
-    // Execute the action after channel connect
-    // It would be executed in asio connect callback handler
-    virtual void setSocketOption(boost::system::error_code& ec) = 0;
-    virtual boost::asio::ip::tcp::endpoint
-            getRemoteAddress(boost::system::error_code& ec) = 0;
-    virtual boost::asio::ip::tcp::endpoint
-            getLocalAddress(boost::system::error_code& ec) = 0;
-
-    // Channel failed to connect
-    virtual void channelConnectFailed(const std::exception& e,
-                                      const OperationCallbackPtr& callback);
-    // Channel connected
-    virtual void channelConnected(const OperationCallbackPtr& callback);
-
-    // Start sending buffered requests to target host
-    void startSending();
-
-    // Write a buffer to underlying socket
-    virtual void writeBuffer(boost::asio::streambuf& buffer,
-                             const OperationCallbackPtr& callback) = 0;
-
-    // Read a message from underlying socket
-    virtual void readMsgSize(boost::asio::streambuf& buffer) = 0;
-    virtual void readMsgBody(boost::asio::streambuf& buffer,
-                             int toread, uint32_t msgSize) = 0;
-
-    // is the channel under closing
-    bool isClosed();
-
-    // close the underlying socket to release resource 
-    virtual void closeSocket() = 0;
-
-    enum State { UNINITIALISED, CONNECTING, CONNECTED, DEAD };
-    void setState(State s);
-
-    // Address
-    HostAddress address;
-  private:
-    ChannelHandlerPtr handler;
-
-    boost::asio::io_service &service;
-
-    // buffers for input stream
-    boost::asio::streambuf in_buf;
-    std::istream instream;
-
-    // only exists because protobufs can't play nice with streams
-    // (if there's more than message len in it, it tries to read all)
-    char* copy_buf;
-    std::size_t copy_buf_length;
-
-    // buffers for output stream
-    boost::asio::streambuf out_buf;
-    // write requests queue 
-    typedef std::pair<PubSubRequestPtr, OperationCallbackPtr> WriteRequest;
-    boost::mutex write_lock;
-    std::deque<WriteRequest> write_queue;
-
-    // channel state
-    State state;
-    boost::shared_mutex state_lock;
-
-    // reading state
-    bool receiving;
-    bool reading;
-    PubSubResponsePtr outstanding_response;
-    boost::mutex receiving_lock;
-
-    // sending state
-    bool sending;
-    boost::mutex sending_lock;
-
-    // flag indicates the channel is closed
-    // some callback might return when closing
-    bool closed;
-
-    // transactions
-    typedef std::tr1::unordered_map<long, PubSubDataPtr> TransactionMap;
-
-    TransactionMap txnid2data;
-    boost::mutex txnid2data_lock;
-    boost::shared_mutex destruction_lock;
-  };
-
-  class AsioDuplexChannel : public AbstractDuplexChannel {
-  public:
-    AsioDuplexChannel(IOServicePtr& service,
-                      const HostAddress& addr, 
-                      const ChannelHandlerPtr& handler);
-    virtual ~AsioDuplexChannel();
-  protected:
-    // execute the connect operation
-    virtual void doConnect(const OperationCallbackPtr& callback);
-
-    // Execute the action after channel connect
-    // It would be executed in asio connect callback handler
-    virtual void setSocketOption(boost::system::error_code& ec);
-    virtual boost::asio::ip::tcp::endpoint
-            getRemoteAddress(boost::system::error_code& ec);
-    virtual boost::asio::ip::tcp::endpoint
-            getLocalAddress(boost::system::error_code& ec);
-
-    // Write a buffer to underlying socket
-    virtual void writeBuffer(boost::asio::streambuf& buffer,
-                             const OperationCallbackPtr& callback);
-
-    // Read a message from underlying socket
-    virtual void readMsgSize(boost::asio::streambuf& buffer);
-    virtual void readMsgBody(boost::asio::streambuf& buffer,
-                             int toread, uint32_t msgSize);
-
-    // close the underlying socket to release resource 
-    virtual void closeSocket();
-  private:
-    // underlying socket
-    boost_socket_ptr socket;
-  };
-
-  typedef boost::shared_ptr<AsioDuplexChannel> AsioDuplexChannelPtr;
-
-  class AsioSSLDuplexChannel;
-  typedef boost::shared_ptr<AsioSSLDuplexChannel> AsioSSLDuplexChannelPtr;
-
-  class AsioSSLDuplexChannel : public AbstractDuplexChannel {
-  public:
-    AsioSSLDuplexChannel(IOServicePtr& service,
-                         const boost_ssl_context_ptr& sslCtx,
-                         const HostAddress& addr, 
-                         const ChannelHandlerPtr& handler);
-    virtual ~AsioSSLDuplexChannel();
-  protected:
-    // execute the connect operation
-    virtual void doConnect(const OperationCallbackPtr& callback);
-    // Execute the action after channel connect
-    // It would be executed in asio connect callback handler
-    virtual void setSocketOption(boost::system::error_code& ec);
-    virtual boost::asio::ip::tcp::endpoint
-            getRemoteAddress(boost::system::error_code& ec);
-    virtual boost::asio::ip::tcp::endpoint
-            getLocalAddress(boost::system::error_code& ec);
-
-    virtual void channelConnected(const OperationCallbackPtr& callback);
-
-    // Start SSL Hand Shake after the channel is connected
-    void startHandShake(const OperationCallbackPtr& callback);
-    // Asio Callback After Hand Shake
-    static void handleHandshake(AsioSSLDuplexChannelPtr channel,
-                                OperationCallbackPtr callback,
-                                const boost::system::error_code& error);
-
-    void sslChannelConnected(const OperationCallbackPtr& callback);
-
-    // Write a buffer to underlying socket
-    virtual void writeBuffer(boost::asio::streambuf& buffer,
-                             const OperationCallbackPtr& callback);
-
-    // Read a message from underlying socket
-    virtual void readMsgSize(boost::asio::streambuf& buffer);
-    virtual void readMsgBody(boost::asio::streambuf& buffer,
-                             int toread, uint32_t msgSize);
-
-    // close the underlying socket to release resource 
-    virtual void closeSocket();
-
-  private:
-#ifndef __APPLE__
-    // Shutdown ssl
-    void sslShutdown();
-    // Handle ssl shutdown
-    void handleSSLShutdown(const boost::system::error_code& error);
-#endif
-    // Close lowest layer
-    void closeLowestLayer();
-
-    // underlying ssl socket
-    boost_ssl_socket_ptr ssl_socket;
-    // ssl context
-    boost_ssl_context_ptr ssl_ctx;
-
-#ifndef __APPLE__
-    // Flag indicated ssl is closed.
-    bool sslclosed;
-    boost::mutex sslclosed_lock;
-    boost::condition_variable sslclosed_cond;
-#endif
-  };
-  
-
-  struct DuplexChannelPtrHash : public std::unary_function<DuplexChannelPtr, size_t> {
-    size_t operator()(const Hedwig::DuplexChannelPtr& channel) const {
-      return reinterpret_cast<size_t>(channel.get());
-    }
-  };
-};
-#endif

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/client.cpp
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/client.cpp b/hedwig-client/src/main/cpp/lib/client.cpp
deleted file mode 100644
index e98c452..0000000
--- a/hedwig-client/src/main/cpp/lib/client.cpp
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifdef HAVE_CONFIG_H
-#include <config.h>
-#endif
-
-#include <hedwig/client.h>
-#include <memory>
-
-#include "clientimpl.h"
-#include <log4cxx/logger.h>
-
-static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__));
-
-using namespace Hedwig;
-
-const std::string Configuration::DEFAULT_SERVER = "hedwig.cpp.default_server";
-const std::string Configuration::MESSAGE_CONSUME_RETRY_WAIT_TIME = "hedwig.cpp.message_consume_retry_wait_time";
-const std::string Configuration::SUBSCRIBER_CONSUME_RETRY_WAIT_TIME = "hedwig.cpp.subscriber_consume_retry_wait_time";
-const std::string Configuration::MAX_MESSAGE_QUEUE_SIZE = "hedwig.cpp.max_msgqueue_size";
-const std::string Configuration::RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME = "hedwig.cpp.reconnect_subscribe_retry_wait_time";
-const std::string Configuration::SYNC_REQUEST_TIMEOUT = "hedwig.cpp.sync_request_timeout";
-const std::string Configuration::SUBSCRIBER_AUTOCONSUME = "hedwig.cpp.subscriber_autoconsume";
-const std::string Configuration::NUM_DISPATCH_THREADS = "hedwig.cpp.num_dispatch_threads";
-const std::string Configuration::SUBSCRIPTION_MESSAGE_BOUND = "hedwig.cpp.subscription_message_bound";
-const std::string Configuration::SSL_ENABLED = "hedwig.cpp.ssl_enabled";
-const std::string Configuration::SSL_PEM_FILE = "hedwig.cpp.ssl_pem";
-const std::string Configuration::SUBSCRIPTION_CHANNEL_SHARING_ENABLED = "hedwig.cpp.subscription_channel_sharing_enabled";
-
-Client::Client(const Configuration& conf) {
-  LOG4CXX_DEBUG(logger, "Client::Client (" << this << ")");
-
-  clientimpl = ClientImpl::Create( conf );
-}
-
-Subscriber& Client::getSubscriber() {
-  return clientimpl->getSubscriber();
-}
-
-Publisher& Client::getPublisher() {
-  return clientimpl->getPublisher();
-}
-
-Client::~Client() {
-  LOG4CXX_DEBUG(logger, "Client::~Client (" << this << ")");
-
-  clientimpl->Destroy();
-}
-
-